logger = logging.getLogger(__name__)


class CampaignBuilder:
    """
    Converts correlation clusters into campaign profiles.

    Stateless — every method is a ``@staticmethod`` that takes cluster
    data as plain dicts (matching :meth:`Cluster.to_dict`) and returns
    :class:`CampaignProfile` instances. Nothing is persisted here;
    saving the result is the caller's job.
    """

    @staticmethod
    def from_cluster(
        cluster: dict[str, Any],
        *,
        created_by: str = "cluster_detector",
        classification: str = "amber",
    ) -> CampaignProfile:
        """
        Build a :class:`CampaignProfile` from a single cluster dict.

        The campaign name is the cluster's ``suggested_campaign``,
        falling back to its ``label`` and finally to
        ``"Cluster <id>"``. A truthy ``suggested_actor`` becomes the
        initial ``threat_actor_id`` (prefixed with ``threat-actor--``).
        Member IDs become the campaign's ``indicator_ids``; the cluster
        ID is linked in ``cluster_ids`` only when it is non-empty.

        Parameters
        ----------
        cluster : dict
            Output of ``Cluster.to_dict()``.
        created_by : str
            Principal that triggered the promotion.
        classification : str
            TLP classification for the new campaign.

        Returns
        -------
        CampaignProfile
            A new campaign in ``SUSPECTED`` status tagged
            ``auto-promoted`` and ``from-cluster``.
        """
        cluster_id = cluster.get("id")
        name = (
            cluster.get("suggested_campaign")
            or cluster.get("label")
            or f"Cluster {cluster.get('id', 'unknown')}"
        )

        actor_label = cluster.get("suggested_actor")
        actor_id = f"threat-actor--{actor_label}" if actor_label else None

        campaign = CampaignProfile(
            name=name,
            description=f"Auto-promoted from cluster {cluster.get('id', '')}. "
            f"Signals: {'; '.join(cluster.get('signals') or [])}",
            status=CampaignStatus.SUSPECTED,
            threat_actor_id=actor_id,
            indicator_ids=list(cluster.get("member_ids") or []),
            # Never link an empty/missing ID: merge_into_existing applies
            # the same guard, so both entry points stay consistent.
            cluster_ids=[cluster_id] if cluster_id else [],
            tags=["auto-promoted", "from-cluster"],
            classification=classification,
            created_by=created_by,
        )

        logger.info(
            "CampaignBuilder: promoted cluster %s → campaign %s (%s)",
            cluster_id,
            campaign.id,
            name,
        )
        return campaign

    @staticmethod
    def from_clusters(
        clusters: list[dict[str, Any]],
        *,
        created_by: str = "cluster_detector",
        min_confidence: int = 0,
        classification: str = "amber",
    ) -> list[CampaignProfile]:
        """
        Batch-promote multiple clusters, optionally filtering by
        minimum STIX confidence.

        Parameters
        ----------
        clusters : list of dict
            Each dict is the output of ``Cluster.to_dict()``.
        created_by : str
            Principal recorded on every promoted campaign.
        min_confidence : int
            Skip clusters whose ``confidence.stix_confidence`` is below
            this threshold. Default 0 (promote all). A missing, ``None``
            or non-dict ``confidence`` counts as 0.
        classification : str
            TLP classification applied to every promoted campaign.

        Returns
        -------
        list of CampaignProfile
            One campaign per cluster that passed the filter, in input
            order.
        """
        campaigns: list[CampaignProfile] = []
        for cluster in clusters:
            conf = cluster.get("confidence")
            raw_conf = conf.get("stix_confidence") if isinstance(conf, dict) else None
            # An explicit None would make the comparison below raise
            # TypeError, so normalise it to the documented floor of 0.
            stix_conf = raw_conf or 0
            if stix_conf < min_confidence:
                logger.debug(
                    "CampaignBuilder: skipping cluster %s (confidence %d < %d)",
                    cluster.get("id"),
                    stix_conf,
                    min_confidence,
                )
                continue
            campaigns.append(
                CampaignBuilder.from_cluster(
                    cluster,
                    created_by=created_by,
                    classification=classification,
                )
            )
        return campaigns

    @staticmethod
    def merge_into_existing(
        campaign: CampaignProfile,
        cluster: dict[str, Any],
    ) -> CampaignProfile:
        """
        Merge a cluster's indicators into an existing campaign.

        Appends the cluster's ``member_ids`` to the campaign's
        ``indicator_ids`` (deduplicated, existing order preserved) and
        links the cluster ID when it is non-empty and not yet linked.
        Only ``indicator_ids`` and ``cluster_ids`` are touched; the
        campaign is mutated in place and returned for convenience.
        """
        # Track known IDs in a set so dedup is O(1) per member instead of
        # rescanning the growing list for every candidate.
        seen = set(campaign.indicator_ids)
        for mid in cluster.get("member_ids") or []:
            if mid not in seen:
                seen.add(mid)
                campaign.indicator_ids.append(mid)

        cluster_id = cluster.get("id", "")
        if cluster_id and cluster_id not in campaign.cluster_ids:
            campaign.cluster_ids.append(cluster_id)
        return campaign
def _make_cluster(
    cluster_id: str = "cluster-1",
    label: str = "Subnet overlap cluster",
    member_ids: list[str] | None = None,
    signals: list[str] | None = None,
    suggested_campaign: str | None = None,
    suggested_actor: str | None = None,
    stix_confidence: int = 60,
) -> dict:
    """Return a dict shaped like the cluster payload the builder consumes.

    Falsy ``member_ids`` / ``signals`` are replaced by three default
    indicator IDs and two default signal names respectively.
    """
    confidence = {
        "source_reliability": "C",
        "information_credibility": 3,
        "stix_confidence": stix_confidence,
        "band": "MEDIUM",
        "label": "C3 (MEDIUM)",
        "rationale": None,
    }
    members = member_ids if member_ids else ["ioc-1", "ioc-2", "ioc-3"]
    signal_names = signals if signals else ["subnet_overlap", "timing_correlation"]
    return {
        "id": cluster_id,
        "label": label,
        "member_ids": members,
        "signals": signal_names,
        "confidence": confidence,
        "suggested_actor": suggested_actor,
        "suggested_campaign": suggested_campaign,
        "size": 3,
    }


class TestCampaignBuilderFromCluster:
    """Single-cluster promotion via ``CampaignBuilder.from_cluster``."""

    def test_basic_promotion(self):
        # Default cluster: type, status, indicators, linkage and tags.
        promoted = CampaignBuilder.from_cluster(_make_cluster())
        assert isinstance(promoted, CampaignProfile)
        assert promoted.status == CampaignStatus.SUSPECTED
        assert promoted.indicator_ids == ["ioc-1", "ioc-2", "ioc-3"]
        assert "cluster-1" in promoted.cluster_ids
        assert "auto-promoted" in promoted.tags

    def test_uses_suggested_campaign_as_name(self):
        raw = _make_cluster(suggested_campaign="Operation Sunrise")
        assert CampaignBuilder.from_cluster(raw).name == "Operation Sunrise"

    def test_falls_back_to_label(self):
        raw = _make_cluster(label="Infrastructure overlap")
        assert CampaignBuilder.from_cluster(raw).name == "Infrastructure overlap"

    def test_links_suggested_actor(self):
        raw = _make_cluster(suggested_actor="APT28")
        promoted = CampaignBuilder.from_cluster(raw)
        assert promoted.threat_actor_id == "threat-actor--APT28"

    def test_no_actor_when_not_suggested(self):
        raw = _make_cluster(suggested_actor=None)
        promoted = CampaignBuilder.from_cluster(raw)
        assert promoted.threat_actor_id is None

    def test_description_includes_signals(self):
        # Every signal name must appear verbatim in the description.
        expected_signals = ["subnet_overlap", "tag_match"]
        promoted = CampaignBuilder.from_cluster(
            _make_cluster(signals=expected_signals)
        )
        for signal in expected_signals:
            assert signal in promoted.description

    def test_created_by_default(self):
        promoted = CampaignBuilder.from_cluster(_make_cluster())
        assert promoted.created_by == "cluster_detector"

    def test_created_by_custom(self):
        promoted = CampaignBuilder.from_cluster(
            _make_cluster(), created_by="senior_analyst"
        )
        assert promoted.created_by == "senior_analyst"


class TestCampaignBuilderFromClusters:
    """Batch promotion via ``CampaignBuilder.from_clusters``."""

    def test_batch_promotion(self):
        batch = [
            _make_cluster(cluster_id=f"c{index}", label=label)
            for index, label in enumerate("ABC", start=1)
        ]
        promoted = CampaignBuilder.from_clusters(batch)
        assert len(promoted) == 3
        assert {campaign.name for campaign in promoted} == {"A", "B", "C"}

    def test_min_confidence_filter(self):
        # Only the clusters scoring 80 and 60 clear the threshold of 50.
        scores = {"c1": 80, "c2": 30, "c3": 60}
        batch = [
            _make_cluster(cluster_id=cid, stix_confidence=score)
            for cid, score in scores.items()
        ]
        promoted = CampaignBuilder.from_clusters(batch, min_confidence=50)
        assert len(promoted) == 2

    def test_min_confidence_zero_includes_all(self):
        batch = [
            _make_cluster(cluster_id="c1", stix_confidence=10),
            _make_cluster(cluster_id="c2", stix_confidence=0),
        ]
        promoted = CampaignBuilder.from_clusters(batch, min_confidence=0)
        assert len(promoted) == 2

    def test_empty_list(self):
        assert CampaignBuilder.from_clusters([]) == []


class TestCampaignBuilderMerge:
    """Merging a cluster into a campaign via ``merge_into_existing``."""

    def test_merge_adds_indicators(self):
        existing = CampaignProfile(
            name="Existing",
            indicator_ids=["ioc-a", "ioc-b"],
            cluster_ids=["old-cluster"],
        )
        incoming = _make_cluster(
            cluster_id="new-cluster",
            member_ids=["ioc-b", "ioc-c", "ioc-d"],
        )
        CampaignBuilder.merge_into_existing(existing, incoming)
        assert set(existing.indicator_ids) == {"ioc-a", "ioc-b", "ioc-c", "ioc-d"}
        assert "new-cluster" in existing.cluster_ids
        assert "old-cluster" in existing.cluster_ids

    def test_merge_deduplicates(self):
        existing = CampaignProfile(
            name="Existing",
            indicator_ids=["ioc-1", "ioc-2"],
        )
        incoming = _make_cluster(member_ids=["ioc-1", "ioc-2", "ioc-3"])
        CampaignBuilder.merge_into_existing(existing, incoming)
        assert existing.indicator_ids == ["ioc-1", "ioc-2", "ioc-3"]

    def test_merge_does_not_change_status(self):
        existing = CampaignProfile(
            name="Active Campaign",
            status=CampaignStatus.ACTIVE,
        )
        CampaignBuilder.merge_into_existing(existing, _make_cluster())
        assert existing.status == CampaignStatus.ACTIVE