feat: implement 8 roadmap feature areas (autonomous pipelines, AI/LLM… #101
…, viz, connectors, reports, analytics, workflows, analyst UX)
Area 1 — Autonomous Investigation Pipelines:
- workflow.py: add RetryPolicy, branch_on, parallel_steps, _run_step_with_retry, _run_parallel, _resolve_next
- workflow_store.py: persist WorkflowResult to workflow_runs table (SQLAlchemy Core, in-memory fallback)
- 0009_workflow_runs.py: Alembic migration for workflow_runs table
- triggers.py: WorkflowTrigger base + AlertTrigger, ScheduledTrigger, WebhookTrigger
- workflows/auto_investigation.py: autonomous alert → enrich → score → route → open/review pipeline
- agents/catalog.py: WorkflowCatalog registry with phishing-triage, incident-response, auto-investigation
Area 2 — Deeper AI/LLM Integration:
- llm.py: stream(), tool_call(), embed(), _fallback_chain() with graceful degradation
- claude.py: SSE streaming, tool_call, cache_control ephemeral headers, inject llm_client
- openai_compatible.py: stream(), tool_call(), embed() via /v1/embeddings
- research.py, parsing.py: accept injected LLMClient via llm_client= param
- agents/embeddings.py: EmbeddingStore with cosine similarity (pure Python)
- search/semantic.py: SemanticSearchIndex implementing SearchIndex ABC
Area 3 — Enhanced Visualization:
- viz/graph.py: VisualizationTheme dataclass, LayoutRegistry (barnes-hut/spring/circular),
render_attack_matrix(), render_causal_timeline(), render_temporal_playback() via monkey-patch
- viz/geo.py: GeoView.render_threat_heatmap() Plotly choropleth by ISO-3 country
Area 4 — Expanded Connector Ecosystem:
- connectors/health.py: ConnectorHealth dataclass, FleetHealthMonitor.check_all() + check_one()
- clients/base.py: ConnectorHealthResult dataclass, health_check_detailed() method
- codegen/bulk_tester.py: BulkTestResult, ConnectorTestResult, run_bulk_tests() with JUnit XML parsing
- cli/main.py: gnat health fleet subcommand (parallel registry health check with --json flag)
Area 5 — Report Building Enhancements:
- reports/templates.py: SectionSpec, ReportTemplate, TemplateRegistry with daily/weekly/incident/executive built-ins
- reports/cache.py: ReportCache with MD5 content-hash keying and TTL expiry
- reports/custom_section.py: CustomSection with filter queries, aggregator methods, AI synthesis
Area 6 — Advanced Visualization & Analytics:
- search/index.py: SolrSearchIndex.facet() and .histogram() methods
- analysis/trends.py: TrendDetector with sliding-window spike detection, TrendReport
- analysis/stats.py: WorkspaceStats (type_counts, platform_counts, source_reliability_matrix, confidence_distribution, attack_coverage_report)
- serve/routers/analytics.py: /api/analytics/trends, /facets, /histogram, /summary, /attack-coverage
Area 7 — Workflow Orchestration:
- schedule/job.py: WorkflowJob(FeedJob) for scheduling arbitrary workflows
- serve/routers/workflows.py: GET /api/workflows, POST /api/workflows/{name}/run, GET /api/workflows/runs
- tui/screens/workflows.py: WorkflowsScreen with catalog list + run history + trigger button
Area 8 — Analyst UX Improvements:
- tui/widgets/alerts_panel.py: AlertsPanel polls ReviewService + GapDetector every 30s
- tui/widgets/copilot_pane.py: CopilotPane with streaming LLM responses via Ctrl+K toggle
- serve/app.py: GET /api/stream SSE endpoint (review_pending, job_running events)
- serve/routers/investigations.py: POST /bulk-transition, GET /export (CSV/JSON/STIX)
- tui/screens/investigations.py: Ctrl+E export to CSV, _investigations instance cache
All 3955 unit tests pass.
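The pure-Python cosine-similarity embedding store mentioned under Area 2 could look roughly like this minimal sketch (class and method names here are illustrative, not the PR's actual API):

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length dense vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


class EmbeddingStore:
    """In-memory store mapping doc id -> vector, with brute-force top-k search."""

    def __init__(self) -> None:
        self._vectors: dict[str, list[float]] = {}

    def add(self, doc_id: str, vector: list[float]) -> None:
        self._vectors[doc_id] = vector

    def search(self, query: list[float], k: int = 5) -> list[tuple[str, float]]:
        # Score every stored vector and return the k most similar.
        scored = [
            (doc_id, cosine_similarity(query, vec))
            for doc_id, vec in self._vectors.items()
        ]
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:k]
```

Brute-force scoring keeps the store dependency-free, which fits the "pure Python" note in the commit message; it stays practical while the corpus is small.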
https://claude.ai/code/session_01BDoue9HxB83ijLzFARAugq
Pull request overview
Implements a broad set of “roadmap feature areas” across workflow orchestration, AI/LLM capabilities, analytics, visualization, connectors health tooling, reports, and both TUI + HTTP APIs.
Changes:
- Added a workflow engine with retry/backoff, branching, parallel fan-out, persistence, triggers, catalog registration, and UI/API surfaces to run and inspect workflow history.
- Added deeper LLM integration (streaming/tool-calls/embeddings + fallback chain) and semantic search via an in-memory embedding store.
- Added new analytics endpoints (trends/facets/histograms/summary/ATT&CK coverage) plus additional visualization and reporting building blocks.
Reviewed changes
Copilot reviewed 36 out of 36 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| gnat/viz/graph.py | Adds theming, layout registry, and additional GraphView renderers (HTML outputs). |
| gnat/viz/geo.py | New country-level threat heatmap renderer using Plotly choropleth. |
| gnat/tui/widgets/copilot_pane.py | New collapsible copilot pane with LLM streaming into a log view. |
| gnat/tui/widgets/alerts_panel.py | New top-of-screen alerts panel polling review/gap services. |
| gnat/tui/screens/workflows.py | New workflows screen listing catalog + run history and trigger action. |
| gnat/tui/screens/investigations.py | Adds investigations caching and CSV export action/binding. |
| gnat/serve/routers/workflows.py | New workflow catalog + run endpoints and run-history APIs. |
| gnat/serve/routers/investigations.py | Adds bulk transition endpoint and export (CSV/JSON/STIX). |
| gnat/serve/routers/analytics.py | New analytics router for trends/facets/histograms/summary/coverage. |
| gnat/serve/app.py | Wires new routers, adds SSE stream endpoint, and sets new app.state deps. |
| gnat/search/semantic.py | New semantic SearchIndex backed by EmbeddingStore dense retrieval. |
| gnat/search/index.py | Adds Solr facet and histogram helpers used by analytics. |
| gnat/search/__init__.py | Exposes SemanticSearchIndex from package init. |
| gnat/schedule/job.py | Adds WorkflowJob to schedule arbitrary workflows. |
| gnat/reports/templates.py | Adds template-driven report section specs + registry. |
| gnat/reports/custom_section.py | Adds analyst-defined custom report sections with filtering + synthesis. |
| gnat/reports/cache.py | Adds content-hash report cache with TTL and on-disk index. |
| gnat/connectors/health.py | Adds connector fleet health monitoring with parallel checks. |
| gnat/codegen/bulk_tester.py | Adds pytest bulk runner for connector tests with JUnit XML parsing. |
| gnat/clients/base.py | Adds detailed connector health result and BaseClient helper method. |
| gnat/cli/main.py | Adds gnat health fleet CLI command (parallel connector health checks). |
| gnat/analysis/trends.py | Adds TrendDetector + TrendReport for spike detection via Solr histograms. |
| gnat/analysis/stats.py | Adds WorkspaceStats for facets/confidence distribution/ATT&CK coverage. |
| gnat/agents/workflows/auto_investigation.py | Adds autonomous alert→enrich→score→route→open/review workflow. |
| gnat/agents/workflow.py | Adds retry policy, branching, and parallel fan-out to workflow engine. |
| gnat/agents/workflow_store.py | Adds WorkflowStore persistence layer for workflow run records. |
| gnat/agents/triggers.py | Adds alert/scheduled/webhook triggers to execute workflows. |
| gnat/agents/research.py | Allows injecting unified LLMClient for research agent completions. |
| gnat/agents/parsing.py | Allows injecting unified LLMClient for structured parsing extraction. |
| gnat/agents/openai_compatible.py | Adds streaming/tool-calls/embeddings to OpenAI-compatible provider. |
| gnat/agents/llm.py | Expands LLMClient with fallback chain, streaming, tool-calls, embeddings. |
| gnat/agents/embeddings.py | Adds in-memory embedding store + cosine similarity search. |
| gnat/agents/claude.py | Adds SSE streaming, tool calling, prompt caching, and embed stub. |
| gnat/agents/catalog.py | Adds workflow catalog registry with built-in workflows registered. |
| gnat/agents/base.py | Extends LLMProvider interface with stream/tool_call/embed defaults. |
| alembic/versions/0009_workflow_runs.py | Adds DB table + indices for workflow run history persistence. |
Comments suppressed due to low confidence (1)
gnat/agents/parsing.py:316
- When using the new `LLMClient` path, if structured extraction fails or returns a non-dict, the code drops the record with no text fallback (even when `always_yield_summary` is enabled). Consider adding a fallback that calls `llm_client.chat()` to get a plain-text summary so failures degrade gracefully rather than returning `None` for the whole record.
```python
if not data or not isinstance(data, dict):
    # For LLMClient path, `response` may not be set; use empty fallback
    text_fallback = ""
    if self._llm is None:
        text_fallback = self._client.text_from(response)  # type: ignore[arg-type]
    if text_fallback:
        return ParsedIntel(
            summary=text_fallback[:2000],
            confidence=20,
            source_url=source_url,
            source_topic=source_topic,
            model=self._config.model,
        )
    return None
```
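A minimal sketch of the suggested degradation path, with `llm_client.chat()` and `structured_fn` as hypothetical stand-ins for the PR's actual `LLMClient` methods:

```python
def extract_with_fallback(llm_client, prompt: str, structured_fn) -> dict:
    """Try structured extraction; fall back to a plain-text summary.

    `structured_fn` and `llm_client.chat` are illustrative stand-ins for
    the PR's LLMClient.structured()/chat() methods.
    """
    try:
        data = structured_fn(prompt)
    except Exception:
        data = None
    if isinstance(data, dict) and data:
        return data
    # Degrade gracefully: request a plain-text summary instead of
    # dropping the record entirely.
    text = llm_client.chat(prompt)
    return {"summary": (text or "")[:2000], "confidence": 20}
```

The low confidence score on the fallback record preserves the existing convention in the snippet above (`confidence=20` for text-only results).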
```python
for chunk in self._llm.stream(
    [{"role": "user", "content": query}],
    system=self._system,
):
```
CopilotPane calls self._llm.stream() with a list of chat messages, but LLMClient.stream() expects a single prompt string. This will raise at runtime when llm_client is an LLMClient. Pass query as the prompt (and keep system= for the system prompt), or update the interface consistently across callers/providers.
```python
        Incremental text chunks as they arrive.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support streaming")
        yield  # make this a generator
```
LLMProvider.stream() includes an unreachable yield, which turns the method into a generator function. That changes behavior: calling provider.stream(...) returns a generator instead of immediately raising NotImplementedError, and the error is only raised on iteration. Remove the yield and keep this as a normal method that raises, or implement a real generator in subclasses only.
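The difference is easy to demonstrate with a minimal sketch of the base class and one real streaming subclass (names are illustrative):

```python
from typing import Iterator


class LLMProvider:
    def stream(self, prompt: str) -> Iterator[str]:
        # No `yield` here: this stays a normal method that raises
        # immediately, instead of a generator that only fails when
        # the caller starts iterating.
        raise NotImplementedError(f"{type(self).__name__} does not support streaming")


class EchoProvider(LLMProvider):
    def stream(self, prompt: str) -> Iterator[str]:
        # Subclasses that do support streaming implement a real generator.
        yield prompt
```

With the stray `yield` present, `LLMProvider().stream("hi")` would silently return a generator object; without it, the call raises at once, which is the behavior callers can actually handle.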
```python
def _complete_json(self, prompt: str) -> Any:
    """Call the configured backend and return parsed JSON or None."""
    import json as _json

    if self._llm is not None:
        # Use structured() which returns parsed JSON directly
        try:
            return self._llm.structured(
                prompt=prompt,
                output_schema={"type": "object"},
                temperature=0.2,
            )
        except Exception:
            return None
```
_complete_json() forces LLMClient.structured() to use output_schema={"type":"object"}, but feed mode explicitly asks for a JSON array of objects. When llm_client is provided, feed-driven mode will likely parse to a dict and then be discarded by the isinstance(items, list) guard, yielding no records. Use an array schema for feed prompts (or make the schema selectable per prompt).
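A sketch of the suggested fix (helper names hypothetical): select the schema per mode, and normalise the result so a stray dict is wrapped rather than silently discarded by the list guard:

```python
from typing import Any


def schema_for_mode(feed_mode: bool) -> dict:
    """Pick a JSON schema matching what the prompt actually asks for."""
    if feed_mode:
        # Feed prompts request a JSON array of record objects.
        return {"type": "array", "items": {"type": "object"}}
    return {"type": "object"}


def normalise_structured(raw: Any) -> list[dict]:
    """Coerce structured-extraction output to a list of record dicts."""
    if isinstance(raw, list):
        return [item for item in raw if isinstance(item, dict)]
    if isinstance(raw, dict) and raw:
        # A single object still yields one record instead of zero.
        return [raw]
    return []
```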
```python
if not section.condition_expr:
    return True
try:
    return bool(eval(section.condition_expr, {"agg": aggregates}))  # nosec B307
except Exception as exc:
```
ReportTemplate.evaluate_condition() uses eval() on condition_expr with access to Python builtins by default. If templates/sections can be user- or config-supplied, this is arbitrary code execution. Use a safe expression evaluator (e.g., ast-based whitelist), or at minimum pass {"__builtins__": {}} and expose only the small set of functions you intend (e.g., len, getattr) alongside agg.
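A sketch of the minimal hardening suggested above, with builtins stripped and only an explicit allow-list of helpers exposed alongside `agg` (an ast-based whitelist evaluator would be stricter still):

```python
def evaluate_condition(expr: str, aggregates: dict) -> bool:
    """Evaluate a template condition expression with builtins removed.

    Only `agg` plus a small allow-list of helpers is reachable; anything
    else (e.g. __import__) raises NameError and falls through to False.
    """
    if not expr:
        return True
    allowed = {"__builtins__": {}, "len": len, "min": min, "max": max}
    try:
        return bool(eval(expr, allowed, {"agg": aggregates}))  # nosec B307
    except Exception:
        return False
```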
```python
if spikes_only:
    reports = detector.spikes_only(window_days=window_days, platform=platform)
else:
    reports = detector.detect_all(
        window_days=window_days,
        platform=platform,
        stix_types=types,
    )
```
When spikes_only=true, the stix_type filter is ignored (only window_days and platform are forwarded). This makes the endpoint behave inconsistently vs the non-spike path. Consider adding a stix_types parameter to TrendDetector.spikes_only() (or filtering the results here) so stix_type= works in both modes.
```python
"""
Server-Sent Events endpoint for real-time dashboard updates.

Emits events for: ``review_pending``, ``investigation_updated``, ``job_complete``.
```
The SSE endpoint docstring says it emits review_pending, investigation_updated, job_complete, but the implementation currently emits connected, review_pending (only if app.state.review_service exists), and job_running. Update the docstring (and/or implement the missing events) so client expectations match actual emitted event types.
Suggested change:

```python
Emits ``connected`` on initial subscription, ``review_pending`` when a
review service is available, and ``job_running`` when scheduler jobs
are currently running.
```
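Whatever event names the docstring settles on, each emitted message follows the standard `text/event-stream` framing. A minimal formatter sketch (not the PR's actual helper):

```python
import json


def sse_event(event: str, data: dict) -> str:
    """Format one Server-Sent Events message: event name + JSON payload.

    Per the SSE wire format, each message is `event:` and `data:` lines
    terminated by a blank line.
    """
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"
```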
```python
class WorkflowRunRequest(BaseModel):
    """Request body for POST /api/workflows/{name}/run."""

    shared: dict[str, Any] = {}
    investigation_id: str | None = None
```
Pydantic model field shared uses a mutable default ({}). Prefer a default factory (e.g., Field(default_factory=dict)) to avoid surprising shared state and to align with common FastAPI/Pydantic conventions.
```python
"""
import json as _json

th = theme or VisualizationTheme()
types = stix_types or ["attack-pattern"]
```
Unused import: import json as _json is never referenced in this function. Remove it to avoid lint failures and keep the module tidy.
```python
events.sort(key=lambda e: e["ts"])

import json as _json
rows_html = ""
for i, ev in enumerate(events[:200]):
```
Unused import: import json as _json is never referenced in this function. Remove it to avoid lint failures and keep the module tidy.
```python
    Validated when *secret* is set.

    Returns
    -------
    WorkflowResult | None
    """
    if self._secret and signature:
```
WebhookTrigger.handle_request() only verifies signatures when both secret and signature are provided. If secret is configured but the request omits the signature header, the webhook is accepted without validation. If the intent is to secure the endpoint, require a signature whenever secret is set (reject missing signatures) and verify it for every request.
Suggested change:

```python
    Required and validated when *secret* is set.

    Returns
    -------
    WorkflowResult | None
    """
    if self._secret:
        if not signature:
            logger.warning("WebhookTrigger %r: missing signature — request rejected", self.name)
            return None
```