Add CampaignBuilder for cluster-to-campaign promotion (Phase 4)#121
Fourth phase of the attribution & campaign tracking core extension.
Adds the bridge between the automated correlation layer (which
produces clusters) and the analyst-managed campaign layer.
New module: gnat/analysis/attribution/builder.py
CampaignBuilder — stateless converter from Cluster.to_dict()
output to CampaignProfile instances.
- from_cluster(): promotes a single cluster, using
suggested_campaign as name (falls back to label), linking
member_ids as indicators, suggested_actor as threat_actor_id.
- from_clusters(): batch promotion with optional min_confidence
filter to skip low-quality clusters.
- merge_into_existing(): adds a cluster's indicators to an
existing campaign (deduplicated) without changing status or
attribution — for when a new cluster matches an already-tracked
campaign.
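The deduplicated merge described for merge_into_existing() can be sketched roughly as follows. This is a minimal illustration, not the code in builder.py: `merge_indicator_ids` is a hypothetical helper name, and it assumes indicator IDs are plain strings whose existing order should be preserved.

```python
from typing import Iterable


def merge_indicator_ids(existing: Iterable[str], incoming: Iterable[str]) -> list[str]:
    """Hypothetical helper: existing IDs first, then unseen incoming IDs."""
    # dict.fromkeys drops duplicates while keeping first-seen order.
    merged = list(dict.fromkeys(existing))
    seen = set(merged)
    for indicator_id in incoming:
        if indicator_id not in seen:
            seen.add(indicator_id)
            merged.append(indicator_id)
    return merged


# Only the indicator list changes; status and attribution are left to the caller.
merged_ids = merge_indicator_ids(["ioc-1", "ioc-2"], ["ioc-2", "ioc-3", "ioc-3"])
```

Keeping the helper free of any campaign state makes the "does not change status or attribution" guarantee easy to test in isolation.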
Tests: tests/unit/analysis/test_campaign_builder.py (15 tests)
- TestCampaignBuilderFromCluster (8): basic, name from suggestion,
name from label, actor linkage, no actor, signals in description,
default/custom created_by.
- TestCampaignBuilderFromClusters (4): batch, min_confidence
filter, zero threshold, empty list.
- TestCampaignBuilderMerge (3): adds indicators, deduplicates,
preserves status.
Full unit suite: 4,823 passed, 230 skipped, 0 failed (+15 new).
Ruff clean.
Pull request overview
Adds a new attribution-layer bridge that promotes correlation clusters (from Cluster.to_dict()) into analyst-facing CampaignProfile objects, with unit tests covering promotion, batch promotion, and merging behavior.
Changes:
- Introduce CampaignBuilder with from_cluster(), from_clusters(), and merge_into_existing() helpers.
- Add unit test suite for campaign promotion and merge semantics.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| gnat/analysis/attribution/builder.py | New builder utility to convert cluster dicts into CampaignProfile and merge cluster indicators into existing campaigns. |
| tests/unit/analysis/test_campaign_builder.py | New unit tests for single/batch promotion and merge behavior. |
    def from_clusters(
        clusters: list[dict[str, Any]],
        *,
        created_by: str = "cluster_detector",
        min_confidence: int = 0,
    ) -> list[CampaignProfile]:
        """
        Batch-promote multiple clusters, optionally filtering by
        minimum STIX confidence.

        Parameters
        ----------
        clusters : list of dict
            Each dict is the output of ``Cluster.to_dict()``.
        min_confidence : int
            Skip clusters whose ``confidence.stix_confidence`` is below
            this threshold. Default 0 (promote all).
        """
        campaigns: list[CampaignProfile] = []
        for cluster in clusters:
            conf = cluster.get("confidence") or {}
            stix_conf = conf.get("stix_confidence", 0) if isinstance(conf, dict) else 0
            if stix_conf < min_confidence:
                logger.debug(
                    "CampaignBuilder: skipping cluster %s (confidence %d < %d)",
                    cluster.get("id"),
                    stix_conf,
                    min_confidence,
                )
                continue
            campaigns.append(
                CampaignBuilder.from_cluster(cluster, created_by=created_by)
            )
from_cluster() supports a classification override, but from_clusters() doesn’t expose or forward it, so batch promotion always uses the default. To keep the API consistent (and avoid surprising callers), consider adding a classification kwarg to from_clusters() and passing it through to from_cluster() (or drop the parameter from from_cluster() if it’s intentionally single-cluster only).
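One way the pass-through could look, sketched with stand-in functions rather than the real CampaignBuilder methods. The `classification` type and its `None` default are assumptions, since the actual signature of from_cluster() is not shown in this excerpt, and the stand-in returns a plain dict instead of a CampaignProfile.

```python
from typing import Any, Optional


def from_cluster(
    cluster: dict[str, Any],
    *,
    created_by: str = "cluster_detector",
    classification: Optional[str] = None,  # assumed type and default
) -> dict[str, Any]:
    # Stand-in for CampaignBuilder.from_cluster(); returns a dict for illustration.
    return {
        "cluster_id": cluster.get("id"),
        "created_by": created_by,
        "classification": classification,
    }


def from_clusters(
    clusters: list[dict[str, Any]],
    *,
    created_by: str = "cluster_detector",
    classification: Optional[str] = None,
    min_confidence: int = 0,
) -> list[dict[str, Any]]:
    campaigns = []
    for cluster in clusters:
        conf = cluster.get("confidence") or {}
        stix_conf = conf.get("stix_confidence", 0) if isinstance(conf, dict) else 0
        if stix_conf < min_confidence:
            continue
        # Forward the override so batch and single promotion behave the same.
        campaigns.append(
            from_cluster(cluster, created_by=created_by, classification=classification)
        )
    return campaigns


promoted = from_clusters(
    [
        {"id": "c1", "confidence": {"stix_confidence": 80}},
        {"id": "c2", "confidence": {"stix_confidence": 10}},
    ],
    classification="TLP:AMBER",
    min_confidence=50,
)
```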
    "member_ids": member_ids or ["ioc-1", "ioc-2", "ioc-3"],
    "signals": signals or ["subnet_overlap", "timing_correlation"],
    "confidence": {
In _make_cluster(), using member_ids or [...] (and similarly signals or [...]) means callers can’t create an intentionally empty list (e.g., member_ids=[] will be replaced by the default). That makes it harder to test edge cases like clusters with zero members/signals. Prefer checking is None instead of truthiness when applying defaults.
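A minimal sketch of the `is None` variant. It shows only the two fields discussed here; the real fixture has more keys (for example `confidence`), which are omitted because their defaults are not visible in this excerpt.

```python
from typing import Any, Optional


def _make_cluster(
    member_ids: Optional[list[str]] = None,
    signals: Optional[list[str]] = None,
) -> dict[str, Any]:
    # Apply defaults only when the argument was omitted, so an explicit
    # empty list survives and zero-member clusters can be tested.
    if member_ids is None:
        member_ids = ["ioc-1", "ioc-2", "ioc-3"]
    if signals is None:
        signals = ["subnet_overlap", "timing_correlation"]
    return {"member_ids": member_ids, "signals": signals}


empty_cluster = _make_cluster(member_ids=[], signals=[])
default_cluster = _make_cluster()
```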
    actor_id = None
    actor_label = cluster.get("suggested_actor")
    if actor_label:
        actor_id = f"threat-actor--{actor_label}"
suggested_actor from ClusterDetector appears to be a label/tag (not a STIX Threat Actor ID), but here it’s converted into threat_actor_id as threat-actor--{label}. That string is not a valid STIX ID (STIX IDs require a UUID after --) and is inconsistent with ActorProfile.id generation (threat-actor--{uuid}). Consider either (a) leaving threat_actor_id unset and storing the label elsewhere, or (b) deriving a deterministic STIX ID (e.g., uuid5) from the label so it remains stable and STIX-valid.
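Option (b) might look roughly like this. The namespace below is a hypothetical placeholder, not something defined by the project, and normalizing the label by trimming and lowercasing is also an assumption about how labels should be compared.

```python
import uuid

# Hypothetical project namespace; any fixed UUID would do as long as it never changes.
ACTOR_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "attribution.example.invalid")


def actor_id_from_label(label: str) -> str:
    """Derive a stable, UUID-based threat-actor ID from a free-text label."""
    # Normalize so "APT29" and " apt29 " map to the same ID (assumed policy).
    normalized = label.strip().lower()
    return f"threat-actor--{uuid.uuid5(ACTOR_NAMESPACE, normalized)}"


actor_id = actor_id_from_label("APT29")
```

Because uuid5 is deterministic, repeated promotions of clusters with the same suggested actor would link to the same ID without a lookup table.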
    description=f"Auto-promoted from cluster {cluster.get('id', '')}. "
    f"Signals: {'; '.join(cluster.get('signals') or [])}",
    status=CampaignStatus.SUSPECTED,
    threat_actor_id=actor_id,
    indicator_ids=list(cluster.get("member_ids") or []),
    cluster_ids=[cluster.get("id", "")],
    tags=["auto-promoted", "from-cluster"],
cluster_ids is initialized with [cluster.get('id', '')], which will store an empty-string cluster id if the input dict is missing/empty. That can silently pollute persisted campaigns and makes downstream filtering/linking ambiguous. Prefer only appending/linking the cluster id when it’s a non-empty string, and use a clearer fallback in the description if the id is absent.
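A sketch of the guarded version, written as a standalone helper so it can run on its own. `cluster_link_fields` is a hypothetical name, and the fallback wording in the description is only a suggestion.

```python
from typing import Any


def cluster_link_fields(cluster: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: build description and cluster_ids without empty IDs."""
    cluster_id = cluster.get("id")
    has_id = isinstance(cluster_id, str) and cluster_id.strip() != ""
    signals = "; ".join(cluster.get("signals") or [])
    # Say explicitly that the id is missing instead of rendering "cluster ." in the text.
    source = f"cluster {cluster_id}" if has_id else "a cluster with no id"
    return {
        "description": f"Auto-promoted from {source}. Signals: {signals}",
        "cluster_ids": [cluster_id] if has_id else [],
    }


with_id = cluster_link_fields({"id": "c-1", "signals": ["subnet_overlap"]})
without_id = cluster_link_fields({"signals": ["subnet_overlap"]})
```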
    Uses the cluster's ``suggested_campaign`` as the campaign name
    (falls back to the cluster label), and ``suggested_actor`` as
    the initial ``threat_actor_id``. Member IDs become the campaign's
    ``indicator_ids`` and the cluster ID is linked as a
    ``cluster_id``.
Docstring mismatch: it says the cluster ID is linked as a cluster_id, but the model field is cluster_ids (list). Also, the module docstring mentions investigation linkage, but from_cluster() doesn’t set investigation_ids at all. Please align the documentation with the actual fields/behavior to avoid misleading API consumers.