Conversation
Critical:
- sync.py: replace manual STIXBase dict manipulation with
workspace._add_object(obj, source_platform="peer:<id>") so
object provenance is properly stored in the source_platform DB column
Quality:
- sync.py: replace local _TLP_RANKS dict with TLPLevel enum from
gnat.analysis.tlp to eliminate duplication
- scheduler.py: store PullResult on _FederationReader._last_result so
_on_success callback reports actual objects_accepted instead of 0
Integration:
- export/delivery/__init__.py: export TAXIIPushDelivery publicly
- gnat/__init__.py: export all federation classes from top-level API
- cli/main.py (_cmd_serve): initialise FederationScheduler+PeerRegistry
from config when gnat serve is invoked with a config file
Feature:
- cli/main.py: add gnat federation subcommand with:
federation list [--enabled-only]
federation register PEER_ID --taxii-url --api-key --workspaces
federation delete PEER_ID
federation health PEER_ID
federation sync PEER_ID [--dry-run]
federation topology
https://claude.ai/code/session_01BDoue9HxB83ijLzFARAugq
- gnat/agents/gemini.py: new GeminiProvider(LLMProvider, BaseClient) supporting Gemini 2.0 Flash and 1.5 Pro; chat() and structured() methods; OpenAI-compatible response envelope; x-goog-api-key auth; systemInstruction mapping
- gnat/agents/llm.py: wire Gemini backend (was NotImplementedError); update supported backends list in error message
- gnat/agents/claude.py: update default model to claude-sonnet-4-6
- gnat/research/library.py: Solr search integration — search_index param in __init__; search() dispatches to _solr_search() or _memory_search(); new _entry_by_stix_id(), _index_entry_objects(); promote() indexes on write; default()/from_manager() auto-configure index from [search] INI section
- gnat/connectors/recordedfuture/rfv3.py: harden list_alerts/list_playbook_alerts with dual key paths (results/alerts, nextPageToken/pagination.nextPageToken); update_playbook_alert PATCH→PUT fallback; defensive key paths throughout
Pull request overview
This PR significantly expands GNAT's integrations across LLM backends, federation, search, and connector robustness, despite the PR title suggesting a narrower Claude-docs scope.
Changes:
- Add a new Gemini LLM provider and enable `backend="gemini"` in `LLMClient`; update the Claude default model string.
- Add/extend the federation CLI and server initialization, and refine federation scheduler logging/reader behavior.
- Add optional Solr search/index integration to `ResearchLibrary` and harden Recorded Future v3 connector pagination/envelope handling.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| gnat/research/library.py | Adds optional search index wiring, Solr dispatch path, and indexing on promote |
| gnat/federation/sync.py | Replaces local TLP rank map with TLPLevel ranking and changes ingest write path |
| gnat/federation/scheduler.py | Captures pull result for improved success logging without record pipeline writes |
| `gnat/export/delivery/__init__.py` | Exposes delivery target classes via package exports and `__all__` |
| gnat/connectors/recordedfuture/rfv3.py | Adds fallback parsing for multiple envelope shapes and PATCH→PUT fallback logic |
| gnat/cli/main.py | Adds gnat federation CLI and initializes federation components during gnat serve |
| gnat/agents/llm.py | Enables Gemini backend selection in unified LLM client |
| gnat/agents/gemini.py | Implements GeminiProvider via BaseClient without external SDK dependency |
| gnat/agents/claude.py | Updates default Claude model string |
| `gnat/__init__.py` | Re-exports federation public API from top-level package |
| CHANGELOG.md | Documents new Gemini, Solr integration, RF hardening, and federation capabilities |
```python
    def _index_entry_objects(self, entry: ResearchEntry) -> None:
        """Index all STIX objects from a ResearchEntry into the search sidecar."""
        for obj in entry.stix_objects:
            try:
                self._search_index.index(
                    obj,
                    source_platform="research_library",
                    extra_fields={"research_topic": entry.topic},
                )
```
_index_entry_objects() passes raw STIX dicts (entry.stix_objects) into SearchIndex.index(), but the SearchIndex interface is typed/implemented for STIXBase instances (e.g. SolrSearchIndex._to_doc() calls obj.to_dict(), obj.id, etc.). With a real SolrSearchIndex this will always raise and indexing will silently fail (caught and logged).
Convert dicts to ORM objects before indexing (e.g. via the same dict→STIXBase reconstruction used elsewhere, or update the SearchIndex contract/impl to accept dicts).
```python
        stix_ids = self._search_index.search(query, limit=limit * 2)

        entries: list[ResearchEntry] = []
        for stix_id in stix_ids:
            entry = self._entry_by_stix_id(stix_id, include_staging)
            if entry is None:
                continue
            if not include_stale and not entry.is_fresh:
                continue
            entries.append(entry)
            if len(entries) >= limit:
                break
        return entries

    def _entry_by_stix_id(
        self,
        stix_id: str,
        include_staging: bool,
    ) -> ResearchEntry | None:
        """Return the ResearchEntry containing the given STIX object ID, or None."""
        for entry in self._load_all_entries(self._library_name, status="curated"):
            for obj in entry.stix_objects:
                if isinstance(obj, dict) and obj.get("id") == stix_id:
                    return entry
        if include_staging:
            for entry in self._load_all_entries(self._staging_name, status="pending"):
                for obj in entry.stix_objects:
                    if isinstance(obj, dict) and obj.get("id") == stix_id:
                        return entry
        return None
```
_solr_search() calls _entry_by_stix_id() once per returned STIX ID, and _entry_by_stix_id() reloads all entries from the workspace store each time. This makes Solr-backed search O(results × entries) and can become very slow as the library grows.
Consider loading entries once per search call and building an id→entry map (or caching a reverse index in workspace metadata) so each ID lookup is O(1).
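The one-pass reverse index could look like the following sketch; `build_id_map` is a hypothetical helper, and plain dicts stand in for ResearchEntry objects.

```python
def build_id_map(entries):
    """Build a STIX-ID → entry map in one pass over all loaded entries."""
    id_map = {}
    for entry in entries:
        for obj in entry["stix_objects"]:
            if isinstance(obj, dict) and "id" in obj:
                # first entry containing the ID wins, matching the linear scan
                id_map.setdefault(obj["id"], entry)
    return id_map

entries = [
    {"topic": "apt1", "stix_objects": [{"id": "indicator--a"}]},
    {"topic": "apt2", "stix_objects": [{"id": "indicator--b"}]},
]
id_map = build_id_map(entries)
print(id_map["indicator--b"]["topic"])  # apt2
```

`_solr_search()` would then build the map once and resolve every returned ID with a dict lookup instead of re-scanning the workspace store per result.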
```diff
 logger = logging.getLogger(__name__)

-# TLP rank map — must match gnat.analysis.tlp._RANKS
-_TLP_RANKS: dict[str, int] = {
-    "white": 0,
-    "clear": 0,
-    "green": 1,
-    "amber": 2,
-    "amber+strict": 3,
-    "red": 4,
-}
+from gnat.analysis.tlp import TLPLevel
```
from gnat.analysis.tlp import TLPLevel is placed after logger = ... rather than grouped with the other imports at the top of the module. This breaks standard import ordering (and will typically fail isort/ruff I001).
Move the import into the main import block near the other from ... import ... statements.
```python
        from gnat.federation.sync import PeerSyncService, FederationError
        svc = PeerSyncService()
        dry_run = getattr(args, "dry_run", False)
        if dry_run:
            print(_yellow(f"Dry-run: fetching from peer {_bold(args.peer_id)} …"))
        else:
            print(f"Syncing from peer {_bold(args.peer_id)} …")
        try:
            result = svc.sync_from_peer(peer=peer, dry_run=dry_run)
            registry.update_sync_status(args.peer_id, "success")
```
PeerSyncService() is instantiated without a workspace_manager. In PeerSyncService.sync_from_peer(), objects are only persisted when self._wm is not None, so gnat federation sync (non-dry-run) will report objects accepted but not actually write them to local workspaces.
Initialize PeerSyncService with a real WorkspaceManager (e.g. WorkspaceManager.default(config_path=...)) or have PeerSyncService default to one when none is provided.
```python
        _cfg = GNATConfig(config_path)
        _registry = PeerRegistry.from_config(_cfg)
        _sync_svc = PeerSyncService()
```
Same issue as the CLI sync path: _sync_svc = PeerSyncService() is created without a workspace_manager, so scheduled federation pulls started by gnat serve will not persist any accepted objects to local workspaces.
Pass a configured WorkspaceManager into PeerSyncService (or refactor PeerSyncService to create a default manager when appropriate).
Suggested change:

```diff
-_cfg = GNATConfig(config_path)
-_registry = PeerRegistry.from_config(_cfg)
-_sync_svc = PeerSyncService()
+from gnat.workspace.manager import WorkspaceManager
+
+_cfg = GNATConfig(config_path)
+_registry = PeerRegistry.from_config(_cfg)
+try:
+    _workspace_manager = WorkspaceManager.from_config(_cfg)
+except AttributeError:
+    try:
+        _workspace_manager = WorkspaceManager(config_path=config_path)
+    except TypeError:
+        _workspace_manager = WorkspaceManager()
+_sync_svc = PeerSyncService(workspace_manager=_workspace_manager)
```
```python
        data = resp.get("data", {})
        page = data.get("results") or data.get("alerts", [])
        results.extend(page)
```
Same envelope-selection issue in list_playbook_alerts(): page = data.get("results") or data.get("alerts", []) will fall back to alerts when results is present but empty.
Use key-presence checks instead of truthiness so empty lists are handled correctly.
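A minimal sketch of the key-presence approach (the helper name is illustrative):

```python
def select_page(data: dict) -> list:
    """Prefer "results" whenever the key exists, even if its list is empty,
    instead of falling through to "alerts" on truthiness."""
    if "results" in data:
        return data["results"]
    return data.get("alerts", [])

print(select_page({"results": [], "alerts": [{"id": 1}]}))  # []
print(select_page({"alerts": [{"id": 1}]}))  # [{'id': 1}]
```

An empty `results` page now correctly terminates pagination rather than re-reading the legacy `alerts` key.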
```python
        url = f"{self._PLAYBOOK_BASE}/{alert_id}"
        try:
            resp = self.patch(url, json=payload)
        except Exception:  # noqa: BLE001
```
update_playbook_alert() docstring says the fallback to PUT is specifically for 405 Method Not Allowed, but the code falls back on any exception (including timeouts, auth failures, 4xx/5xx, etc.). This can mask real errors and potentially perform an unintended PUT.
Catch GNATClientError and only fall back when exc.status == 405; otherwise re-raise.
Suggested change:

```diff
-        except Exception:  # noqa: BLE001
+        except GNATClientError as exc:
+            if exc.status != 405:
+                raise
```
```diff
         Returns raw bytes — callers are responsible for parsing
         (CSV, JSON, STIX bundle, etc.). Handles both raw-bytes responses
         and JSON-envelope responses where the content is base64 or embedded.
         """
         resp = self.get(self._FUSION_BASE, params={"path": file_path})
         if isinstance(resp, bytes):
             return resp
-        if isinstance(resp, dict):
-            return resp.get("data", b"")
+        data = resp.get("data", {})
+        if isinstance(data, bytes):
+            return data
+        if isinstance(data, dict):
+            # Some RF endpoints embed content as a string field
+            content = data.get("content") or data.get("body", "")
+            return content.encode() if isinstance(content, str) else b""
+        return b""
```
get_fusion_file() docstring claims it handles JSON-envelope responses where the content is "base64", but the implementation only .encode()s a returned string and never base64-decodes it. This makes the docstring inaccurate (and callers may get encoded base64 text rather than file bytes).
Either implement base64 decoding when appropriate (e.g. for a contentBase64/content field) or adjust the docstring to match the actual behavior.
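If base64 support is the intended behavior, a sketch might look like this; note that `contentBase64` is an assumed field name for illustration, not a confirmed Recorded Future response key:

```python
import base64

def decode_fusion_content(data: dict) -> bytes:
    """Return file bytes from a JSON envelope, base64-decoding when the
    content is delivered in a (hypothetical) contentBase64 field."""
    if "contentBase64" in data:
        return base64.b64decode(data["contentBase64"])
    content = data.get("content") or data.get("body", "")
    return content.encode() if isinstance(content, str) else b""

print(decode_fusion_content({"contentBase64": "aGVsbG8="}))  # b'hello'
```

Whichever shape the real API uses, the docstring and the decode path should agree.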
```python
    def __init__(
        self,
        manager: WorkspaceManager,
        ttls: dict[str, int] | None = None,
        staging_name: str = _STAGING_NAME,
        library_name: str = _LIBRARY_NAME,
        search_index: Any | None = None,
    ):
        """Initialize ResearchLibrary."""
        self._manager = manager
        self._ttls = {**DEFAULT_TTLS, **(ttls or {})}
        self._staging_name = staging_name
        self._library_name = library_name
        self._ensure_workspaces()
        if search_index is not None:
            self._search_index = search_index
        else:
            from gnat.search.index import NullSearchIndex
            self._search_index = NullSearchIndex()
```
ResearchLibrary.__init__ (and _build_search_index_from_config) use Any for search_index, but the codebase already defines a SearchIndex ABC (gnat.search.index.SearchIndex). Tightening the type to SearchIndex | None will catch integration issues (like passing dicts to .index()) earlier and improves editor/lint support.
```markdown
### Added — AI & Connector Improvements

**Google Gemini provider (`gnat/agents/gemini.py`)**
- `GeminiProvider(LLMProvider, BaseClient)`: full Gemini 2.0/1.5 support via `POST /v1beta/models/{model}:generateContent`; auth via `x-goog-api-key` header; system messages mapped to `systemInstruction`; "assistant" → "model" role translation; `chat()` returns OpenAI-compatible `choices[0].message.content` envelope; `structured()` uses `response_mime_type: application/json` for reliable JSON output; default model `gemini-2.0-flash`
- `LLMClient` now accepts `backend="gemini"` — previously raised `NotImplementedError`; error message updated to list `gemini` as supported
- `ClaudeProvider` default model updated from `claude-3-5-sonnet-20241022` to `claude-sonnet-4-6`
```
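The role/systemInstruction mapping the changelog describes can be illustrated with a standalone sketch; the payload shape follows Google's `generateContent` schema, while `to_gemini_payload` itself is a hypothetical helper, not GNAT's actual method:

```python
def to_gemini_payload(messages: list[dict]) -> dict:
    """Map OpenAI-style chat messages to a Gemini generateContent body:
    system → systemInstruction, assistant → "model" role."""
    payload = {"contents": []}
    for m in messages:
        if m["role"] == "system":
            payload["systemInstruction"] = {"parts": [{"text": m["content"]}]}
            continue
        role = "model" if m["role"] == "assistant" else "user"
        payload["contents"].append({"role": role, "parts": [{"text": m["content"]}]})
    return payload

p = to_gemini_payload([
    {"role": "system", "content": "Be terse."},
    {"role": "user", "content": "hi"},
    {"role": "assistant", "content": "hello"},
])
print(p["contents"][1]["role"])  # model
```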
PR title suggests this change is about Claude documentation, but this diff adds substantial new functionality (Gemini provider, federation CLI/scheduler wiring, Solr-backed ResearchLibrary search, RF connector changes). Consider updating the PR title/description (or splitting into smaller PRs) so reviewers and release notes accurately reflect the scope.
No description provided.