# SPDX-License-Identifier: Apache-2.0
# Copyright 2026 Bill Halpin
"""
gnat.agents.assistant_analyst
=============================

Live Analyst Assistant: a stateless but context-aware agent providing
on-demand enrichment suggestions, report drafting, and finding explanations.

Uses a hybrid LLM strategy: streaming for chat-style interactions, batched
calls for heavier operations such as report drafting.
"""

from dataclasses import dataclass
from typing import Optional, List, Dict, Any, AsyncIterator

from gnat.agents.conversations import ConversationStore, ConversationTurn, ConversationRole
from gnat.agents.llm import LLMClient
from gnat.agents.base import AgentConfig
from gnat.orm.base import STIXBase


@dataclass
class EnrichmentSuggestion:
    """A recommended connector for enriching an object."""
    connector_name: str
    reason: str
    confidence: float
    estimated_duration_sec: int


@dataclass
class ReportDraftOption:
    """A generated report-section draft."""
    section_type: str
    text: str
    tone: str  # "formal" | "technical" | "narrative"
    quality_score: float


class LiveAnalystAssistantSession:
    """
    On-demand assistant for threat intelligence analysis.

    Stateless (no investigation state is stored) but context-aware
    (investigation data is passed in with each call). Responses are streamed
    for chat-style interactions (suggestions, explanations, search help) and
    batched for report drafting.
    """

    def __init__(
        self,
        conversation_id: str,
        config: AgentConfig,
        conversation_store: Optional[ConversationStore] = None,
        llm_client: Optional[LLMClient] = None,
    ):
        """
        Initialize assistant session.

        Args:
            conversation_id: Unique session ID
            config: Agent configuration
            conversation_store: Optional custom store
            llm_client: Optional custom LLM client
        """
        self.conversation_id = conversation_id
        self.config = config
        self.store = conversation_store or ConversationStore()
        self.llm = llm_client or LLMClient.from_config(config)

        ctx = self.store.get_session(conversation_id)
        if not ctx:
            raise ValueError(f"Conversation {conversation_id} not found")

        self.session_context = ctx

    async def suggest_enrichment(self, stix_object: STIXBase) -> AsyncIterator[EnrichmentSuggestion]:
        """
        Suggest enrichment connectors for a STIX object.
        Streaming: yields suggestions as they're generated.

        Args:
            stix_object: The object to enrich (Indicator, Malware, Actor, etc.)

        Yields:
            EnrichmentSuggestion with connector name and reasoning
        """
        # Log request
        user_query = f"Suggest enrichment for {stix_object.type}: {stix_object.get('value', 'N/A')}"
        await self._add_turn(ConversationRole.ANALYST, user_query)

        # Build prompt for suggestion
        prompt = self._build_enrichment_prompt(stix_object)

        # Stream response, accumulating tokens. A real implementation would parse
        # completed suggestion lines as they arrive and yield each one; the single
        # canned suggestion below is a placeholder yielded once the stream finishes.
        response_text = ""
        async for token in self.llm.stream(prompt):
            response_text += token

        yield EnrichmentSuggestion(
            connector_name="Recorded Future",
            reason="Specialized in reputation scoring",
            confidence=0.9,
            estimated_duration_sec=5,
        )

        # Log full response
        await self._add_turn(ConversationRole.ASSISTANT, response_text)

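    # The helper below is an illustrative sketch, not an established GNAT API: it
    # shows one way a completed line of the format requested by
    # _build_enrichment_prompt ("1. Connector Name - reason. Est. 5 min.") could be
    # turned into an EnrichmentSuggestion. The neutral 0.5 confidence is an
    # assumption, since that text format carries no confidence value.
    @staticmethod
    def _parse_suggestion_line(line: str) -> Optional[EnrichmentSuggestion]:
        """Parse one suggestion line; return None if it doesn't match the format."""
        line = line.strip()
        if not line:
            return None
        # Drop a leading "N." ordinal if present.
        head, sep, rest = line.partition(".")
        if sep and head.strip().isdigit():
            line = rest.strip()
        if " - " not in line:
            return None
        name, _, remainder = line.partition(" - ")
        reason, _, estimate = remainder.partition("Est.")
        minutes = "".join(ch for ch in estimate if ch.isdigit())
        return EnrichmentSuggestion(
            connector_name=name.strip(),
            reason=reason.strip().rstrip("."),
            confidence=0.5,  # assumption: no confidence present in the text format
            estimated_duration_sec=int(minutes) * 60 if minutes else 0,
        )
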
    async def draft_report_section(
        self,
        section_type: str,
        investigation_context: Dict[str, Any],
    ) -> List[ReportDraftOption]:
        """
        Generate report section drafts (batched operation).
        Returns multiple options for the analyst to choose from.

        Args:
            section_type: "executive_summary" | "findings" | "recommendations" | "timeline"
            investigation_context: Investigation data (IOCs, actors, campaigns, etc.)

        Returns:
            List of 2-3 draft options
        """
        # Log request
        user_query = f"Draft {section_type} for investigation"
        await self._add_turn(ConversationRole.ANALYST, user_query)

        # Build batched prompt
        prompt = self._build_draft_prompt(section_type, investigation_context)

        # Batched call (not streaming)
        response_text = await self.llm.call(prompt)

        # Log response
        await self._add_turn(ConversationRole.ASSISTANT, response_text)

        # Parse response to extract options (placeholder)
        # Real implementation: parse Claude response into structured ReportDraftOption objects
        options = [
            ReportDraftOption(
                section_type=section_type,
                text="Draft 1 (formal tone)",
                tone="formal",
                quality_score=0.85,
            ),
            ReportDraftOption(
                section_type=section_type,
                text="Draft 2 (technical tone)",
                tone="technical",
                quality_score=0.88,
            ),
        ]

        return options

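    # Illustrative sketch, not an established GNAT API: one way the placeholder above
    # could be replaced, assuming the drafting prompt is tightened so the model returns
    # a JSON array of {"tone", "text", "quality_score"} objects. Falls back to a single
    # formal-tone option wrapping the raw response if parsing fails.
    @staticmethod
    def _parse_draft_options(section_type: str, response_text: str) -> List[ReportDraftOption]:
        """Parse an LLM drafting response into ReportDraftOption objects (best effort)."""
        import json  # local import keeps this illustrative helper self-contained

        try:
            items = json.loads(response_text)
            return [
                ReportDraftOption(
                    section_type=section_type,
                    text=item.get("text", ""),
                    tone=item.get("tone", "formal"),
                    quality_score=float(item.get("quality_score", 0.0)),
                )
                for item in items
            ]
        except (ValueError, TypeError, AttributeError):
            return [
                ReportDraftOption(
                    section_type=section_type,
                    text=response_text,
                    tone="formal",
                    quality_score=0.0,
                )
            ]
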
    async def explain_finding(
        self,
        stix_object: STIXBase,
        context: Dict[str, Any],
    ) -> AsyncIterator[str]:
        """
        Explain why a finding matters in plain language.
        Streaming: yields explanation tokens.

        Args:
            stix_object: The object being explained
            context: Investigation context for framing

        Yields:
            Explanation text tokens
        """
        # Log request
        user_query = f"Explain {stix_object.type}: {stix_object.get('value', 'N/A')}"
        await self._add_turn(ConversationRole.ANALYST, user_query)

        # Build streaming prompt
        prompt = self._build_explanation_prompt(stix_object, context)

        # Stream response
        response_text = ""
        async for token in self.llm.stream(prompt):
            response_text += token
            yield token

        # Log full response
        await self._add_turn(ConversationRole.ASSISTANT, response_text)

    async def search_help(self, analyst_query: str) -> AsyncIterator[str]:
        """
        Help the analyst search for data across connectors by suggesting
        connectors and search syntax for a natural-language query.
        Streaming: yields suggestions.

        Args:
            analyst_query: e.g. "Find APT29 infrastructure" or "Look for malware family X"

        Yields:
            Search suggestion tokens (connector names, syntax examples)
        """
        # Log query
        await self._add_turn(ConversationRole.ANALYST, analyst_query)

        # Build prompt to route the query
        prompt = self._build_search_help_prompt(analyst_query)

        # Stream suggestions
        response_text = ""
        async for token in self.llm.stream(prompt):
            response_text += token
            yield token

        # Log response
        await self._add_turn(ConversationRole.ASSISTANT, response_text)

    # Private helpers

    async def _add_turn(
        self,
        role: ConversationRole,
        text: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> ConversationTurn:
        """Log a conversational turn."""
        return self.store.add_turn(
            self.conversation_id,
            role,
            text,
            metadata=metadata or {},
        )

    def _build_enrichment_prompt(self, stix_object: STIXBase) -> str:
        """Build Claude prompt for enrichment suggestions."""
        return f"""
        Object type: {stix_object.type}
        Value: {stix_object.get('value', 'N/A')}
        TLP: {stix_object.get('labels', [])}

        Suggest 3-5 GNAT connectors to enrich this object.
        For each, explain why it's relevant and how long enrichment takes.
        Format: "1. Connector Name - reason. Est. 5 min."
        """

    def _build_draft_prompt(
        self,
        section_type: str,
        investigation_context: Dict[str, Any],
    ) -> str:
        """Build Claude prompt for report drafting."""
        return f"""
        Report section: {section_type}
        Investigation context: {investigation_context}

        Generate 2-3 draft options for this report section.
        Offer different tones: formal (executive), technical (SOC), narrative (incident report).
        Each draft should be 2-3 paragraphs.
        """

    def _build_explanation_prompt(
        self,
        stix_object: STIXBase,
        context: Dict[str, Any],
    ) -> str:
        """Build Claude prompt for finding explanation."""
        return f"""
        Object: {stix_object.type} = {stix_object.get('value', 'N/A')}
        Context: {context}

        Explain in plain language why this finding matters.
        Reference any known campaigns or threat actors if applicable.
        Keep it to 2-3 sentences.
        """

    def _build_search_help_prompt(self, analyst_query: str) -> str:
        """Build Claude prompt for search routing."""
        return f"""
        Analyst query: "{analyst_query}"

        Recommend GNAT connectors and search syntax to answer this query.
        For each connector, provide an example query or STIX pattern.
        Example format:
        - ThreatQ: [ipv4-addr:country = 'RU']
        - Shodan: "Cobalt Strike" port:443
        """
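
# Illustrative usage sketch, not an established GNAT entry point: it assumes the
# conversation already exists in the ConversationStore and that the caller has a
# STIX object loaded elsewhere; only methods defined above are used.
async def _example_usage(session: LiveAnalystAssistantSession, indicator: STIXBase) -> None:
    """Drive one streaming enrichment-suggestion call and one batched drafting call."""
    # Streaming path: suggestions arrive as they are produced.
    async for suggestion in session.suggest_enrichment(indicator):
        print(f"{suggestion.connector_name}: {suggestion.reason} "
              f"(~{suggestion.estimated_duration_sec}s, confidence {suggestion.confidence:.0%})")

    # Batched path: several drafts come back at once for the analyst to pick from.
    drafts = await session.draft_report_section(
        "executive_summary",
        {"indicators": [indicator]},
    )
    for draft in drafts:
        print(f"[{draft.tone}] quality={draft.quality_score}: {draft.text}")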