Skip to content

Commit 5494a86

Browse files
authored
Merge pull request #70 from wrhalpin/claude/add-claude-documentation-k8vvJ
Claude/add claude documentation k8vv j
2 parents e29f6b1 + 5140480 commit 5494a86

10 files changed

Lines changed: 3959 additions & 0 deletions

File tree

gnat/connectors/crowdstrike/client.py

Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,320 @@ def from_stix(self, stix_dict: dict[str, Any]) -> dict[str, Any]:
101101
"action": "detect",
102102
"severity": "medium",
103103
}
104+
105+
# ── Detections ────────────────────────────────────────────────────────────────
106+
107+
def list_detections(
108+
self,
109+
filter_fql: str = "",
110+
limit: int = 100,
111+
offset: int = 0,
112+
) -> list[dict[str, Any]]:
113+
"""List detection summaries via FQL filter."""
114+
params: dict[str, Any] = {"limit": limit, "offset": offset}
115+
if filter_fql:
116+
params["filter"] = filter_fql
117+
ids_resp = self.get("/detects/queries/detects/v1", params=params)
118+
ids = (ids_resp.get("resources", []) if isinstance(ids_resp, dict) else [])[:limit]
119+
if not ids:
120+
return []
121+
resp = self.post("/detects/entities/summaries/GET/v1", json={"ids": ids})
122+
return resp.get("resources", []) if isinstance(resp, dict) else []
123+
124+
def get_detection(self, detection_id: str) -> dict[str, Any]:
125+
"""Retrieve a single detection summary by ID."""
126+
resp = self.post("/detects/entities/summaries/GET/v1", json={"ids": [detection_id]})
127+
resources = resp.get("resources", []) if isinstance(resp, dict) else []
128+
return resources[0] if resources else {}
129+
130+
def update_detection(
131+
self,
132+
detection_id: str,
133+
status: str = "",
134+
assigned_to_uuid: str = "",
135+
comment: str = "",
136+
) -> dict[str, Any]:
137+
"""Update detection status, assignment, or add a comment."""
138+
payload: dict[str, Any] = {"ids": [detection_id]}
139+
if status:
140+
payload["status"] = status
141+
if assigned_to_uuid:
142+
payload["assigned_to_uuid"] = assigned_to_uuid
143+
if comment:
144+
payload["comment"] = comment
145+
return self.patch("/detects/entities/detects/v2", json=payload)
146+
147+
# ── Incidents ─────────────────────────────────────────────────────────────────
148+
149+
def list_incidents(
150+
self,
151+
filter_fql: str = "",
152+
limit: int = 100,
153+
offset: int = 0,
154+
) -> list[dict[str, Any]]:
155+
"""List incident summaries."""
156+
params: dict[str, Any] = {"limit": limit, "offset": offset}
157+
if filter_fql:
158+
params["filter"] = filter_fql
159+
ids_resp = self.get("/incidents/queries/incidents/v1", params=params)
160+
ids = (ids_resp.get("resources", []) if isinstance(ids_resp, dict) else [])[:limit]
161+
if not ids:
162+
return []
163+
resp = self.post("/incidents/entities/incidents/GET/v1", json={"ids": ids})
164+
return resp.get("resources", []) if isinstance(resp, dict) else []
165+
166+
def get_incident(self, incident_id: str) -> dict[str, Any]:
167+
"""Retrieve a single incident by ID."""
168+
resp = self.post("/incidents/entities/incidents/GET/v1", json={"ids": [incident_id]})
169+
resources = resp.get("resources", []) if isinstance(resp, dict) else []
170+
return resources[0] if resources else {}
171+
172+
def update_incident(
173+
self,
174+
incident_id: str,
175+
status: Optional[int] = None,
176+
assigned_to: str = "",
177+
tags: Optional[list[str]] = None,
178+
) -> dict[str, Any]:
179+
"""Update incident status or assignment."""
180+
action: dict[str, Any] = {"id": incident_id}
181+
if status is not None:
182+
action["status"] = status
183+
if assigned_to:
184+
action["assigned_to"] = assigned_to
185+
if tags is not None:
186+
action["tags"] = tags
187+
return self.post("/incidents/entities/incident-actions/v1", json={"action_parameters": [action]})
188+
189+
def get_incident_behaviors(self, incident_id: str) -> list[dict[str, Any]]:
190+
"""List behaviors (sub-events) associated with an incident."""
191+
resp = self.get("/incidents/queries/behaviors/v1", params={"filter": f"incident_id:'{incident_id}'"})
192+
ids = resp.get("resources", []) if isinstance(resp, dict) else []
193+
if not ids:
194+
return []
195+
detail = self.post("/incidents/entities/behaviors/GET/v1", json={"ids": ids})
196+
return detail.get("resources", []) if isinstance(detail, dict) else []
197+
198+
# ── Hosts ─────────────────────────────────────────────────────────────────────
199+
200+
def list_hosts(
201+
self,
202+
filter_fql: str = "",
203+
limit: int = 100,
204+
offset: int = 0,
205+
) -> list[dict[str, Any]]:
206+
"""List host device summaries."""
207+
params: dict[str, Any] = {"limit": limit, "offset": offset}
208+
if filter_fql:
209+
params["filter"] = filter_fql
210+
ids_resp = self.get("/devices/queries/devices/v1", params=params)
211+
ids = (ids_resp.get("resources", []) if isinstance(ids_resp, dict) else [])[:limit]
212+
if not ids:
213+
return []
214+
resp = self.post("/devices/entities/devices/GET/v2", json={"ids": ids})
215+
return resp.get("resources", []) if isinstance(resp, dict) else []
216+
217+
def get_host(self, device_id: str) -> dict[str, Any]:
218+
"""Retrieve a single host device summary."""
219+
resp = self.post("/devices/entities/devices/GET/v2", json={"ids": [device_id]})
220+
resources = resp.get("resources", []) if isinstance(resp, dict) else []
221+
return resources[0] if resources else {}
222+
223+
def contain_host(self, device_id: str) -> dict[str, Any]:
224+
"""Network-contain a host (isolate from network)."""
225+
return self.post(
226+
"/devices/entities/devices-actions/v2",
227+
params={"action_name": "contain"},
228+
json={"ids": [device_id]},
229+
)
230+
231+
def lift_containment(self, device_id: str) -> dict[str, Any]:
232+
"""Lift network containment from a host."""
233+
return self.post(
234+
"/devices/entities/devices-actions/v2",
235+
params={"action_name": "lift_containment"},
236+
json={"ids": [device_id]},
237+
)
238+
239+
def get_host_login_history(self, device_id: str) -> list[dict[str, Any]]:
240+
"""Retrieve login history for a specific host."""
241+
resp = self.post("/devices/combined/devices/login-history/v1", json={"ids": [device_id]})
242+
return resp.get("resources", []) if isinstance(resp, dict) else []
243+
244+
def get_host_network_addresses(self, device_id: str) -> list[dict[str, Any]]:
245+
"""Retrieve network address history for a host."""
246+
resp = self.post("/devices/combined/devices/network-address-history/v1", json={"ids": [device_id]})
247+
return resp.get("resources", []) if isinstance(resp, dict) else []
248+
249+
# ── Threat Intelligence ───────────────────────────────────────────────────────
250+
251+
def get_intel_actor(self, actor_id: str) -> dict[str, Any]:
252+
"""Retrieve a Falcon Intelligence adversary/actor by ID or slug."""
253+
resp = self.get("/intel/entities/actors/v1", params={"ids": actor_id})
254+
resources = resp.get("resources", []) if isinstance(resp, dict) else []
255+
return resources[0] if resources else {}
256+
257+
def list_intel_actors(
258+
self,
259+
query: str = "",
260+
limit: int = 100,
261+
offset: int = 0,
262+
) -> list[dict[str, Any]]:
263+
"""Search Falcon Intelligence adversary profiles."""
264+
params: dict[str, Any] = {"limit": limit, "offset": offset}
265+
if query:
266+
params["q"] = query
267+
resp = self.get("/intel/combined/actors/v1", params=params)
268+
return resp.get("resources", []) if isinstance(resp, dict) else []
269+
270+
def get_intel_malware(self, malware_id: str) -> dict[str, Any]:
271+
"""Retrieve a Falcon Intelligence malware family profile."""
272+
resp = self.get("/intel/entities/malware/v1", params={"ids": malware_id})
273+
resources = resp.get("resources", []) if isinstance(resp, dict) else []
274+
return resources[0] if resources else {}
275+
276+
def list_intel_reports(
277+
self,
278+
query: str = "",
279+
limit: int = 100,
280+
offset: int = 0,
281+
) -> list[dict[str, Any]]:
282+
"""Search Falcon Intelligence reports."""
283+
params: dict[str, Any] = {"limit": limit, "offset": offset}
284+
if query:
285+
params["q"] = query
286+
resp = self.get("/intel/combined/reports/v1", params=params)
287+
return resp.get("resources", []) if isinstance(resp, dict) else []
288+
289+
def get_intel_report(self, report_id: str) -> dict[str, Any]:
290+
"""Retrieve a single Falcon Intelligence report by ID."""
291+
resp = self.get("/intel/entities/reports/v1", params={"ids": report_id})
292+
resources = resp.get("resources", []) if isinstance(resp, dict) else []
293+
return resources[0] if resources else {}
294+
295+
def list_intel_indicators(
296+
self,
297+
filter_fql: str = "",
298+
limit: int = 100,
299+
offset: int = 0,
300+
) -> list[dict[str, Any]]:
301+
"""List Falcon Intelligence indicators (not custom IOCs)."""
302+
params: dict[str, Any] = {"limit": limit, "offset": offset, "include_deleted": False}
303+
if filter_fql:
304+
params["filter"] = filter_fql
305+
resp = self.get("/intel/combined/indicators/v1", params=params)
306+
return resp.get("resources", []) if isinstance(resp, dict) else []
307+
308+
# ── Custom IOCs ───────────────────────────────────────────────────────────────
309+
310+
def create_ioc(self, ioc_payload: dict[str, Any]) -> dict[str, Any]:
311+
"""
312+
Create a custom IOC.
313+
314+
``ioc_payload`` should follow the CrowdStrike IOC entity schema::
315+
316+
{
317+
"type": "ipv4",
318+
"value": "1.2.3.4",
319+
"action": "detect",
320+
"severity": "high",
321+
"description": "...",
322+
"platforms": ["windows"],
323+
}
324+
"""
325+
resp = self.post("/iocs/entities/indicators/v1", json={"indicators": [ioc_payload]})
326+
resources = resp.get("resources", []) if isinstance(resp, dict) else []
327+
return resources[0] if resources else {}
328+
329+
def update_ioc(self, ioc_id: str, updates: dict[str, Any]) -> dict[str, Any]:
330+
"""Update an existing custom IOC by ID."""
331+
updates["id"] = ioc_id
332+
resp = self.patch("/iocs/entities/indicators/v1", json={"indicators": [updates]})
333+
resources = resp.get("resources", []) if isinstance(resp, dict) else []
334+
return resources[0] if resources else {}
335+
336+
def delete_ioc(self, ioc_id: str) -> dict[str, Any]:
337+
"""Delete a custom IOC by ID."""
338+
return self.delete(f"/iocs/entities/indicators/v1?ids={ioc_id}")
339+
340+
def list_iocs(
341+
self,
342+
filter_fql: str = "",
343+
limit: int = 100,
344+
offset: int = 0,
345+
) -> list[dict[str, Any]]:
346+
"""List custom IOCs with optional FQL filter."""
347+
params: dict[str, Any] = {"limit": limit, "offset": offset}
348+
if filter_fql:
349+
params["filter"] = filter_fql
350+
ids_resp = self.get("/iocs/queries/indicators/v1", params=params)
351+
ids = (ids_resp.get("resources", []) if isinstance(ids_resp, dict) else [])[:limit]
352+
if not ids:
353+
return []
354+
resp = self.get("/iocs/entities/indicators/v1", params={"ids": ids})
355+
return resp.get("resources", []) if isinstance(resp, dict) else []
356+
357+
# ── Real-Time Response (RTR) ──────────────────────────────────────────────────
358+
359+
def init_rtr_session(self, device_id: str, queue_offline: bool = False) -> dict[str, Any]:
360+
"""
361+
Initialise a Real-Time Response session with a host.
362+
363+
Returns a session dict containing ``session_id``.
364+
"""
365+
return self.post(
366+
"/real-time-response/entities/sessions/v1",
367+
json={"device_id": device_id, "queue_offline": queue_offline},
368+
)
369+
370+
def execute_rtr_command(
371+
self,
372+
session_id: str,
373+
command_string: str,
374+
base_command: str = "runscript",
375+
) -> dict[str, Any]:
376+
"""
377+
Execute a command in an active RTR session.
378+
379+
``base_command`` options: ``ls``, ``cd``, ``pwd``, ``ps``,
380+
``runscript``, ``get``, ``put``, ``reg query``, etc.
381+
"""
382+
return self.post(
383+
"/real-time-response/entities/command/v1",
384+
json={
385+
"session_id": session_id,
386+
"base_command": base_command,
387+
"command_string": command_string,
388+
},
389+
)
390+
391+
def delete_rtr_session(self, session_id: str) -> dict[str, Any]:
392+
"""Delete an RTR session and release the connection."""
393+
return self.delete(f"/real-time-response/entities/sessions/v1?session_id={session_id}")
394+
395+
# ── Vulnerabilities / Spotlight ───────────────────────────────────────────────
396+
397+
def list_vulnerabilities(
398+
self,
399+
filter_fql: str = "",
400+
limit: int = 100,
401+
after: str = "",
402+
) -> list[dict[str, Any]]:
403+
"""List Spotlight vulnerability findings."""
404+
params: dict[str, Any] = {"limit": limit}
405+
if filter_fql:
406+
params["filter"] = filter_fql
407+
if after:
408+
params["after"] = after
409+
ids_resp = self.get("/spotlight/queries/vulnerabilities/v1", params=params)
410+
ids = (ids_resp.get("resources", []) if isinstance(ids_resp, dict) else [])[:limit]
411+
if not ids:
412+
return []
413+
resp = self.get("/spotlight/entities/vulnerabilities/v2", params={"ids": ids})
414+
return resp.get("resources", []) if isinstance(resp, dict) else []
415+
416+
def get_vulnerability(self, vuln_id: str) -> dict[str, Any]:
417+
"""Retrieve a single Spotlight vulnerability finding."""
418+
resp = self.get("/spotlight/entities/vulnerabilities/v2", params={"ids": vuln_id})
419+
resources = resp.get("resources", []) if isinstance(resp, dict) else []
420+
return resources[0] if resources else {}

0 commit comments

Comments
 (0)