Conversation
Implements TAXII 2.1-based peer-to-peer and hierarchical federation
between independent GNAT instances with TLP-gated sharing and
incremental sync.
New packages:
- gnat/federation/: FederationPeer, PeerRegistry, PeerSyncService,
FederationScheduler, FederationTopology
- gnat/connectors/gnat_remote/: GNATRemoteConnector (BaseClient +
ConnectorMixin over TAXII 2.1)
REST API:
- GET/POST /api/federation/peers
- DELETE/GET/POST /api/federation/peers/{peer_id}[/health|/sync]
- GET /api/federation/topology
Also adds TAXIIPushDelivery export target, registers gnat_remote in
CLIENT_REGISTRY, wires federation into create_app(), adds 79 unit tests,
and documents federation config in config.ini.example and CHANGELOG.
https://claude.ai/code/session_01BDoue9HxB83ijLzFARAugq
There was a problem hiding this comment.
Pull request overview
This PR introduces a federation layer to synchronize STIX 2.1 content between independent GNAT instances over TAXII 2.1, including peer configuration/persistence, sync orchestration, scheduling, topology visualization, and a new remote GNAT TAXII connector plus supporting API/export/config/docs/tests.
Changes:
- Add
gnat.federation(peer model + registry, sync service, scheduler, topology helpers) with unit tests. - Add
GNATRemoteConnector(TAXII 2.1 client) and register it inCLIENT_REGISTRY. - Add FastAPI federation router endpoints, a TAXII push export delivery target, and document/configure federation.
Reviewed changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 17 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/federation/test_federation.py | New unit tests covering federation peer/registry/sync/topology/scheduler behavior. |
| tests/unit/federation/init.py | New test package marker for federation tests. |
| tests/unit/connectors/test_gnat_remote.py | New unit tests for GNATRemoteConnector behavior. |
| gnat/serve/routers/federation.py | New REST API router for peer CRUD, health checks, sync triggers, and topology graph output. |
| gnat/serve/app.py | Wire federation router and federation objects into app.state / create_app() args. |
| gnat/federation/topology.py | New topology traversal + graph building + effective max TLP logic. |
| gnat/federation/sync.py | New pull/push sync orchestration with TLP gate and conflict resolution. |
| gnat/federation/scheduler.py | New scheduling bridge using existing FeedScheduler/FeedJob. |
| gnat/federation/peer.py | New peer dataclass + JSON-backed peer registry + config loader. |
| gnat/federation/init.py | Public federation package exports. |
| gnat/export/delivery/targets.py | Add TAXIIPushDelivery export delivery target for pushing bundles to remote TAXII collections. |
| gnat/connectors/gnat_remote/connector.py | New TAXII 2.1 connector for remote GNAT instances. |
| gnat/connectors/gnat_remote/init.py | Export GNATRemoteConnector. |
| gnat/clients/init.py | Register gnat_remote connector and export symbol. |
| config/config.ini.example | Add federation configuration examples (mesh + hierarchical). |
| CHANGELOG.md | Document new federation layer, connector, API, export target, config, and tests. |
Comments suppressed due to low confidence (1)
gnat/serve/app.py:370
create_app()addsfederation_registry,federation_scheduler, andfederation_sync_serviceparameters, but the docstring’s Parameters section isn’t updated to describe them. Please document what types are expected and how routers use theseapp.state.*fields.
def create_app(
api_key: str,
library_backend=None,
scheduler_backend=None,
reports_dir: str | None = None,
investigation_service=None,
graph_query=None,
gap_detector=None,
report_drafting_assistant=None,
export_service=None,
metrics_aggregator=None,
federation_registry=None,
federation_scheduler=None,
federation_sync_service=None,
) -> FastAPI:
"""
Build and return the GNAT web dashboard FastAPI application.
Parameters
----------
api_key : str
Required ``X-Api-Key`` value for all ``/api/*`` requests.
library_backend : ResearchLibrary, optional
Pre-constructed library instance. When ``None`` the library
endpoints return ``503``.
scheduler_backend : FeedScheduler, optional
Pre-constructed scheduler instance. When ``None`` the scheduler
endpoints return ``503``.
reports_dir : str, optional
Directory to scan for generated reports.
"""
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| from __future__ import annotations | ||
|
|
||
| import json |
There was a problem hiding this comment.
Unused import json at module scope will fail Ruff (F401). Remove it or use it in the test.
| import json |
|
|
||
| from __future__ import annotations | ||
|
|
||
| import json |
There was a problem hiding this comment.
Unused import json will fail Ruff (F401). It looks like the module isn't used anywhere in this test file (the json name is only used as a parameter name in nested fakes).
| import json |
| def test_empty_push_returns_empty_dict(self): | ||
| """push_bundle with empty objects list skips POST and returns {}.""" | ||
| c = _connector() | ||
| with patch.object(c, "post") as mock_post: | ||
| result = c.push_bundle(workspace="threats-2025", objects=[]) | ||
| # push_bundle does NOT guard against empty lists — it posts anyway | ||
| # Just verify it returns a dict | ||
| assert isinstance(result, dict) | ||
|
|
||
|
|
There was a problem hiding this comment.
This test's docstring/comment is internally inconsistent and doesn't assert behavior: it says empty push "skips POST and returns {}", but the inline comment says push_bundle posts anyway, and the test doesn't check whether post() was called. Update the test to match the intended connector behavior and assert it (either post not called + {} returned, or post called with an empty bundle).
| def test_empty_push_returns_empty_dict(self): | |
| """push_bundle with empty objects list skips POST and returns {}.""" | |
| c = _connector() | |
| with patch.object(c, "post") as mock_post: | |
| result = c.push_bundle(workspace="threats-2025", objects=[]) | |
| # push_bundle does NOT guard against empty lists — it posts anyway | |
| # Just verify it returns a dict | |
| assert isinstance(result, dict) | |
| def test_empty_push_posts_empty_bundle(self): | |
| """push_bundle with empty objects list posts an empty STIX bundle.""" | |
| c = _connector() | |
| with patch.object(c, "post", return_value={}) as mock_post: | |
| result = c.push_bundle(workspace="threats-2025", objects=[]) | |
| assert result == {} | |
| mock_post.assert_called_once() | |
| args, kwargs = mock_post.call_args | |
| assert "threats-2025" in args[0] | |
| bundle = kwargs["json"] | |
| assert isinstance(bundle, dict) | |
| assert bundle["type"] == "bundle" | |
| assert bundle["objects"] == [] |
| import json | ||
| import uuid | ||
| from datetime import datetime, timezone | ||
| from typing import Any | ||
|
|
||
| from gnat.clients.base import BaseClient, GNATClientError | ||
| from gnat.connectors.base_connector import ConnectorMixin | ||
|
|
||
| _API_ROOT = "/taxii2/roots/gnat" | ||
| _TAXII_MEDIA_TYPE = "application/taxii+json;version=2.1" | ||
| _STIX_MEDIA_TYPE = "application/stix+json;version=2.1" | ||
|
|
||
|
|
||
| def _now_ts() -> str: | ||
| return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" | ||
|
|
There was a problem hiding this comment.
Unused import json will fail Ruff (F401). This module doesn’t use the json module anywhere (passing json= kwargs doesn’t count). Remove the import (and consider removing _now_ts() as well if it remains unused).
| def list_objects( | ||
| self, | ||
| stix_type: str, | ||
| filters: dict[str, Any] | None = None, | ||
| page: int = 1, | ||
| page_size: int = 100, | ||
| ) -> list[dict[str, Any]]: | ||
| """ | ||
| List STIX objects from a workspace with optional filters. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| stix_type : str | ||
| Filter by this STIX type (passed as ``match[type]``). | ||
| filters : dict, optional | ||
| Supported keys: | ||
|
|
||
| * ``workspace`` — override default workspace | ||
| * ``added_after`` — ISO 8601 timestamp for incremental pull | ||
| * ``match_id`` — exact STIX ID filter | ||
| * ``next_page`` — TAXII pagination cursor | ||
| page_size : int | ||
| Maximum objects per call (default 100). |
There was a problem hiding this comment.
list_objects() accepts page and page_size but page is unused and pagination is driven by TAXII next cursors (filters['next_page']). Either implement page→cursor translation (if supported) or remove the unused page parameter to avoid a misleading API.
| objects = result.payloads.get("objects", []) | ||
| if not objects: | ||
| # Try to collect from all payloads that are lists of dicts | ||
| for payload in result.payloads.values(): | ||
| if isinstance(payload, list): | ||
| objects.extend(payload) | ||
|
|
There was a problem hiding this comment.
objects = result.payloads.get("objects", []) can alias the list stored in result.payloads and the subsequent objects.extend(...) mutates the original payload in-place. That side effect can surprise callers (and can accumulate objects across deliveries if the same TransformResult is reused). Make a copy before extending (e.g. objects = list(result.payloads.get("objects") or [])).
| display_name=body.get("display_name", ""), | ||
| direction=body.get("direction", "pull"), | ||
| max_tlp=body.get("max_tlp", "green"), | ||
| parent_peer_id=body.get("parent_peer_id"), | ||
| sync_interval_seconds=int(body.get("sync_interval_seconds", 3600)), | ||
| workspace_filter=body.get("workspace_filter", []), | ||
| enabled=bool(body.get("enabled", True)), | ||
| ) |
There was a problem hiding this comment.
enabled=bool(body.get("enabled", True)) will treat string values like "false" as True (because non-empty strings are truthy). If clients might send strings (or null), parse explicitly (e.g. accept only booleans, or normalize common string forms) to avoid accidentally enabling peers.
| def _save(self) -> None: | ||
| self._path.parent.mkdir(parents=True, exist_ok=True) | ||
| self._path.write_text( | ||
| json.dumps( | ||
| {pid: asdict(p) for pid, p in self._peers.items()}, | ||
| indent=2, | ||
| ensure_ascii=False, | ||
| ), | ||
| encoding="utf-8", | ||
| ) |
There was a problem hiding this comment.
PeerRegistry persists api_key values in plaintext JSON (~/.gnat/federation_peers.json by default). At minimum, ensure the file is created with restrictive permissions (e.g. 0600) and consider documenting the sensitivity of this file (or supporting loading secrets from env/OS keyring).
| result: list[str] = [] | ||
| queue: list[str] = [peer_id] | ||
| visited: set[str] = {peer_id} | ||
|
|
||
| while queue: | ||
| current = queue.pop(0) | ||
| children = [ | ||
| p.peer_id | ||
| for p in self._registry.list() | ||
| if p.parent_peer_id == current and p.peer_id not in visited | ||
| ] | ||
| for child_id in children: | ||
| visited.add(child_id) | ||
| result.append(child_id) | ||
| queue.append(child_id) | ||
|
|
There was a problem hiding this comment.
descendants() repeatedly calls self._registry.list() inside the BFS loop and uses queue.pop(0) (O(n) per pop). This makes the traversal O(n^2) even for modest peer counts. Consider precomputing a parent→children adjacency map once and using collections.deque for the queue.
| max_tlp=body.get("max_tlp", "green"), | ||
| parent_peer_id=body.get("parent_peer_id"), | ||
| sync_interval_seconds=int(body.get("sync_interval_seconds", 3600)), | ||
| workspace_filter=body.get("workspace_filter", []), | ||
| enabled=bool(body.get("enabled", True)), |
There was a problem hiding this comment.
parent_peer_id=body.get("parent_peer_id") will persist empty strings as a parent id (""), which will then be treated as a real hierarchy link in FederationTopology. Normalize empty/whitespace-only values to None (as PeerRegistry.from_config() already does) before calling registry.register().
Implements TAXII 2.1-based peer-to-peer and hierarchical federation between independent GNAT instances with TLP-gated sharing and incremental sync.
New packages:
REST API:
Also adds TAXIIPushDelivery export target, registers gnat_remote in CLIENT_REGISTRY, wires federation into create_app(), adds 79 unit tests, and documents federation config in config.ini.example and CHANGELOG.
https://claude.ai/code/session_01BDoue9HxB83ijLzFARAugq