Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions gnat/connectors/cortex_xdr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,73 @@ def isolate_endpoint(self, endpoint_id: str) -> dict[str, Any]:
)
return resp if isinstance(resp, dict) else {}

# ── Investigation sub-API ──────────────────────────────────────────────

def get_incident_alerts(self, incident_id: str) -> list[dict[str, Any]]:
"""
Return the alerts that belong to a Cortex XDR incident.

Calls ``get_incident_extra_data`` and extracts the ``alerts`` list
from the enriched response.

Parameters
----------
incident_id : str
Cortex XDR incident ID.
"""
extra = self.get_incident_extra_data(incident_id)
alerts = extra.get("alerts", {})
if isinstance(alerts, dict):
return alerts.get("data", [])
if isinstance(alerts, list):
return alerts
return []

def get_incident_artifacts(self, incident_id: str) -> list[dict[str, Any]]:
"""
Return network / file artifacts observed in a Cortex XDR incident.

Extracts ``network_artifacts`` and ``file_artifacts`` from the
enriched incident response and merges them into a single list.

Parameters
----------
incident_id : str
Cortex XDR incident ID.
"""
extra = self.get_incident_extra_data(incident_id)
artifacts: list[dict[str, Any]] = []
for key in ("network_artifacts", "file_artifacts"):
bucket = extra.get(key, {})
if isinstance(bucket, dict):
artifacts.extend(bucket.get("data", []))
elif isinstance(bucket, list):
artifacts.extend(bucket)
return artifacts

def search_indicators_by_value(self, value: str) -> list[dict[str, Any]]:
"""
Search XDR/XSIAM threat indicators by exact value.

Parameters
----------
value : str
Indicator value (IP, domain, hash, etc.) to search for.
"""
resp = self.post(
"/public_api/v1/indicators/",
json={
"request_data": {
"filters": [
{"field": "indicator_value", "operator": "eq", "value": value}
],
"page_size": 100,
"page_number": 0,
}
},
)
return resp.get("reply", {}).get("indicators", []) if isinstance(resp, dict) else []

def get_indicators(
self,
ioc_type: str | None = None,
Expand Down
167 changes: 138 additions & 29 deletions gnat/connectors/greymatter/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,21 @@

* ``/v1/observables`` — observable values (IPs, domains, hashes, URLs)
* ``/v1/indicators`` — compound indicators with patterns
* ``/v1/incidents`` — security incidents
* ``/v1/incidents`` — security investigations / cases (``observed-data``)
* ``/v1/threat-actors`` — threat actor entities
* ``/v1/malware`` — malware families / samples
* ``/v1/vulnerabilities`` — CVE / vulnerability records

Investigation CRUD
------------------
Pass ``stix_type="observed-data"`` to the standard CRUD methods to interact
with GreyMatter investigations (cases)::

client.list_objects("observed-data")
client.get_object("observed-data", case_uuid)
client.upsert_object("observed-data", {"title": "APT28 Campaign"})

Use :meth:`link_investigation` to link a STIX observable to an existing case.
"""

from __future__ import annotations
Expand Down Expand Up @@ -67,17 +78,18 @@ class GreyMatterClient(BaseClient, ConnectorMixin):
"""

stix_type_map: dict[str, str] = {
"indicator": "observables",
"threat-actor": "threat-actors",
"malware": "malware",
"vulnerability": "vulnerabilities",
"indicator": "observables",
"threat-actor": "threat-actors",
"malware": "malware",
"vulnerability": "vulnerabilities",
"attack-pattern": "attack-patterns",
"observed-data": "incidents",
}

# GreyMatter observable type → STIX pattern template
_OBS_PATTERN: dict[str, str] = {
"ipv4": "[ipv4-addr:value = '{v}']",
"ipv6": "[ipv6-addr:value = '{v}']",
"ipv4": "[ipv4-addr:value = '{v}']",
"ipv6": "[ipv6-addr:value = '{v}']",
"domain": "[domain-name:value = '{v}']",
"url": "[url:value = '{v}']",
"md5": "[file:hashes.MD5 = '{v}']",
Expand Down Expand Up @@ -160,21 +172,17 @@ def list_objects(
"""
resource = self._resolve(stix_type)
params: dict[str, Any] = {
"limit": page_size,
"limit": page_size,
"offset": (page - 1) * page_size,
}
if filters:
params.update(filters)
resp = self.get(f"/v1/{resource}", params=params)
return resp.get("data", []) if isinstance(resp, dict) else []

def upsert_object(
self,
stix_type: str,
payload: dict[str, Any],
linked_cases: list[str] | None = None,
**kwargs: Any,
) -> dict[str, Any]:
def upsert_object(self, stix_type: str, payload: dict[str, Any],
linked_cases: list[str] | None = None,
**kwargs: Any) -> dict[str, Any]:
"""
Create or update a GreyMatter object.

Expand Down Expand Up @@ -205,16 +213,26 @@ def delete_object(self, stix_type: str, object_id: str) -> None:

def to_stix(self, native: dict[str, Any]) -> dict[str, Any]:
"""
Translate a GreyMatter observable/entity dict to STIX 2.1.
Translate a GreyMatter observable/entity or incident dict to STIX 2.1.

Dispatches to :meth:`_incident_to_stix` when the native record looks
like an investigation/case (detected by ``case_number`` or
``assigned_to`` fields), otherwise maps to a STIX Indicator.

Handles both observable-value records and full entity records.
Parameters
----------
native : dict
Raw GreyMatter API response.
"""
data = native.get("data", native)
gm_type = data.get("type", "")
value = data.get("value", data.get("name", ""))
pattern = self._OBS_PATTERN.get(gm_type, "[unknown:value = '{v}']").format(
v=value.replace("'", "\\'")
)
# Investigations/cases have case_number or assigned_to; observables have type+value
if "case_number" in data or "assigned_to" in data:
return self._incident_to_stix(data)
gm_type = data.get("type", "")
value = data.get("value", data.get("name", ""))
pattern = self._OBS_PATTERN.get(
gm_type, "[unknown:value = '{v}']"
).format(v=value.replace("'", "\\'"))

return {
"type": "indicator",
Expand All @@ -233,6 +251,38 @@ def to_stix(self, native: dict[str, Any]) -> dict[str, Any]:
"x_tlp": data.get("tlp", "white"),
}

@staticmethod
def _incident_to_stix(data: dict[str, Any]) -> dict[str, Any]:
"""
Map a GreyMatter investigation/case record to STIX ``observed-data``.

Parameters
----------
data : dict
GreyMatter incident/case record (with ``case_number``,
``assigned_to``, ``status``, ``severity`` etc.).
"""
created = data.get("created_at", "")
modified = data.get("updated_at", created)
return {
"type": "observed-data",
"id": f"observed-data--{data.get('id', '')}",
"created": created,
"modified": modified,
"first_observed": created,
"last_observed": modified,
"number_observed": 1,
"object_refs": [],
"name": data.get("title", data.get("name", "")),
"description": data.get("description", ""),
"x_gm_case_number": data.get("case_number", ""),
"x_gm_status": data.get("status", ""),
"x_gm_severity": data.get("severity", ""),
"x_gm_assigned_to": data.get("assigned_to", ""),
"x_gm_tags": data.get("tags", []),
"x_tlp": data.get("tlp", "white"),
}

def from_stix(self, stix_dict: dict[str, Any]) -> dict[str, Any]:
"""
Translate a STIX Indicator dict to a GreyMatter observable payload.
Expand Down Expand Up @@ -293,6 +343,65 @@ def link_investigation(
}
return self.post(f"/v1/incidents/{case_id}/linked_observables", json=payload)

# ── Evidence expansion ────────────────────────────────────────────────

def get_investigation_observables(self, case_id: str) -> list[dict[str, Any]]:
"""
Return all observables linked to a GreyMatter investigation/case.

Calls ``GET /v1/incidents/{case_id}/linked_observables``.

Parameters
----------
case_id : str
GreyMatter investigation / case UUID.

Returns
-------
list of dict
Raw GreyMatter observable records.
"""
resp = self.get(f"/v1/incidents/{self._to_gm_id(case_id)}/linked_observables")
return resp.get("data", []) if isinstance(resp, dict) else []

def get_investigation_tasks(self, case_id: str) -> list[dict[str, Any]]:
"""
Return tasks associated with a GreyMatter investigation/case.

Calls ``GET /v1/incidents/{case_id}/tasks``.

Parameters
----------
case_id : str
GreyMatter investigation / case UUID.

Returns
-------
list of dict
Raw GreyMatter task records.
"""
resp = self.get(f"/v1/incidents/{self._to_gm_id(case_id)}/tasks")
return resp.get("data", []) if isinstance(resp, dict) else []

def search_observables_by_value(self, value: str) -> list[dict[str, Any]]:
"""
Search GreyMatter observables by value (IP, domain, hash, email, …).

Calls ``GET /v1/observables?value={value}``.

Parameters
----------
value : str
Observable value to search for.

Returns
-------
list of dict
Raw GreyMatter observable records.
"""
resp = self.get("/v1/observables", params={"value": value, "limit": 50})
return resp.get("data", []) if isinstance(resp, dict) else []

# ── Helpers ────────────────────────────────────────────────────────────

def _resolve(self, stix_type: str) -> str:
Expand All @@ -312,21 +421,21 @@ def _to_gm_id(stix_or_plain_id: str) -> str:
@staticmethod
def _infer_gm_type(pattern: str) -> str:
pattern = pattern.lower()
if "ipv4-addr" in pattern:
if "ipv4-addr" in pattern:
return "ipv4"
if "ipv6-addr" in pattern:
if "ipv6-addr" in pattern:
return "ipv6"
if "domain-name" in pattern:
return "domain"
if "url:" in pattern:
if "url:" in pattern:
return "url"
if "sha-256" in pattern:
if "sha-256" in pattern:
return "sha256"
if "sha-1" in pattern:
if "sha-1" in pattern:
return "sha1"
if "md5" in pattern:
if "md5" in pattern:
return "md5"
if "email-addr" in pattern:
if "email-addr" in pattern:
return "email"
return "unknown"

Expand Down
61 changes: 61 additions & 0 deletions gnat/connectors/servicenow_secops/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,67 @@ def annotate_incident(
)
return resp.get("result", {}) if isinstance(resp, dict) else {}

# ── Investigation sub-API ────────────────────────────────────────────

def get_incident_tasks(self, incident_sys_id: str) -> list[dict[str, Any]]:
"""
Return security tasks linked to a SIR incident.

Queries the ``sn_si_task`` table filtered by parent ``sys_id``.

Parameters
----------
incident_sys_id : str
ServiceNow ``sys_id`` of the parent SIR incident.
"""
resp = self.get(
f"{_TABLE_BASE}/sn_si_task",
params={
"sysparm_query": f"parent={incident_sys_id}",
"sysparm_limit": 200,
},
)
return resp.get("result", []) if isinstance(resp, dict) else []

def get_incident_observables(self, incident_sys_id: str) -> list[dict[str, Any]]:
"""
Return threat intelligence observables linked to a SIR incident.

Queries the ``sn_ti_observable`` table filtered by the incident
reference field.

Parameters
----------
incident_sys_id : str
ServiceNow ``sys_id`` of the parent SIR incident.
"""
resp = self.get(
f"{_TABLE_BASE}/sn_ti_observable",
params={
"sysparm_query": f"incident={incident_sys_id}",
"sysparm_limit": 200,
},
)
return resp.get("result", []) if isinstance(resp, dict) else []

def search_indicators_by_value(self, value: str) -> list[dict[str, Any]]:
"""
Search TIARA observables by value (IP, domain, hash, URL, etc.).

Parameters
----------
value : str
Observable value to search for.
"""
resp = self.get(
f"{_TABLE_BASE}/sn_ti_observable",
params={
"sysparm_query": f"valueLIKE{value}",
"sysparm_limit": 100,
},
)
return resp.get("result", []) if isinstance(resp, dict) else []

# ── Vulnerability Response (VR) helpers ──────────────────────────────

def list_vulnerable_items(
Expand Down
Loading
Loading