Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions gnat/analysis/attribution/builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2026 Bill Halpin
"""
gnat.analysis.attribution.builder
=====================================

Campaign builder — promotes :class:`~gnat.analysis.correlation.cluster_detector.Cluster`
objects into :class:`CampaignProfile` instances and wires up indicator,
actor, and cluster linkage.

The builder is the bridge between the automated correlation layer
(which produces clusters) and the analyst-managed campaign layer
(which tracks lifecycle, attribution hypotheses, and kill-chain
progression). It is explicitly *not* automatic: clusters must be
promoted by an analyst or by a rule-triggered workflow.
"""

from __future__ import annotations

import logging
from typing import Any

from gnat.analysis.attribution.models import CampaignProfile, CampaignStatus

logger = logging.getLogger(__name__)


class CampaignBuilder:
    """
    Converts correlation clusters into campaign profiles.

    Stateless — takes cluster data as dicts (matching
    :meth:`Cluster.to_dict`) and returns :class:`CampaignProfile`
    instances ready for persistence via :class:`CampaignService`.
    """

    @staticmethod
    def from_cluster(
        cluster: dict[str, Any],
        *,
        created_by: str = "cluster_detector",
        classification: str = "amber",
    ) -> CampaignProfile:
        """
        Build a :class:`CampaignProfile` from a single cluster dict.

        Uses the cluster's ``suggested_campaign`` as the campaign name
        (falling back to the cluster label, then to ``Cluster <id>``), and
        ``suggested_actor`` as the basis of the initial ``threat_actor_id``.
        Member IDs become the campaign's ``indicator_ids`` and a non-empty
        cluster ID is recorded in the campaign's ``cluster_ids`` list.
        No investigation linkage is set here.

        Parameters
        ----------
        cluster : dict
            Output of ``Cluster.to_dict()``.
        created_by : str
            Principal that triggered the promotion.
        classification : str
            TLP classification for the new campaign.

        Returns
        -------
        CampaignProfile
            A new campaign in ``CampaignStatus.SUSPECTED`` state, tagged
            ``auto-promoted`` and ``from-cluster``.
        """
        # Normalise a missing / None / empty id to "" once, so every use
        # below agrees on whether the cluster actually has an identifier.
        cluster_id = cluster.get("id") or ""
        shown_id = cluster_id or "unknown"

        name = (
            cluster.get("suggested_campaign")
            or cluster.get("label")
            or f"Cluster {shown_id}"
        )

        # NOTE(review): ``suggested_actor`` is embedded verbatim, so the
        # result is a label-based identifier rather than a UUID-based STIX
        # id. Kept as-is because existing callers and tests rely on this
        # exact format; revisit if strict STIX validity is required.
        actor_label = cluster.get("suggested_actor")
        actor_id = f"threat-actor--{actor_label}" if actor_label else None

        signals = "; ".join(str(s) for s in (cluster.get("signals") or []))

        campaign = CampaignProfile(
            name=name,
            description=f"Auto-promoted from cluster {shown_id}. "
            f"Signals: {signals}",
            status=CampaignStatus.SUSPECTED,
            threat_actor_id=actor_id,
            indicator_ids=list(cluster.get("member_ids") or []),
            # Never persist an empty-string cluster id: it would pollute
            # the link list and make downstream filtering ambiguous.
            cluster_ids=[cluster_id] if cluster_id else [],
            tags=["auto-promoted", "from-cluster"],
            classification=classification,
            created_by=created_by,
        )

        logger.info(
            "CampaignBuilder: promoted cluster %s → campaign %s (%s)",
            cluster.get("id"),
            campaign.id,
            name,
        )
        return campaign

    @staticmethod
    def from_clusters(
        clusters: list[dict[str, Any]],
        *,
        created_by: str = "cluster_detector",
        min_confidence: int = 0,
        classification: str = "amber",
    ) -> list[CampaignProfile]:
        """
        Batch-promote multiple clusters, optionally filtering by
        minimum STIX confidence.

        Parameters
        ----------
        clusters : list of dict
            Each dict is the output of ``Cluster.to_dict()``.
        created_by : str
            Principal that triggered the promotion; forwarded to
            :meth:`from_cluster`.
        min_confidence : int
            Skip clusters whose ``confidence.stix_confidence`` is below
            this threshold. Default 0 (promote all).
        classification : str
            TLP classification applied to every promoted campaign;
            forwarded to :meth:`from_cluster`.

        Returns
        -------
        list of CampaignProfile
            One campaign per cluster that met the threshold, in input order.
        """
        campaigns: list[CampaignProfile] = []
        for cluster in clusters:
            conf = cluster.get("confidence")
            # A missing block, a non-dict block, or a None/absent score all
            # count as confidence 0 instead of raising on the comparison.
            raw_conf = conf.get("stix_confidence") if isinstance(conf, dict) else None
            stix_conf = raw_conf or 0
            if stix_conf < min_confidence:
                logger.debug(
                    "CampaignBuilder: skipping cluster %s (confidence %d < %d)",
                    cluster.get("id"),
                    stix_conf,
                    min_confidence,
                )
                continue
            campaigns.append(
                CampaignBuilder.from_cluster(
                    cluster,
                    created_by=created_by,
                    classification=classification,
                )
            )
        return campaigns

    @staticmethod
    def merge_into_existing(
        campaign: CampaignProfile,
        cluster: dict[str, Any],
    ) -> CampaignProfile:
        """
        Merge a cluster's indicators into an existing campaign.

        Adds the cluster's ``member_ids`` to the campaign's
        ``indicator_ids`` (deduplicated, existing order preserved) and
        links the cluster ID if it is non-empty and not already linked.
        Does NOT change campaign status or attribution.

        The campaign is mutated in place and also returned.
        """
        # Track known ids in a set so each membership test is O(1) instead
        # of rescanning the growing list for every member.
        known = set(campaign.indicator_ids)
        for mid in cluster.get("member_ids") or []:
            if mid not in known:
                known.add(mid)
                campaign.indicator_ids.append(mid)

        cluster_id = cluster.get("id")
        if cluster_id and cluster_id not in campaign.cluster_ids:
            campaign.cluster_ids.append(cluster_id)
        return campaign
161 changes: 161 additions & 0 deletions tests/unit/analysis/test_campaign_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2026 Bill Halpin
"""
tests/unit/analysis/test_campaign_builder.py
===============================================

Unit tests for the CampaignBuilder — cluster-to-campaign promotion
(Phase 4 of the attribution & campaign tracking extension).
"""

from __future__ import annotations

from gnat.analysis.attribution.builder import CampaignBuilder
from gnat.analysis.attribution.models import CampaignProfile, CampaignStatus


def _make_cluster(
    cluster_id: str = "cluster-1",
    label: str = "Subnet overlap cluster",
    member_ids: list[str] | None = None,
    signals: list[str] | None = None,
    suggested_campaign: str | None = None,
    suggested_actor: str | None = None,
    stix_confidence: int = 60,
) -> dict:
    """
    Build a cluster dict fixture for the CampaignBuilder tests.

    ``member_ids`` and ``signals`` fall back to their defaults only when
    left as ``None``; an explicitly empty list is kept, so edge cases
    such as a cluster with zero members or zero signals can be built.
    """
    if member_ids is None:
        member_ids = ["ioc-1", "ioc-2", "ioc-3"]
    if signals is None:
        signals = ["subnet_overlap", "timing_correlation"]

    return {
        "id": cluster_id,
        "label": label,
        "member_ids": member_ids,
        "signals": signals,
        "confidence": {
            "source_reliability": "C",
            "information_credibility": 3,
            "stix_confidence": stix_confidence,
            "band": "MEDIUM",
            "label": "C3 (MEDIUM)",
            "rationale": None,
        },
        "suggested_actor": suggested_actor,
        "suggested_campaign": suggested_campaign,
        # Presumably mirrors the member count — TODO confirm against
        # Cluster.to_dict(). Still 3 for the default member list.
        "size": len(member_ids),
    }


class TestCampaignBuilderFromCluster:
    """Single-cluster promotion through ``CampaignBuilder.from_cluster``."""

    def test_basic_promotion(self):
        # A default cluster yields a SUSPECTED campaign carrying the
        # cluster's members, its id, and the auto-promotion tag.
        promoted = CampaignBuilder.from_cluster(_make_cluster())
        assert isinstance(promoted, CampaignProfile)
        assert promoted.status == CampaignStatus.SUSPECTED
        assert promoted.indicator_ids == ["ioc-1", "ioc-2", "ioc-3"]
        assert "cluster-1" in promoted.cluster_ids
        assert "auto-promoted" in promoted.tags

    def test_uses_suggested_campaign_as_name(self):
        source = _make_cluster(suggested_campaign="Operation Sunrise")
        assert CampaignBuilder.from_cluster(source).name == "Operation Sunrise"

    def test_falls_back_to_label(self):
        # With no suggested campaign, the cluster label becomes the name.
        source = _make_cluster(label="Infrastructure overlap")
        assert CampaignBuilder.from_cluster(source).name == "Infrastructure overlap"

    def test_links_suggested_actor(self):
        source = _make_cluster(suggested_actor="APT28")
        promoted = CampaignBuilder.from_cluster(source)
        assert promoted.threat_actor_id == "threat-actor--APT28"

    def test_no_actor_when_not_suggested(self):
        source = _make_cluster(suggested_actor=None)
        promoted = CampaignBuilder.from_cluster(source)
        assert promoted.threat_actor_id is None

    def test_description_includes_signals(self):
        source = _make_cluster(signals=["subnet_overlap", "tag_match"])
        description = CampaignBuilder.from_cluster(source).description
        assert "subnet_overlap" in description
        assert "tag_match" in description

    def test_created_by_default(self):
        promoted = CampaignBuilder.from_cluster(_make_cluster())
        assert promoted.created_by == "cluster_detector"

    def test_created_by_custom(self):
        promoted = CampaignBuilder.from_cluster(
            _make_cluster(),
            created_by="senior_analyst",
        )
        assert promoted.created_by == "senior_analyst"


class TestCampaignBuilderFromClusters:
    """Batch promotion through ``CampaignBuilder.from_clusters``."""

    def test_batch_promotion(self):
        # Three clusters c1..c3 labelled A..C, none filtered out.
        batch = [
            _make_cluster(cluster_id=f"c{pos}", label=letter)
            for pos, letter in enumerate("ABC", start=1)
        ]
        promoted = CampaignBuilder.from_clusters(batch)
        assert len(promoted) == 3
        assert {campaign.name for campaign in promoted} == {"A", "B", "C"}

    def test_min_confidence_filter(self):
        # Only the clusters scoring 80 and 60 clear a threshold of 50.
        batch = [
            _make_cluster(cluster_id=f"c{pos}", stix_confidence=score)
            for pos, score in enumerate((80, 30, 60), start=1)
        ]
        promoted = CampaignBuilder.from_clusters(batch, min_confidence=50)
        assert len(promoted) == 2

    def test_min_confidence_zero_includes_all(self):
        low = _make_cluster(cluster_id="c1", stix_confidence=10)
        zero = _make_cluster(cluster_id="c2", stix_confidence=0)
        promoted = CampaignBuilder.from_clusters([low, zero], min_confidence=0)
        assert len(promoted) == 2

    def test_empty_list(self):
        nothing: list = []
        assert CampaignBuilder.from_clusters(nothing) == []


class TestCampaignBuilderMerge:
    """Merging a cluster via ``CampaignBuilder.merge_into_existing``."""

    def test_merge_adds_indicators(self):
        # The campaign is mutated in place: new members are added and the
        # new cluster id is linked alongside the previously linked one.
        existing = CampaignProfile(
            name="Existing",
            indicator_ids=["ioc-a", "ioc-b"],
            cluster_ids=["old-cluster"],
        )
        incoming = _make_cluster(
            cluster_id="new-cluster",
            member_ids=["ioc-b", "ioc-c", "ioc-d"],
        )
        CampaignBuilder.merge_into_existing(existing, incoming)
        expected_ids = {"ioc-a", "ioc-b", "ioc-c", "ioc-d"}
        assert set(existing.indicator_ids) == expected_ids
        assert "new-cluster" in existing.cluster_ids
        assert "old-cluster" in existing.cluster_ids

    def test_merge_deduplicates(self):
        # Members already on the campaign are not appended a second time.
        existing = CampaignProfile(
            name="Existing",
            indicator_ids=["ioc-1", "ioc-2"],
        )
        incoming = _make_cluster(member_ids=["ioc-1", "ioc-2", "ioc-3"])
        CampaignBuilder.merge_into_existing(existing, incoming)
        assert existing.indicator_ids == ["ioc-1", "ioc-2", "ioc-3"]

    def test_merge_does_not_change_status(self):
        existing = CampaignProfile(
            name="Active Campaign",
            status=CampaignStatus.ACTIVE,
        )
        CampaignBuilder.merge_into_existing(existing, _make_cluster())
        assert existing.status == CampaignStatus.ACTIVE
Loading