From 47feb33b01c08426d719a448007afd477ad4c217 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 19 Apr 2026 01:50:42 +0000 Subject: [PATCH] Add Cuckoo Sandbox / CAPEv2 connector with dual API support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New gnat/connectors/cuckoo/ connector supporting both Cuckoo 2.x (/api/) and CAPEv2/3.x (/apiv2/) APIs with auto-detection at authenticate() time. Bearer token auth, sandbox_report_envelope() for STIX observed-data, IOC extraction from network/dropped/signatures sections, score-based verdict mapping, and domain helpers for file/URL submission. Platform count: 158 → 159. 22 new tests. https://claude.ai/code/session_01H5UbjsuiiGya5n1eUCxoaR --- CHANGELOG.md | 27 ++ CLAUDE.md | 3 +- README.md | 3 +- config/config.ini.example | 7 + gnat/clients/__init__.py | 2 + gnat/connectors/cuckoo/__init__.py | 14 + gnat/connectors/cuckoo/client.py | 468 +++++++++++++++++++++++ tests/unit/connectors/test_connectors.py | 277 ++++++++++++++ 8 files changed, 799 insertions(+), 2 deletions(-) create mode 100644 gnat/connectors/cuckoo/__init__.py create mode 100644 gnat/connectors/cuckoo/client.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 34091ea..3aca76c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,33 @@ all v1.4+ modules. → Full feature breakdown is in `## [1.4.0]` below; this entry marks the version cut. ## [Unreleased] +### Added — Cuckoo Sandbox / CAPEv2 connector + +New `gnat/connectors/cuckoo/` connector for dynamic malware analysis. +Supports both the legacy Cuckoo 2.x API (`/api/`) and CAPEv2/3.x +(`/apiv2/`) with auto-detection at `authenticate()` time. Platform +count: 158 → 159. + +- `CuckooClient` — Bearer token auth. STIX type map: observed-data, + malware, indicator. Version-specific endpoint routing via + `self._prefix`. Auto-detection probes `/apiv2/cuckoo/status/` first + (CAPEv2 is more common); falls back to v2 on failure. 
Optional + `api_version` constructor override skips detection. +- Domain helpers: `submit_file()`, `submit_url()`, `get_report()`, + `get_task_view()`, `get_iocs()`, `iocs_to_indicators()`, + `list_machines()`, `get_pcap()`. +- IOC extraction: walks `network.hosts` (IPs), `network.domains` + (domains), `network.http` (URLs), `dropped` (SHA-256 hashes), + `network.dns.answers` (resolved IPs), and CAPEv2 + `signatures[*].marks[*].ioc` (signature-extracted IOCs). + Deduplicates by type+value. +- STIX mapping: `sandbox_report_envelope()` for observed-data with + processes, contacted IPs/domains/URLs, verdict from score mapping + (0-3→clean, 4-6→suspicious, 7+→malicious). Malware SDO from + `malfamily`/`detections` fields. Indicator SDOs via + `make_indicator_pattern()`. +- 22 new tests in `TestCuckooClient`. + ### Added — Sensor/telemetry ingestion module (`gnat[telemetry]`) New `gnat/ingest/telemetry/` package for high-volume honeypot, netflow, diff --git a/CLAUDE.md b/CLAUDE.md index afcbdd7..c70af2c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -31,7 +31,7 @@ gnat/ # Main Python package ├── orm/ # STIX 2.1 ORM (STIXBase + 8 object types) ├── stix/ # STIX pattern validation (stix2-patterns integration) ├── clients/ # HTTP client layer (urllib3 BaseClient + CLIENT_REGISTRY) -├── connectors/ # 158 platform connectors (ThreatQ, CrowdStrike, Splunk, etc.) +├── connectors/ # 159 platform connectors (ThreatQ, CrowdStrike, Splunk, etc.) 
├── ingest/ # Multi-source ingestion pipeline (14 readers, 12 mappers) │ └── telemetry/ # High-volume sensor ingestion (Kafka reader, Redis dedup, campaign auto-link) ├── export/ # Export pipeline (EDL, Netskope CE delivery targets) @@ -352,6 +352,7 @@ Prefer mocking at the HTTP layer (`mock_pool_manager`) rather than patching indi | Hybrid Analysis / Falcon Sandbox | `gnat/connectors/hybrid_analysis/` | API key + User-Agent header | | VMRay (hypervisor-level analysis) | `gnat/connectors/vmray/` | API key (`api_key` header) | | Intezer Analyze (binary DNA attribution) | `gnat/connectors/intezer/` | API key → JWT Bearer | +| Cuckoo Sandbox / CAPEv2 (dynamic malware analysis) | `gnat/connectors/cuckoo/` | Bearer token | | Huntress Managed EDR / ITDR | `gnat/connectors/huntress/` | HTTP Basic (key id + secret) | | Arctic Wolf MDR | `gnat/connectors/arctic_wolf/` | Bearer token (+ optional customer id) | | Red Canary MDR | `gnat/connectors/red_canary/` | API key (`X-Api-Key` header) | diff --git a/README.md b/README.md index ff3c310..434c79f 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ GNAT provides a single, consistent abstraction layer over 158 platforms — thre | Layer | What it does | |-------|-------------| -| **158 Connectors** | Uniform CRUD + bidirectional STIX 2.1 translation for every supported platform | +| **159 Connectors** | Uniform CRUD + bidirectional STIX 2.1 translation for every supported platform | | **STIX 2.1 ORM** | Indicator, ThreatActor, Vulnerability, Malware, AttackPattern, Relationship, Observables | | **Ingest Pipelines** | 15 source readers × 13 mappers; pull from any platform, file, feed, database, or Kafka topic | | **Export Pipelines** | EDL files, Netskope CE, STIX bundles, CSV; configurable filters + transforms + delivery | @@ -245,6 +245,7 @@ GNAT provides a single, consistent abstraction layer over 158 platforms — thre | `hybrid_analysis` | Hybrid Analysis / Falcon Sandbox | API key + User-Agent header | | `vmray` | VMRay 
(hypervisor-level analysis) | API key (`api_key` header) | | `intezer` | Intezer Analyze (binary DNA attribution) | API key → JWT Bearer | +| `cuckoo` | Cuckoo Sandbox / CAPEv2 (dynamic malware analysis) | Bearer token | ### Managed Detection & Response (MDR) diff --git a/config/config.ini.example b/config/config.ini.example index 895e2e2..29d2b60 100644 --- a/config/config.ini.example +++ b/config/config.ini.example @@ -1232,6 +1232,13 @@ api_key = YOUR_HTTPBL_ACCESS_KEY [crtsh] host = https://crt.sh +# --- Cuckoo Sandbox / CAPEv2 (dynamic malware analysis) -------------------- +# Supports both Cuckoo 2.x and CAPEv2/3.x APIs (auto-detected). +[cuckoo] +host = https://cuckoo.lab.internal +api_key = +# api_version = 3 ; optional — auto-detected if omitted + # --- Google Certificate Transparency log API ------------------------------- # No authentication required. ``log`` is the path to a specific CT log # operated by Google (Argon, Xenon, etc.). diff --git a/gnat/clients/__init__.py b/gnat/clients/__init__.py index ba7e3d1..bee86ee 100644 --- a/gnat/clients/__init__.py +++ b/gnat/clients/__init__.py @@ -42,6 +42,7 @@ from gnat.connectors.cribl.client import CriblClient from gnat.connectors.crowdstrike.client import CrowdStrikeClient from gnat.connectors.crtsh.client import CrtShClient +from gnat.connectors.cuckoo.client import CuckooClient from gnat.connectors.cyble_vision.client import CybleVisionClient from gnat.connectors.cycognito.client import CyCognitoClient from gnat.connectors.cymulate.client import CymulateClient @@ -339,6 +340,7 @@ "project_honey_pot": ProjectHoneyPotClient, # Phase 2 Wave 9 — Cert transparency + DFIR + bug bounty "crtsh": CrtShClient, + "cuckoo": CuckooClient, "google_ct": GoogleCTClient, "velociraptor": VelociraptorClient, "magnet_axiom": MagnetAxiomClient, diff --git a/gnat/connectors/cuckoo/__init__.py b/gnat/connectors/cuckoo/__init__.py new file mode 100644 index 0000000..d86f6a9 --- /dev/null +++ b/gnat/connectors/cuckoo/__init__.py 
@@ -0,0 +1,14 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Bill Halpin +""" +gnat.connectors.cuckoo +========================= + +Cuckoo Sandbox / CAPEv2 connector — automated malware analysis with +behavioral reports, IOC extraction, and STIX indicator generation. +Supports both Cuckoo 2.x and CAPEv2/3.x APIs with auto-detection. +""" + +from .client import CuckooClient + +__all__ = ["CuckooClient"] diff --git a/gnat/connectors/cuckoo/client.py b/gnat/connectors/cuckoo/client.py new file mode 100644 index 0000000..d56f841 --- /dev/null +++ b/gnat/connectors/cuckoo/client.py @@ -0,0 +1,468 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Bill Halpin +""" +gnat.connectors.cuckoo.client +================================= + +Cuckoo Sandbox / CAPEv2 connector — automated malware analysis. + +Supports both the legacy Cuckoo 2.x API (``/api/``) and the +CAPEv2/Cuckoo 3.x API (``/apiv2/``). API version is auto-detected +at :meth:`authenticate` time unless explicitly overridden. + +Authentication +-------------- +Bearer token:: + + [cuckoo] + host = https://cuckoo.lab.internal + api_key = + # api_version = 3 ; optional — auto-detected if omitted + +Key endpoints (v2 → v3) +----------------------- +* ``GET /api/cuckoo/status`` → ``GET /apiv2/cuckoo/status/`` +* ``POST /api/tasks/create/file`` → ``POST /apiv2/tasks/create/file/`` +* ``POST /api/tasks/create/url`` → ``POST /apiv2/tasks/create/url/`` +* ``GET /api/tasks/list`` → ``GET /apiv2/tasks/list/`` +* ``GET /api/tasks/view/<id>`` → ``GET /apiv2/tasks/view/<id>/`` +* ``GET /api/tasks/report/<id>`` → ``GET /apiv2/tasks/report/<id>/`` + +STIX Type Mapping +----------------- +``observed-data`` wraps the full behavioral report via +:func:`sandbox_report_envelope`; ``malware`` carries verdict/family; +``indicator`` is emitted per extracted IOC.
+""" + +from __future__ import annotations + +import uuid +from typing import Any + +from gnat.clients.base import BaseClient, GNATClientError +from gnat.connectors.base_connector import ConnectorMixin +from gnat.stix.version import CURRENT_SPEC_VERSION +from gnat.utils.stix_helpers import ( + make_indicator_pattern, + sandbox_report_envelope, + utcnow, +) + +_NAMESPACE_CUCKOO = uuid.UUID("c0cc00c0-0001-4c0c-b0c0-c0cc00c0c0fe") + + +def _score_to_verdict(score: Any) -> str: + try: + s = float(score) + except (TypeError, ValueError): + return "unknown" + if s <= 3: + return "clean" + if s <= 6: + return "suspicious" + return "malicious" + + +def _cuckoo_malware_types(native: dict[str, Any]) -> list[str]: + sigs = native.get("signatures") or [] + tags: list[str] = [] + for sig in sigs: + name = (sig.get("name") or "").lower() if isinstance(sig, dict) else "" + tags.append(name) + out: list[str] = [] + combined = " ".join(tags) + if "trojan" in combined: + out.append("trojan") + if "ransom" in combined: + out.append("ransomware") + if "backdoor" in combined: + out.append("backdoor") + if "worm" in combined: + out.append("worm") + if "rootkit" in combined: + out.append("rootkit") + if "rat" in combined: + out.append("remote-access-trojan") + return out or ["unknown"] + + +def _extract_cuckoo_list(resp: Any) -> list[dict[str, Any]]: + if isinstance(resp, list): + return [r for r in resp if isinstance(r, dict)] + if not isinstance(resp, dict): + return [] + for key in ("data", "tasks", "results"): + val = resp.get(key) + if isinstance(val, list): + return [r for r in val if isinstance(r, dict)] + return [] + + +class CuckooClient(BaseClient, ConnectorMixin): + """ + HTTP client for Cuckoo Sandbox (2.x) and CAPEv2 (3.x). + + Parameters + ---------- + host : str + Base URL of the Cuckoo/CAPEv2 instance. + api_key : str + API key for Bearer authentication. + api_version : str, optional + ``"2"`` for Cuckoo 2.x, ``"3"`` for CAPEv2/3.x. + Auto-detected if omitted. 
+ """ + + TRUST_LEVEL: str = "semi_trusted" + COST_UNIT: int = 5 + + stix_type_map: dict[str, str] = { + "observed-data": "tasks", + "malware": "tasks", + "indicator": "tasks", + } + + def __init__( + self, + host: str = "", + api_key: str = "", + api_version: str | None = None, + **kwargs: Any, + ) -> None: + super().__init__(host=host, **kwargs) + self.api_key = api_key + self._api_version = api_version + self._prefix = "/apiv2" if api_version == "3" else "/api" + + # ── Authentication ───────────────────────────────────────────────────── + + def authenticate(self) -> None: + if not self.api_key: + raise GNATClientError( + "Cuckoo connector requires api_key in config." + ) + self._auth_headers["Authorization"] = f"Bearer {self.api_key}" + + if self._api_version is None: + self._detect_version() + else: + self._prefix = "/apiv2" if self._api_version == "3" else "/api" + + def _detect_version(self) -> None: + try: + self.get("/apiv2/cuckoo/status/") + self._api_version = "3" + self._prefix = "/apiv2" + except Exception: # noqa: BLE001 + self._api_version = "2" + self._prefix = "/api" + + # ── ConnectorMixin — CRUD ────────────────────────────────────────────── + + def health_check(self) -> bool: + try: + self.get(f"{self._prefix}/cuckoo/status{'/' if self._api_version == '3' else ''}") + return True + except Exception: # noqa: BLE001 + return False + + def get_object(self, stix_type: str, object_id: str) -> dict[str, Any]: + if not object_id: + raise GNATClientError("Cuckoo get_object requires a non-empty id") + if stix_type not in ("observed-data", "malware", "indicator"): + raise GNATClientError( + f"Cuckoo get_object does not support stix_type={stix_type!r}" + ) + trail = "/" if self._api_version == "3" else "" + resp = self.get(f"{self._prefix}/tasks/report/{object_id}{trail}") + if not isinstance(resp, dict): + raise GNATClientError( + f"Cuckoo returned unexpected payload for task {object_id!r}" + ) + return dict(resp, _cuckoo_kind=stix_type, 
_cuckoo_task_id=str(object_id)) + + def list_objects( + self, + stix_type: str, + filters: dict[str, Any] | None = None, + page: int = 1, + page_size: int = 100, + ) -> list[dict[str, Any]]: + if stix_type not in ("observed-data", "malware", "indicator"): + raise GNATClientError( + f"Cuckoo list_objects does not support stix_type={stix_type!r}" + ) + trail = "/" if self._api_version == "3" else "" + limit = int(page_size) + offset = max(0, (int(page) - 1) * limit) + resp = self.get( + f"{self._prefix}/tasks/list/{limit}/{offset}{trail}" + ) + items = _extract_cuckoo_list(resp) + return [dict(r, _cuckoo_kind=stix_type) for r in items] + + def upsert_object( + self, stix_type: str, payload: dict[str, Any] + ) -> dict[str, Any]: + raise GNATClientError( + "Cuckoo connector is read-only via CRUD — use submit_file / " + "submit_url domain helpers to trigger analyses." + ) + + def delete_object(self, stix_type: str, object_id: str) -> None: + if self._api_version == "3": + self.get(f"{self._prefix}/tasks/delete/{object_id}/") + else: + raise GNATClientError( + "Cuckoo 2.x API does not support task deletion." 
+ ) + + # ── Domain-specific helpers ──────────────────────────────────────────── + + def submit_file( + self, filepath: str, **opts: Any + ) -> dict[str, Any]: + import os + + if not os.path.isfile(filepath): + raise GNATClientError(f"submit_file: {filepath!r} does not exist") + with open(filepath, "rb") as fh: + files = {"file": (os.path.basename(filepath), fh.read())} + trail = "/" if self._api_version == "3" else "" + data: dict[str, Any] = {} + if opts.get("machine"): + data["machine"] = opts["machine"] + if opts.get("package"): + data["package"] = opts["package"] + if opts.get("timeout"): + data["timeout"] = str(opts["timeout"]) + return self.post( + f"{self._prefix}/tasks/create/file{trail}", + data=data or None, + files=files, + ) + + def submit_url(self, url: str, **opts: Any) -> dict[str, Any]: + trail = "/" if self._api_version == "3" else "" + data: dict[str, Any] = {"url": url} + if opts.get("machine"): + data["machine"] = opts["machine"] + if opts.get("package"): + data["package"] = opts["package"] + return self.post( + f"{self._prefix}/tasks/create/url{trail}", data=data + ) + + def get_report(self, task_id: str) -> dict[str, Any]: + return self.get_object("observed-data", task_id) + + def get_task_view(self, task_id: str) -> dict[str, Any]: + trail = "/" if self._api_version == "3" else "" + return self.get(f"{self._prefix}/tasks/view/{task_id}{trail}") + + def list_machines(self) -> list[dict[str, Any]]: + trail = "/" if self._api_version == "3" else "" + resp = self.get(f"{self._prefix}/machines/list{trail}") + return _extract_cuckoo_list(resp) + + def get_pcap(self, task_id: str) -> Any: + if self._api_version == "3": + return self.get(f"{self._prefix}/tasks/pcap/{task_id}/") + return self.get(f"{self._prefix}/pcap/get/{task_id}") + + def get_iocs(self, task_id: str) -> list[dict[str, Any]]: + report = self.get_object("observed-data", task_id) + return _extract_iocs(report) + + def iocs_to_indicators(self, task_id: str) -> list[dict[str, Any]]: + 
iocs = self.get_iocs(task_id) + return [ + dict(ioc, _cuckoo_kind="indicator", _cuckoo_task_id=task_id) + for ioc in iocs + ] + + # ── ConnectorMixin — STIX translation ────────────────────────────────── + + def to_stix(self, native: dict[str, Any]) -> dict[str, Any]: + if not isinstance(native, dict): + raise GNATClientError("Cuckoo to_stix expects a dict input") + + kind = native.get("_cuckoo_kind") or "observed-data" + task_id = str(native.get("_cuckoo_task_id") or native.get("id", "")) + + if kind == "indicator": + return self._to_stix_indicator(native) + + if kind == "malware": + return self._to_stix_malware(native, task_id) + + return self._to_stix_observed_data(native, task_id) + + def _to_stix_indicator(self, native: dict[str, Any]) -> dict[str, Any]: + ioc_type = (native.get("type") or "").lower() + value = native.get("value") or "" + + if ioc_type in ("ip", "ipv4", "ipv4-addr"): + pattern = make_indicator_pattern("ipv4-addr", value) + elif ioc_type in ("domain", "domain-name"): + pattern = make_indicator_pattern("domain-name", value) + elif ioc_type == "url": + pattern = make_indicator_pattern("url", value) + elif ioc_type in ("sha256", "sha1", "md5"): + pattern = make_indicator_pattern(f"file:{ioc_type}", value) + else: + escaped = value.replace("'", "\\'") + pattern = f"[x-cuckoo:value = '{escaped}']" + + stix_uuid = uuid.uuid5(_NAMESPACE_CUCKOO, f"indicator|{value}") + return { + "type": "indicator", + "id": f"indicator--{stix_uuid}", + "spec_version": CURRENT_SPEC_VERSION, + "created": utcnow(), + "modified": utcnow(), + "pattern": pattern, + "pattern_type": "stix", + "valid_from": utcnow(), + "name": f"Cuckoo: {value}", + "description": "Cuckoo Sandbox extracted IOC", + "labels": ["malicious-activity"], + "x_cuckoo": { + "task_id": native.get("_cuckoo_task_id"), + "ioc_type": ioc_type, + }, + } + + def _to_stix_malware( + self, native: dict[str, Any], task_id: str + ) -> dict[str, Any]: + info = native.get("info") or {} + target = 
native.get("target") or {} + target_file = target.get("file") or {} + + score = info.get("score") or native.get("score") + verdict = _score_to_verdict(score) + family = ( + native.get("malfamily") + or native.get("detections") + or target_file.get("name") + or "unknown" + ) + if isinstance(family, list): + family = family[0] if family else "unknown" + + stix_uuid = uuid.uuid5(_NAMESPACE_CUCKOO, f"malware|{task_id}|{family}") + return { + "type": "malware", + "id": f"malware--{stix_uuid}", + "spec_version": CURRENT_SPEC_VERSION, + "created": utcnow(), + "modified": utcnow(), + "name": str(family), + "is_family": True, + "description": f"Cuckoo Sandbox: {verdict} ({family})", + "malware_types": _cuckoo_malware_types(native), + "x_cuckoo": { + "task_id": task_id, + "score": score, + "verdict": verdict, + }, + } + + def _to_stix_observed_data( + self, native: dict[str, Any], task_id: str + ) -> dict[str, Any]: + info = native.get("info") or {} + target = native.get("target") or {} + target_file = target.get("file") or {} + network = native.get("network") or {} + behavior = native.get("behavior") or {} + + hosts = [ + h if isinstance(h, str) else h.get("ip", "") + for h in (network.get("hosts") or []) + ] + domains = [ + d.get("domain") if isinstance(d, dict) else str(d) + for d in (network.get("domains") or []) + ] + urls = [ + h.get("uri") if isinstance(h, dict) else str(h) + for h in (network.get("http") or []) + ] + processes = [ + p.get("process_name") if isinstance(p, dict) else str(p) + for p in (behavior.get("processes") or []) + ] + return sandbox_report_envelope( + source_name="cuckoo", + analysis_id=task_id, + submitted_sha256=target_file.get("sha256", ""), + submitted_filename=target_file.get("name", ""), + processes=[p for p in processes if p], + contacted_ips=[ip for ip in hosts if ip], + contacted_domains=[d for d in domains if d], + contacted_urls=[u for u in urls if u], + first_observed=info.get("started") or "", + last_observed=info.get("ended") or "", 
+ verdict=_score_to_verdict(info.get("score")), + score=info.get("score"), + raw_report=native, + ) + + def from_stix(self, stix_dict: dict[str, Any]) -> dict[str, Any]: + return { + "note": ( + "Cuckoo connector is read-only via CRUD. Use submit_file, " + "submit_url, get_report, get_iocs, or iocs_to_indicators " + "to interact with the platform." + ), + "stix_id": stix_dict.get("id", ""), + } + + +def _extract_iocs(report: dict[str, Any]) -> list[dict[str, Any]]: + iocs: list[dict[str, Any]] = [] + network = report.get("network") or {} + + for host in network.get("hosts") or []: + ip = host if isinstance(host, str) else (host.get("ip") or "") + if ip: + iocs.append({"type": "ipv4", "value": ip}) + + for entry in network.get("domains") or []: + domain = entry.get("domain") if isinstance(entry, dict) else str(entry) + if domain: + iocs.append({"type": "domain", "value": domain}) + + for entry in network.get("http") or []: + uri = entry.get("uri") if isinstance(entry, dict) else str(entry) + if uri: + iocs.append({"type": "url", "value": uri}) + + for entry in report.get("dropped") or []: + if isinstance(entry, dict) and entry.get("sha256"): + iocs.append({"type": "sha256", "value": entry["sha256"]}) + + for entry in network.get("dns") or []: + if isinstance(entry, dict): + for answer in entry.get("answers") or []: + data = answer.get("data") if isinstance(answer, dict) else "" + if data: + iocs.append({"type": "ipv4", "value": data}) + + for sig in report.get("signatures") or []: + if isinstance(sig, dict): + for mark in sig.get("marks") or []: + if isinstance(mark, dict) and mark.get("ioc"): + iocs.append({"type": "unknown", "value": mark["ioc"]}) + + seen: set[str] = set() + deduped: list[dict[str, Any]] = [] + for ioc in iocs: + key = f"{ioc['type']}|{ioc['value']}" + if key not in seen: + seen.add(key) + deduped.append(ioc) + return deduped diff --git a/tests/unit/connectors/test_connectors.py b/tests/unit/connectors/test_connectors.py index 9957472..f9b2b74 
100644 --- a/tests/unit/connectors/test_connectors.py +++ b/tests/unit/connectors/test_connectors.py @@ -20466,3 +20466,280 @@ def test_phase2_wave9_config_sections_exist(): "bugcrowd", ): assert parser.has_section(section), f"Missing [{section}] in config.ini.example" + + +# --------------------------------------------------------------------------- +# Cuckoo Sandbox / CAPEv2 +# --------------------------------------------------------------------------- + + +class TestCuckooClient: + @pytest.fixture + def client_v3(self): + from gnat.connectors.cuckoo.client import CuckooClient + + c = CuckooClient( + host="https://cuckoo.lab.internal", + api_key="ck_test", + api_version="3", + ) + c._authenticated = True + return c + + @pytest.fixture + def client_v2(self): + from gnat.connectors.cuckoo.client import CuckooClient + + c = CuckooClient( + host="https://cuckoo.lab.internal", + api_key="ck_test", + api_version="2", + ) + c._authenticated = True + return c + + def test_authenticate_sets_bearer(self): + from gnat.connectors.cuckoo.client import CuckooClient + + c = CuckooClient( + host="https://cuckoo.lab.internal", + api_key="ck_test", + api_version="3", + ) + c.authenticate() + assert c._auth_headers["Authorization"] == "Bearer ck_test" + + def test_authenticate_requires_api_key(self): + from gnat.connectors.cuckoo.client import CuckooClient + + c = CuckooClient(host="https://cuckoo.lab.internal", api_key="") + with pytest.raises(GNATClientError, match="requires api_key"): + c.authenticate() + + def test_authenticate_v3_autodetect(self): + from gnat.connectors.cuckoo.client import CuckooClient + + c = CuckooClient( + host="https://cuckoo.lab.internal", api_key="ck_test" + ) + c._auth_headers["Authorization"] = "Bearer ck_test" + c.get = MagicMock(return_value={"version": "CAPEv2"}) + c._detect_version() + assert c._api_version == "3" + assert c._prefix == "/apiv2" + + def test_authenticate_v2_fallback(self): + from gnat.connectors.cuckoo.client import CuckooClient + 
+ c = CuckooClient( + host="https://cuckoo.lab.internal", api_key="ck_test" + ) + c._auth_headers["Authorization"] = "Bearer ck_test" + + def _fail_then_ok(url, *a, **kw): + if "/apiv2/" in url: + raise RuntimeError("404") + return {"version": "2.0.7"} + + c.get = MagicMock(side_effect=_fail_then_ok) + c._detect_version() + assert c._api_version == "2" + assert c._prefix == "/api" + + def test_authenticate_explicit_version(self): + from gnat.connectors.cuckoo.client import CuckooClient + + c = CuckooClient( + host="https://cuckoo.lab.internal", + api_key="ck_test", + api_version="2", + ) + c.authenticate() + assert c._api_version == "2" + assert c._prefix == "/api" + + def test_health_check_true(self, client_v3, monkeypatch): + monkeypatch.setattr( + client_v3, "get", MagicMock(return_value={"hostname": "cuckoo"}) + ) + assert client_v3.health_check() is True + + def test_health_check_false(self, client_v3, monkeypatch): + def _boom(*a, **kw): + raise RuntimeError("nope") + + monkeypatch.setattr(client_v3, "get", _boom) + assert client_v3.health_check() is False + + def test_list_objects_v3(self, client_v3, monkeypatch): + monkeypatch.setattr( + client_v3, + "get", + MagicMock(return_value={"data": [{"id": 1}, {"id": 2}]}), + ) + items = client_v3.list_objects("observed-data") + assert len(items) == 2 + assert items[0]["_cuckoo_kind"] == "observed-data" + + def test_get_report_v3(self, client_v3, monkeypatch): + report = { + "info": {"id": 42, "score": 8.5}, + "target": {"file": {"sha256": "abc", "name": "sample.exe"}}, + "network": {"hosts": ["1.2.3.4"]}, + } + monkeypatch.setattr( + client_v3, "get", MagicMock(return_value=report) + ) + obj = client_v3.get_object("observed-data", "42") + assert obj["_cuckoo_task_id"] == "42" + assert obj["_cuckoo_kind"] == "observed-data" + + def test_submit_file_v2(self, client_v2, monkeypatch, tmp_path): + f = tmp_path / "malware.exe" + f.write_bytes(b"MZ\x90\x00") + monkeypatch.setattr( + client_v2, "post", 
MagicMock(return_value={"task_id": 99}) + ) + result = client_v2.submit_file(str(f)) + assert result["task_id"] == 99 + call_url = client_v2.post.call_args[0][0] + assert "/api/tasks/create/file" in call_url + assert "/apiv2/" not in call_url + + def test_submit_file_v3(self, client_v3, monkeypatch, tmp_path): + f = tmp_path / "malware.exe" + f.write_bytes(b"MZ\x90\x00") + monkeypatch.setattr( + client_v3, "post", MagicMock(return_value={"task_id": 99}) + ) + client_v3.submit_file(str(f)) + call_url = client_v3.post.call_args[0][0] + assert "/apiv2/tasks/create/file/" in call_url + + def test_submit_url(self, client_v3, monkeypatch): + monkeypatch.setattr( + client_v3, "post", MagicMock(return_value={"task_id": 100}) + ) + result = client_v3.submit_url("http://evil.com/payload") + assert result["task_id"] == 100 + + def test_upsert_raises(self, client_v3): + with pytest.raises(GNATClientError, match="read-only via CRUD"): + client_v3.upsert_object("observed-data", {}) + + def test_to_stix_observed_data(self, client_v3): + stix = client_v3.to_stix( + { + "_cuckoo_task_id": "42", + "info": {"id": 42, "score": 8.5, "started": "2026-04-01T00:00:00Z"}, + "target": {"file": {"sha256": "abc123", "name": "sample.exe"}}, + "network": { + "hosts": ["1.2.3.4"], + "domains": [{"domain": "evil.example"}], + "http": [{"uri": "http://evil.example/c2"}], + }, + "behavior": {"processes": [{"process_name": "sample.exe"}]}, + "dropped": [{"sha256": "def456"}], + } + ) + _assert_stix_contract(stix) + assert stix["type"] == "observed-data" + + def test_to_stix_malware(self, client_v3): + stix = client_v3.to_stix( + { + "_cuckoo_kind": "malware", + "_cuckoo_task_id": "42", + "info": {"score": 9.0}, + "target": {"file": {"name": "emotet.exe"}}, + "malfamily": "Emotet", + "signatures": [{"name": "trojan_emotet"}], + } + ) + _assert_stix_contract(stix) + assert stix["type"] == "malware" + assert stix["name"] == "Emotet" + assert "trojan" in stix["malware_types"] + + def 
test_to_stix_indicator(self, client_v3): + stix = client_v3.to_stix( + { + "_cuckoo_kind": "indicator", + "_cuckoo_task_id": "42", + "type": "domain", + "value": "evil.example", + } + ) + _assert_stix_contract(stix) + assert "domain-name:value" in stix["pattern"] + + def test_ioc_extraction(self, client_v3, monkeypatch): + report = { + "_cuckoo_kind": "observed-data", + "_cuckoo_task_id": "42", + "network": { + "hosts": ["1.2.3.4", "5.6.7.8"], + "domains": [{"domain": "evil.com"}], + "http": [{"uri": "http://evil.com/c2"}], + "dns": [{"answers": [{"data": "9.9.9.9"}]}], + }, + "dropped": [{"sha256": "aabbcc"}], + "signatures": [{"marks": [{"ioc": "suspicious.dll"}]}], + "info": {}, + "target": {}, + } + monkeypatch.setattr( + client_v3, "get", MagicMock(return_value=report) + ) + iocs = client_v3.get_iocs("42") + types = {i["type"] for i in iocs} + assert "ipv4" in types + assert "domain" in types + assert "url" in types + assert "sha256" in types + values = {i["value"] for i in iocs} + assert "1.2.3.4" in values + assert "evil.com" in values + assert "aabbcc" in values + assert "9.9.9.9" in values + + def test_ioc_extraction_deduplicates(self, client_v3, monkeypatch): + report = { + "_cuckoo_kind": "observed-data", + "_cuckoo_task_id": "42", + "network": { + "hosts": ["1.2.3.4", "1.2.3.4"], + "domains": [], + "http": [], + }, + "dropped": [], + "signatures": [], + "info": {}, + "target": {}, + } + monkeypatch.setattr( + client_v3, "get", MagicMock(return_value=report) + ) + iocs = client_v3.get_iocs("42") + assert sum(1 for i in iocs if i["value"] == "1.2.3.4") == 1 + + def test_score_to_verdict(self): + from gnat.connectors.cuckoo.client import _score_to_verdict + + assert _score_to_verdict(2) == "clean" + assert _score_to_verdict(5) == "suspicious" + assert _score_to_verdict(8) == "malicious" + assert _score_to_verdict(None) == "unknown" + assert _score_to_verdict("bad") == "unknown" + + def test_from_stix_is_noop(self, client_v3): + out = 
client_v3.from_stix({"id": "observed-data--x"}) + assert "read-only" in out["note"] + + def test_cost_unit_is_high(self, client_v3): + assert client_v3.COST_UNIT >= 5 + + def test_stix_type_map_keys(self, client_v3): + assert "observed-data" in client_v3.stix_type_map + assert "malware" in client_v3.stix_type_map + assert "indicator" in client_v3.stix_type_map