Skip to content

Commit 8ccf22c

Browse files
authored
Merge pull request #1176 from MISP/codex/add-python-tool-for-galaxy-relationship-graph
Add GraphML generator for galaxies/clusters with optional inferred links
2 parents 72ab74d + 8d89b53 commit 8ccf22c

1 file changed

Lines changed: 285 additions & 0 deletions

File tree

tools/gen_graphml.py

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
#!/usr/bin/env python3
2+
"""Generate a GraphML export for all MISP galaxies and clusters.
3+
4+
The graph contains:
5+
- One node per galaxy definition (`galaxies/*.json`)
6+
- One node per cluster value (`clusters/*.json`)
7+
- Optional explicit edges from `cluster.related`
8+
- Optional inferred edges across galaxy types when values/synonyms match
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import argparse
14+
import itertools
15+
import json
16+
import re
17+
from dataclasses import dataclass
18+
from pathlib import Path
19+
import xml.etree.ElementTree as ET
20+
21+
22+
@dataclass(frozen=True)
class Galaxy:
    """Immutable record for one galaxy definition file."""

    # Identifier from the galaxy JSON; used to build the "galaxy:<uuid>" node id.
    uuid: str
    # Galaxy type; clusters are attached to a galaxy by comparing this value
    # with their own galaxy type.
    type: str
    # Display name written to the node's "name" attribute.
    name: str
    # Free-text description written to the node's "description" attribute.
    description: str
28+
29+
30+
@dataclass(frozen=True)
class Cluster:
    """Immutable record for one cluster value read from a cluster JSON file.

    Note: ``meta`` is a ``dict``, so although the dataclass is frozen its
    instances cannot be hashed (``hash()`` raises ``TypeError``).
    """

    # GraphML node identifier, built as "cluster:<uuid>".
    node_id: str
    # Cluster UUID, or a synthetic "<galaxy_type>:<index>" when the JSON has none.
    uuid: str
    # The cluster's value (its human-readable name).
    value: str
    # Type of the cluster file this value came from.
    galaxy_type: str
    # Free-text description written to the node's "description" attribute.
    description: str
    # Raw "meta" mapping from the JSON; "synonyms" is read from it when matching.
    meta: dict
38+
39+
40+
def normalize(text: str) -> str:
    """Return *text* lower-cased, trimmed, and with whitespace runs collapsed.

    Every run of whitespace inside the string becomes a single space and
    leading/trailing whitespace is removed, so values that differ only in
    case or spacing compare equal.
    """
    words = text.lower().split()
    return " ".join(words)
42+
43+
44+
def load_galaxies(galaxies_dir: Path) -> dict[str, Galaxy]:
    """Load every ``*.json`` file in *galaxies_dir* into a mapping keyed by type.

    Files are visited in sorted path order. A file whose JSON has no truthy
    ``uuid`` or ``type`` is skipped. If two files declare the same type, the
    one visited later replaces the earlier one.

    Args:
        galaxies_dir: Directory holding the galaxy definition files.

    Returns:
        Mapping of galaxy type to its ``Galaxy`` record.
    """
    by_type: dict[str, Galaxy] = {}
    for json_path in sorted(galaxies_dir.glob("*.json")):
        payload = json.loads(json_path.read_text(encoding="utf-8"))
        galaxy_uuid = payload.get("uuid")
        galaxy_type = payload.get("type")
        if galaxy_uuid and galaxy_type:
            by_type[galaxy_type] = Galaxy(
                uuid=galaxy_uuid,
                type=galaxy_type,
                # The type doubles as the display name when none is given.
                name=payload.get("name", galaxy_type),
                description=payload.get("description", ""),
            )
    return by_type
60+
61+
62+
def load_clusters(clusters_dir: Path) -> tuple[dict[str, Cluster], list[tuple[str, str, str]]]:
    """Load every ``*.json`` cluster file in *clusters_dir*.

    Files are visited in sorted path order. Each entry of a file's ``values``
    list becomes a ``Cluster``; an entry without a truthy ``uuid`` gets the
    synthetic identifier ``"<galaxy_type>:<index>"`` (1-based position within
    its file). If two entries share an identifier, the later one replaces
    the earlier one.

    Args:
        clusters_dir: Directory holding the cluster files.

    Returns:
        A pair ``(clusters, explicit_edges)`` where ``clusters`` maps the
        cluster identifier to its record and ``explicit_edges`` lists
        ``(source_uuid, dest_uuid, relation_type)`` for every ``related``
        entry that carries a ``dest-uuid``.
    """
    clusters: dict[str, Cluster] = {}
    explicit_edges: list[tuple[str, str, str]] = []

    for path in sorted(clusters_dir.glob("*.json")):
        with path.open(encoding="utf-8") as handle:
            data = json.load(handle)

        galaxy_type = data.get("type", "unknown")
        # JSON ``null`` arrives as None; treat it like a missing list, the
        # same way ``related`` is handled below.
        for index, raw_cluster in enumerate(data.get("values") or [], start=1):
            uuid = raw_cluster.get("uuid") or f"{galaxy_type}:{index}"

            # Downstream code calls ``meta.get(...)``, so always store a dict
            # even when the file has ``"meta": null`` or a non-object value.
            meta = raw_cluster.get("meta")
            if not isinstance(meta, dict):
                meta = {}

            clusters[uuid] = Cluster(
                node_id=f"cluster:{uuid}",
                uuid=uuid,
                value=raw_cluster.get("value", ""),
                galaxy_type=galaxy_type,
                description=raw_cluster.get("description", ""),
                meta=meta,
            )

            for relation in raw_cluster.get("related", []) or []:
                dest_uuid = relation.get("dest-uuid")
                relation_type = relation.get("type", "related-to")
                if dest_uuid:
                    explicit_edges.append((uuid, dest_uuid, relation_type))

    return clusters, explicit_edges
91+
92+
93+
def cluster_terms(cluster: Cluster, include_synonyms: bool) -> set[str]:
    """Return the normalized terms under which *cluster* can be matched.

    The cluster value is always a candidate; the entries of
    ``meta["synonyms"]`` are added when *include_synonyms* is true. Every
    candidate goes through the same filter: it must be a string with at least
    one non-whitespace character. This keeps blank values from producing the
    empty term ``""``, which would otherwise match every other blank cluster.

    Args:
        cluster: The cluster whose value and synonyms are read.
        include_synonyms: Whether ``meta["synonyms"]`` contributes terms.

    Returns:
        The set of normalized terms (possibly empty).
    """
    candidates = [cluster.value]

    if include_synonyms:
        meta = cluster.meta or {}
        synonyms = meta.get("synonyms")
        if isinstance(synonyms, str):
            # A bare string is one synonym, not a sequence of characters.
            synonyms = [synonyms]
        elif not isinstance(synonyms, (list, tuple)):
            # Missing, null or otherwise unusable: contribute nothing.
            synonyms = []
        candidates.extend(synonyms)

    terms: set[str] = set()
    for candidate in candidates:
        if isinstance(candidate, str) and candidate.strip():
            terms.add(normalize(candidate))
    return terms
105+
106+
107+
def add_graphml_keys(root: ET.Element) -> None:
    """Append the GraphML ``<key>`` declarations ``d0`` to ``d6`` to *root*.

    ``d0``-``d4`` describe node attributes and ``d5``-``d6`` describe edge
    attributes; all are declared with ``attr.type="string"``.
    """
    node_attributes = ("kind", "uuid", "name", "galaxy_type", "description")
    edge_attributes = ("relation_type", "relation_source")
    declarations = [("node", name) for name in node_attributes]
    declarations += [("edge", name) for name in edge_attributes]

    for number, (domain, attribute) in enumerate(declarations):
        ET.SubElement(
            root,
            "key",
            {
                "id": f"d{number}",
                "for": domain,
                "attr.name": attribute,
                "attr.type": "string",
            },
        )
124+
125+
126+
def add_data(parent: ET.Element, key: str, value: str) -> None:
    """Append a ``<data key="...">`` child holding *value* as text to *parent*."""
    ET.SubElement(parent, "data", {"key": key}).text = value
129+
130+
131+
def _add_galaxy_nodes(graph: ET.Element, galaxies: dict[str, Galaxy]) -> None:
    """Emit one ``<node>`` per galaxy, with id ``galaxy:<uuid>``."""
    for galaxy in galaxies.values():
        node = ET.SubElement(graph, "node", id=f"galaxy:{galaxy.uuid}")
        add_data(node, "d0", "galaxy")
        add_data(node, "d1", galaxy.uuid)
        add_data(node, "d2", galaxy.name)
        add_data(node, "d3", galaxy.type)
        add_data(node, "d4", galaxy.description)


def _add_cluster_nodes(
    graph: ET.Element,
    galaxies: dict[str, Galaxy],
    clusters: dict[str, Cluster],
) -> None:
    """Emit one ``<node>`` per cluster, each followed by its membership edge."""
    for cluster in clusters.values():
        node = ET.SubElement(graph, "node", id=cluster.node_id)
        add_data(node, "d0", "cluster")
        add_data(node, "d1", cluster.uuid)
        add_data(node, "d2", cluster.value)
        add_data(node, "d3", cluster.galaxy_type)
        add_data(node, "d4", cluster.description)

        # Clusters whose type has no loaded galaxy stay unattached.
        galaxy = galaxies.get(cluster.galaxy_type)
        if galaxy is None:
            continue
        edge = ET.SubElement(
            graph,
            "edge",
            source=f"galaxy:{galaxy.uuid}",
            target=cluster.node_id,
        )
        add_data(edge, "d5", "contains")
        add_data(edge, "d6", "membership")


def _add_explicit_edges(
    graph: ET.Element,
    clusters: dict[str, Cluster],
    explicit_edges: list[tuple[str, str, str]],
    edge_ids: itertools.count,
) -> None:
    """Emit an edge for each declared relation whose two endpoints were loaded."""
    for source_uuid, target_uuid, relation_type in explicit_edges:
        source_cluster = clusters.get(source_uuid)
        target_cluster = clusters.get(target_uuid)
        if source_cluster is None or target_cluster is None:
            # Relation points at a cluster that is not part of this graph.
            continue
        edge = ET.SubElement(
            graph,
            "edge",
            id=f"e{next(edge_ids)}",
            source=source_cluster.node_id,
            target=target_cluster.node_id,
        )
        add_data(edge, "d5", relation_type)
        add_data(edge, "d6", "explicit")


def _add_inferred_edges(
    graph: ET.Element,
    clusters: dict[str, Cluster],
    include_synonyms: bool,
    edge_ids: itertools.count,
) -> None:
    """Emit ``same-value`` edges between clusters of different types sharing a term."""
    term_index: dict[str, list[Cluster]] = {}
    for cluster in clusters.values():
        # Sort the terms: set iteration order for strings varies between
        # interpreter runs, which would make edge order (and which term is
        # credited for a pair) differ from one run to the next.
        for term in sorted(cluster_terms(cluster, include_synonyms=include_synonyms)):
            term_index.setdefault(term, []).append(cluster)

    seen_pairs: set[tuple[str, str]] = set()
    for term, matching_clusters in term_index.items():
        # combinations() yields nothing for fewer than two clusters.
        for left, right in itertools.combinations(matching_clusters, 2):
            if left.galaxy_type == right.galaxy_type:
                continue
            # Order-independent key so a pair is linked once, by the first
            # term that matches it.
            if left.uuid <= right.uuid:
                pair = (left.uuid, right.uuid)
            else:
                pair = (right.uuid, left.uuid)
            if pair in seen_pairs:
                continue
            seen_pairs.add(pair)

            edge = ET.SubElement(
                graph,
                "edge",
                id=f"e{next(edge_ids)}",
                source=left.node_id,
                target=right.node_id,
            )
            add_data(edge, "d5", "same-value")
            add_data(edge, "d6", f"inferred:{term}")


def build_graphml(
    galaxies: dict[str, Galaxy],
    clusters: dict[str, Cluster],
    explicit_edges: list[tuple[str, str, str]],
    include_explicit_edges: bool,
    inferred_mode: str,
) -> ET.ElementTree:
    """Build the GraphML document for the given galaxies and clusters.

    The directed graph ``misp_galaxies`` contains, in this order: galaxy
    nodes; cluster nodes, each followed by a ``contains`` edge from its galaxy
    when that galaxy is known; explicit relation edges; inferred edges.
    Explicit and inferred edges share one ``e<N>`` id sequence starting at 1;
    membership edges carry no id.

    Args:
        galaxies: Galaxies keyed by type.
        clusters: Clusters keyed by identifier.
        explicit_edges: ``(source_uuid, dest_uuid, relation_type)`` triples.
        include_explicit_edges: Whether to emit *explicit_edges*.
        inferred_mode: ``"none"`` disables inferred edges;
            ``"value-or-synonyms"`` matches on values and synonyms; any other
            value matches on cluster values only.

    Returns:
        The populated element tree, ready to be written.
    """
    root = ET.Element("graphml", xmlns="http://graphml.graphdrawing.org/xmlns")
    add_graphml_keys(root)

    graph = ET.SubElement(root, "graph", id="misp_galaxies", edgedefault="directed")

    _add_galaxy_nodes(graph, galaxies)
    _add_cluster_nodes(graph, galaxies, clusters)

    edge_ids = itertools.count(1)

    if include_explicit_edges:
        _add_explicit_edges(graph, clusters, explicit_edges, edge_ids)

    if inferred_mode != "none":
        _add_inferred_edges(
            graph,
            clusters,
            include_synonyms=inferred_mode == "value-or-synonyms",
            edge_ids=edge_ids,
        )

    return ET.ElementTree(root)
219+
220+
221+
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the GraphML generator.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, so existing callers are
            unaffected; passing a list allows programmatic use.

    Returns:
        Namespace with ``clusters_dir``, ``galaxies_dir``, ``output``,
        ``no_existing_relationships`` and ``cross_cluster_matching``.
    """
    parser = argparse.ArgumentParser(
        description="Generate a GraphML graph from galaxies and clusters JSON files."
    )
    parser.add_argument(
        "--clusters-dir",
        type=Path,
        default=Path("clusters"),
        help="Directory containing cluster JSON files.",
    )
    parser.add_argument(
        "--galaxies-dir",
        type=Path,
        default=Path("galaxies"),
        help="Directory containing galaxy JSON files.",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=Path,
        default=Path("misp-galaxies.graphml"),
        help="Output GraphML file path.",
    )
    parser.add_argument(
        "--no-existing-relationships",
        action="store_true",
        help="Disable explicit relationships from cluster related[] entries.",
    )
    parser.add_argument(
        "--cross-cluster-matching",
        choices=["none", "value", "value-or-synonyms"],
        default="none",
        help=(
            "Create inferred edges across different galaxy types by matching cluster values "
            "or values+synonyms."
        ),
    )
    return parser.parse_args(argv)
259+
260+
261+
def main() -> int:
    """Run the generator: load the JSON inputs, build the graph, write the file.

    A missing input directory is reported on stderr but is not fatal: globbing
    it yields no files, so the graph is still written without that part.

    Returns:
        Process exit status; always ``0`` when the run completes.
    """
    import sys  # local import: only needed for the stderr warnings below

    args = parse_args()

    # Without this check a mistyped directory silently produces an empty graph.
    for label, directory in (
        ("galaxies", args.galaxies_dir),
        ("clusters", args.clusters_dir),
    ):
        if not directory.is_dir():
            print(
                f"warning: {label} directory {directory} is not a directory; "
                f"no {label} will be loaded.",
                file=sys.stderr,
            )

    galaxies = load_galaxies(args.galaxies_dir)
    clusters, explicit_edges = load_clusters(args.clusters_dir)

    graphml = build_graphml(
        galaxies=galaxies,
        clusters=clusters,
        explicit_edges=explicit_edges,
        include_explicit_edges=not args.no_existing_relationships,
        inferred_mode=args.cross_cluster_matching,
    )

    # Create the output's parent directory so a nested -o path works.
    args.output.parent.mkdir(parents=True, exist_ok=True)
    graphml.write(args.output, encoding="utf-8", xml_declaration=True)

    print(
        f"GraphML written to {args.output} with {len(galaxies)} galaxies and {len(clusters)} clusters."
    )
    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())

0 commit comments

Comments
 (0)