Conversation
…nd ThreatQ
Brings all three connectors to parity with ServiceNow's investigation
routing pattern: stix_type="observed-data" dispatches to the platform's
native incident/case/event API through the standard CRUD interface.
XSOAR:
- Add "observed-data" → "incident" to stix_type_map
- get_object/list_objects/upsert_object/delete_object all branch on
stix_type to route to /incident/* vs /indicators/* endpoints
- Fix to_stix(): replace hardcoded ipv4-addr with _XSOAR_TYPE_TO_STIX
lookup table covering IP/IPv6/Domain/URL/File/Email types
- Fix from_stix(): replace hardcoded "IP"/"score=2" with _infer_xsoar_type()
and _confidence_to_score() helpers
- Add _incident_to_stix(): maps XSOAR incident → STIX observed-data with
severity, status, owner, labels, and CustomFields
- to_stix() dispatches on "CustomFields" presence or type=="incident"
GreyMatter:
- Add "observed-data" → "incidents" to stix_type_map (existing _resolve()
routing handles CRUD automatically)
- to_stix() dispatches on case_number/assigned_to fields to _incident_to_stix()
- Add _incident_to_stix(): maps case → STIX observed-data with case_number,
status, severity, assigned_to, tags, and TLP
ThreatQ:
- Add "observed-data" → "event" to stix_type_map
- _resolve_resource() returns "events" (already plural) for observed-data
- list_objects(): skips ?with=attributes for event queries
- to_stix(): dispatches on happened_at/event_type fields to _event_to_stix()
- Add _event_to_stix(): maps event → STIX observed-data with event_type and
x_tq_event_id
- Add link_event(event_id, stix_obj): POST /api/events/{id}/indicators —
parallel to XSOAR's link_incident and GreyMatter's link_investigation
- upsert_object() gains event_id kwarg for auto-linking on write
Also fix pre-existing UP006/UP035/UP045/E701 ruff violations in all three
files (typing.Dict→dict, typing.List→list, Optional[X]→X|None, single-line
if-return statements).
https://claude.ai/code/session_01BDoue9HxB83ijLzFARAugq
There was a problem hiding this comment.
Pull request overview
This PR expands investigation/case support across several platform connectors by introducing observed-data routing (to incident/event resources), adding investigation-linking helper APIs, and adding a large set of extended unit tests for workspace/global-context behavior.
Changes:
- Add extended unit tests covering additional workspace/global-context code paths (commit/delete flows, enrichment history, async enrichment fallbacks, SQLite workspace store behaviors).
- Extend XSOAR, ThreatQ, and GreyMatter connectors to map
observed-datato incident/event endpoints and document investigation-linking usage. - Add/adjust STIX translation helpers and type mappings to support the new investigation paths.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
tests/unit/context/test_workspace_extended.py |
New extended unit tests for workspace/global-context behaviors and SQLite store paths. |
gnat/connectors/xsoar/client.py |
Adds observed-data → incident CRUD support, richer STIX translation, and incident-linking helper. |
gnat/connectors/threatq/client.py |
Adds observed-data → event mapping, event-linking helper, and updated listing behavior/docs. |
gnat/connectors/greymatter/client.py |
Adds observed-data → incidents mapping and incident-to-STIX translation/docs. |
Comments suppressed due to low confidence (1)
gnat/connectors/greymatter/client.py:300
stix_type_mapand CRUD methods now advertise support forstix_type="observed-data"(incidents), butfrom_stix()still always translates into an observable payload. SinceGlobalContext.write_object()relies onfrom_stix()before callingupsert_object(stix_dict["type"], payload), writing anobserved-dataobject will POST incident endpoints with the wrong fields. Add anobserved-databranch infrom_stix()that produces the GreyMatter incident/case schema (e.g.title,description,severity,status, etc.).
def from_stix(self, stix_dict: dict[str, Any]) -> dict[str, Any]:
"""
Translate a STIX Indicator dict to a GreyMatter observable payload.
"""
pattern = stix_dict.get("pattern", "")
gm_type = self._infer_gm_type(pattern)
value = self._extract_value(pattern)
return {
"type": gm_type,
"value": value or stix_dict.get("name", ""),
"description": stix_dict.get("description", ""),
"confidence": stix_dict.get("confidence", 50),
"tlp": stix_dict.get("x_tlp", "white"),
"tags": stix_dict.get("x_gm_tags", []),
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| resource = self._resolve_resource(stix_type) | ||
| tq_id = payload.pop("id", None) | ||
| if tq_id: | ||
| return self.put(f"/api/{resource}/{tq_id}", json=payload) | ||
| return self.post(f"/api/{resource}", json=payload) | ||
| result = self.put(f"/api/{resource}/{tq_id}", json=payload) | ||
| else: | ||
| result = self.post(f"/api/{resource}", json=payload) | ||
| if event_id and stix_type != "observed-data": | ||
| self.link_event(event_id, payload) | ||
| return result |
There was a problem hiding this comment.
upsert_object() calls link_event(event_id, payload) after writing. At this point payload is a ThreatQ-native request body (e.g. {"value": ..., "type": ...}) and does not contain STIX name/pattern, so link_event() will post an empty value and infer the type from an empty pattern. Pass the original STIX dict into link_event(), or update link_event() to also accept native payloads (e.g. fall back to value/type when name/pattern are missing).
| @@ -282,6 +360,43 @@ def from_stix(self, stix_dict: Dict[str, Any]) -> Dict[str, Any]: | |||
| "status": {"name": "Active"}, | |||
| } | |||
There was a problem hiding this comment.
The docstring/STIX type map indicate observed-data is supported (mapped to ThreatQ Events), but from_stix() always builds an indicator payload (value/type/status). Since GlobalContext.write_object() uses client.from_stix(stix_dict) for all types, writes of STIX observed-data will send the wrong schema to the /api/events endpoint. Add a type == "observed-data" branch in from_stix() to generate an event payload (e.g. title, description, happened_at, event_type, etc.) compatible with ThreatQ events.
| def _indicator_to_stix(self, native: dict[str, Any]) -> dict[str, Any]: | ||
| """Map an XSOAR indicator dict to a STIX Indicator SDO.""" | ||
| xsoar_type = str(native.get("indicator_type", "")).lower() | ||
| stix_path = _XSOAR_TYPE_TO_STIX.get(xsoar_type, "unknown:value") | ||
| value = native.get("value", "") | ||
| pattern = f"[{stix_path} = '{value}']" if value else "" |
There was a problem hiding this comment.
_indicator_to_stix() uses a fallback object-path of "unknown:value" when the XSOAR indicator_type isn’t in _XSOAR_TYPE_TO_STIX. unknown isn’t a valid STIX object type, so this produces an invalid STIX pattern like [unknown:value = '...']. Consider falling back to an empty pattern (and leaving the type unclassified), or mapping unknowns to a valid generic observable type; also ensure the value is escaped so quotes in the indicator value can’t break the pattern syntax.
| def _indicator_to_stix(self, native: dict[str, Any]) -> dict[str, Any]: | |
| """Map an XSOAR indicator dict to a STIX Indicator SDO.""" | |
| xsoar_type = str(native.get("indicator_type", "")).lower() | |
| stix_path = _XSOAR_TYPE_TO_STIX.get(xsoar_type, "unknown:value") | |
| value = native.get("value", "") | |
| pattern = f"[{stix_path} = '{value}']" if value else "" | |
| @staticmethod | |
| def _escape_stix_pattern_value(value: Any) -> str: | |
| """Escape a value for safe inclusion in a STIX pattern string literal.""" | |
| return str(value).replace("\\", "\\\\").replace("'", "\\'") | |
| def _indicator_to_stix(self, native: dict[str, Any]) -> dict[str, Any]: | |
| """Map an XSOAR indicator dict to a STIX Indicator SDO.""" | |
| xsoar_type = str(native.get("indicator_type", "")).lower() | |
| stix_path = _XSOAR_TYPE_TO_STIX.get(xsoar_type) | |
| value = native.get("value", "") | |
| escaped_value = self._escape_stix_pattern_value(value) | |
| pattern = f"[{stix_path} = '{escaped_value}']" if stix_path and value else "" |
Signed-off-by: Bill <[email protected]>
Implements a five-step pipeline for building cross-platform investigation
evidence graphs seeded from IOC values, case IDs, hostnames, usernames,
hashes, ticket references, and other seed types.
New package: gnat/investigations/
- model.py: EvidenceNode, EvidenceEdge, EvidenceGraph, NodeType, SeedType,
Seed dataclasses. EvidenceGraph.summary() returns a compact dict
showing node/edge counts, cross-platform hit count, and shared indexes.
- normalizer.py: Platform-specific translation of raw API records into
EvidenceNodes. Handles XSOAR incidents/indicators/alerts/tasks/timeline,
GreyMatter incidents/observables/tasks, and ThreatQ events/indicators/
adversaries. Extracts IOC values, hostnames, usernames, campaign labels,
and ticket refs for correlation. _DISPATCH table routes by
(platform, record_type) key.
- correlator.py: Builds five correlation indexes (by_ioc, by_hostname,
by_username, by_campaign, by_ticket) and emits cross-platform same-*
edges between nodes from different platforms that share attributes.
- builder.py: InvestigationBuilder.build(seeds, title) orchestrates
all five steps. Seed-to-platform dispatch calls
search_indicators_by_value / search_observables_by_value / list_objects
depending on SeedType. Incident expansion calls platform-specific
get_incident_alerts, get_incident_tasks, get_incident_timeline (XSOAR),
get_investigation_observables, get_investigation_tasks (GreyMatter), and
get_event_indicators, get_event_adversaries (ThreatQ). All expansion
calls are wrapped in _safe_call() — any failure is logged at DEBUG and
skipped so a single unreachable platform never stops the graph build.
- workspace.py: materialize(graph, workspace_manager) writes every
EvidenceNode as a STIXBase object and every EvidenceEdge as a
Relationship SRO with x_confidence / x_reasoning / x_source_platform
extensions. Investigation metadata (seeds, summary, correlation indexes)
stored in workspace.metadata.
- __init__.py: Exports InvestigationBuilder, EvidenceGraph, EvidenceNode,
EvidenceEdge, NodeType, Seed, SeedType, normalize, correlate, materialize.
Connector evidence expansion methods (all three connectors):
XSOAR:
- get_incident_alerts(incident_id) → POST /alerts/search
- get_incident_tasks(incident_id) → GET /tasks?incidentId=...
- get_incident_timeline(incident_id)→ POST /entry/search
- search_indicators_by_value(value) → POST /indicators/search?query=value:...
GreyMatter:
- get_investigation_observables(case_id) → GET /v1/incidents/{id}/linked_observables
- get_investigation_tasks(case_id) → GET /v1/incidents/{id}/tasks
- search_observables_by_value(value) → GET /v1/observables?value=...
ThreatQ:
- get_event_indicators(event_id) → GET /api/events/{id}/indicators
- get_event_adversaries(event_id) → GET /api/events/{id}/adversaries
- search_indicators_by_value(value)→ GET /api/indicators?search=...
Smoke test confirmed: three nodes from XSOAR, GreyMatter, and ThreatQ
sharing IP 185.220.101.5 produce three cross-platform same-ioc edges.
https://claude.ai/code/session_01BDoue9HxB83ijLzFARAugq
… XDR
Adds investigation sub-API support (normalizer + builder expansion) for
three new platforms, bringing total coverage from 3 to 6 platforms.
TheHive:
- get_case_observables(case_id) — fetch IOCs via GET /api/v1/case/{id}/observable
- get_case_tasks(case_id) — fetch tasks via POST /api/v1/query listTask
- search_observables_by_value() — cross-case IOC search via query API
ServiceNow SecOps:
- get_incident_tasks(sys_id) — sn_si_task records linked to SIR parent
- get_incident_observables(sys_id) — sn_ti_observable records linked to incident
- search_indicators_by_value() — TIARA observable LIKE query
Cortex XDR:
- get_incident_alerts(incident_id) — extracts alerts from get_incident_extra_data
- get_incident_artifacts(incident_id) — extracts network/file artifacts
- search_indicators_by_value() — XDR/XSIAM indicator exact-value search
Normalizer: adds 9 new typed normaliser functions covering case/incident,
observable/alert, task, and artifact record types for all three platforms.
Builder: registers _expand_hive_case, _expand_sn_secops_incident, and
_expand_xdr_incident; dispatches from _expand_incident by platform name.
3205 unit tests pass.
https://claude.ai/code/session_01BDoue9HxB83ijLzFARAugq
No description provided.