Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
323 changes: 323 additions & 0 deletions gnat/connectors/cloudsek/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,326 @@ def _stix_to_cs_category(pattern: str) -> str:
if "file:hashes" in pattern:
return "malware"
return "brand_abuse"

# ── Alerts ────────────────────────────────────────────────────────────────

def list_alerts(
self,
category: str = "",
severity: str = "",
status: str = "",
from_date: str = "",
to_date: str = "",
keyword: str = "",
limit: int = 100,
offset: int = 0,
) -> list[dict[str, Any]]:
"""
List XVigil alerts with optional filters.

Parameters
----------
category : str
Alert category: ``"credential_leak"``, ``"brand_abuse"``,
``"phishing"``, ``"dark_web"``, ``"ip_exposure"``, etc.
severity : str
``"low"``, ``"medium"``, ``"high"``, or ``"critical"``.
status : str
``"open"``, ``"acknowledged"``, ``"resolved"``, ``"false_positive"``.
from_date / to_date : str
ISO-8601 date range, e.g. ``"2024-01-01"``.
keyword : str
Free-text keyword search.
"""
params: dict[str, Any] = {
"limit": min(limit, 500),
"offset": offset,
}
if category:
params["category"] = category
if severity:
params["severity"] = severity
if status:
params["status"] = status
if from_date:
params["from_date"] = from_date
if to_date:
params["to_date"] = to_date
if keyword:
params["keyword"] = keyword

resp = self.get(f"{_API}/alerts", params=params)
return resp.get("data", []) if isinstance(resp, dict) else []

def get_alert(self, alert_id: str) -> dict[str, Any]:
"""Retrieve a specific XVigil alert by ID."""
resp = self.get(f"{_API}/alerts/{alert_id}")
return resp.get("data", resp) if isinstance(resp, dict) else {}

def add_alert_comment(self, alert_id: str, comment: str) -> dict[str, Any]:
"""Add an analyst comment to an alert."""
resp = self.post(f"{_API}/alerts/{alert_id}/comments", json={"comment": comment})
return resp if isinstance(resp, dict) else {}

def list_credential_leaks(
self, limit: int = 100, from_date: str = "", to_date: str = ""
) -> list[dict[str, Any]]:
"""List credential leak alerts (exposed usernames/passwords)."""
return self.list_alerts(
category="credential_leak", limit=limit,
from_date=from_date, to_date=to_date,
)

def list_brand_abuse(
self, limit: int = 100, from_date: str = "", to_date: str = ""
) -> list[dict[str, Any]]:
"""List brand abuse / impersonation alerts."""
return self.list_alerts(
category="brand_abuse", limit=limit,
from_date=from_date, to_date=to_date,
)

def list_phishing_urls(
self, limit: int = 100, from_date: str = "", to_date: str = ""
) -> list[dict[str, Any]]:
"""List phishing URL / site alerts."""
return self.list_alerts(
category="phishing", limit=limit,
from_date=from_date, to_date=to_date,
)

def list_dark_web_mentions(
self, limit: int = 100, from_date: str = "", to_date: str = ""
) -> list[dict[str, Any]]:
"""List dark web mention alerts."""
return self.list_alerts(
category="dark_web", limit=limit,
from_date=from_date, to_date=to_date,
)

def list_exposed_assets(
self, limit: int = 100, from_date: str = "", to_date: str = ""
) -> list[dict[str, Any]]:
"""List exposed asset / IP exposure alerts."""
return self.list_alerts(
category="ip_exposure", limit=limit,
from_date=from_date, to_date=to_date,
)

def list_code_leaks(
self, limit: int = 100, from_date: str = "", to_date: str = ""
) -> list[dict[str, Any]]:
"""List source code leak alerts (GitHub, GitLab, Pastebin exposure)."""
return self.list_alerts(
category="code_leak", limit=limit,
from_date=from_date, to_date=to_date,
)

# ── Indicators ────────────────────────────────────────────────────────────

def search_indicators(
self,
keyword: str,
ioc_type: str = "",
limit: int = 100,
offset: int = 0,
) -> list[dict[str, Any]]:
"""
Search XVigil threat indicators.

``ioc_type`` options: ``"ip"``, ``"domain"``, ``"url"``, ``"email"``,
``"hash"``, ``"cve"``.
"""
params: dict[str, Any] = {
"keyword": keyword,
"limit": min(limit, 500),
"offset": offset,
}
if ioc_type:
params["type"] = ioc_type
resp = self.get(f"{_API}/indicators", params=params)
return resp.get("data", []) if isinstance(resp, dict) else []

def get_domain_intelligence(self, domain: str) -> dict[str, Any]:
"""
Get threat intelligence for a specific domain.

Returns risk score, historical alerts, and associated threat actors.
"""
resp = self.get(f"{_API}/intelligence/domain", params={"domain": domain})
return resp.get("data", resp) if isinstance(resp, dict) else {}

def get_ip_intelligence(self, ip: str) -> dict[str, Any]:
"""
Get threat intelligence for a specific IP address.

Returns geolocation, ASN, open ports, and associated threat data.
"""
resp = self.get(f"{_API}/intelligence/ip", params={"ip": ip})
return resp.get("data", resp) if isinstance(resp, dict) else {}

def get_email_intelligence(self, email: str) -> dict[str, Any]:
"""
Check an email address against CloudSEK breach databases.

Returns breach records, credential leaks, and account exposure.
"""
resp = self.get(f"{_API}/intelligence/email", params={"email": email})
return resp.get("data", resp) if isinstance(resp, dict) else {}

# ── Threat actors ─────────────────────────────────────────────────────────

def list_threat_actors(
self, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]:
"""List tracked threat actor profiles."""
resp = self.get(
f"{_API}/threat_actors",
params={"limit": min(limit, 500), "offset": offset},
)
return resp.get("data", []) if isinstance(resp, dict) else []

def get_threat_actor(self, actor_id: str) -> dict[str, Any]:
"""Retrieve a specific threat actor profile."""
resp = self.get(f"{_API}/threat_actors/{actor_id}")
return resp.get("data", resp) if isinstance(resp, dict) else {}

def get_actor_alerts(self, actor_id: str, limit: int = 50) -> list[dict[str, Any]]:
"""List alerts attributed to a specific threat actor."""
resp = self.get(
f"{_API}/threat_actors/{actor_id}/alerts",
params={"limit": limit},
)
return resp.get("data", []) if isinstance(resp, dict) else []

# ── Malware ───────────────────────────────────────────────────────────────

def list_malware_families(
self, limit: int = 100, offset: int = 0
) -> list[dict[str, Any]]:
"""List tracked malware family profiles."""
resp = self.get(
f"{_API}/malware",
params={"limit": min(limit, 500), "offset": offset},
)
return resp.get("data", []) if isinstance(resp, dict) else []

def get_malware_family(self, malware_id: str) -> dict[str, Any]:
"""Retrieve a specific malware family profile."""
resp = self.get(f"{_API}/malware/{malware_id}")
return resp.get("data", resp) if isinstance(resp, dict) else {}

# ── Watchlist (monitoring keywords) ──────────────────────────────────────

def list_watchlist_keywords(self) -> list[dict[str, Any]]:
"""List currently monitored keywords/assets in the XVigil watchlist."""
resp = self.get(f"{_API}/watchlist")
return resp.get("data", []) if isinstance(resp, dict) else []

def add_watchlist_keyword(
self,
keyword: str,
category: str = "brand_abuse",
alert_on_match: bool = True,
) -> dict[str, Any]:
"""
Add a keyword to the XVigil monitoring watchlist.

``category`` controls which detection module monitors the keyword.
"""
payload: dict[str, Any] = {
"keyword": keyword,
"category": category,
"alert_on_match": alert_on_match,
}
resp = self.post(f"{_API}/watchlist", json=payload)
return resp if isinstance(resp, dict) else {}

def remove_watchlist_keyword(self, keyword_id: str) -> dict[str, Any]:
"""Remove a keyword from the watchlist by ID."""
resp = self.delete(f"{_API}/watchlist/{keyword_id}")
return resp if isinstance(resp, dict) else {}

# ── Dashboard & reporting ─────────────────────────────────────────────────

def get_executive_summary(
self,
from_date: str = "",
to_date: str = "",
) -> dict[str, Any]:
"""
Retrieve the XVigil executive dashboard summary.

Returns alert counts by category and severity, trend data,
and top threat actors for the specified time window.
"""
params: dict[str, Any] = {}
if from_date:
params["from_date"] = from_date
if to_date:
params["to_date"] = to_date
resp = self.get(f"{_API}/dashboard/summary", params=params)
return resp.get("data", resp) if isinstance(resp, dict) else {}

def get_risk_score(self) -> dict[str, Any]:
"""
Retrieve the organisation's current XVigil risk score.

Returns an overall digital risk score with component breakdowns
(brand exposure, credential leaks, dark web presence, etc.).
"""
resp = self.get(f"{_API}/risk_score")
return resp.get("data", resp) if isinstance(resp, dict) else {}

def export_alerts(
self,
export_format: str = "json",
from_date: str = "",
to_date: str = "",
category: str = "",
) -> dict[str, Any]:
"""
Export alerts in bulk.

``export_format`` options: ``"json"`` (default), ``"csv"``.
Returns a dict containing download URL or inline data.
"""
params: dict[str, Any] = {"format": export_format}
if from_date:
params["from_date"] = from_date
if to_date:
params["to_date"] = to_date
if category:
params["category"] = category
resp = self.get(f"{_API}/alerts/export", params=params)
return resp if isinstance(resp, dict) else {}

# ── Asset management ──────────────────────────────────────────────────────

def list_monitored_assets(self) -> list[dict[str, Any]]:
"""List all assets currently monitored by CloudSEK XVigil."""
resp = self.get(f"{_API}/assets")
return resp.get("data", []) if isinstance(resp, dict) else []

def add_monitored_asset(
self,
asset_type: str,
value: str,
label: str = "",
) -> dict[str, Any]:
"""
Register an asset for monitoring.

``asset_type`` options: ``"domain"``, ``"ip"``, ``"email_domain"``,
``"brand_keyword"``, ``"mobile_app"``.
"""
payload: dict[str, Any] = {"type": asset_type, "value": value}
if label:
payload["label"] = label
resp = self.post(f"{_API}/assets", json=payload)
return resp if isinstance(resp, dict) else {}

def remove_monitored_asset(self, asset_id: str) -> dict[str, Any]:
"""Remove an asset from monitoring."""
resp = self.delete(f"{_API}/assets/{asset_id}")
return resp if isinstance(resp, dict) else {}
Loading
Loading