Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 204 additions & 47 deletions tools/threat_actor_similarity_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
import argparse
import itertools
import json
import math
import re
import zlib
from collections import Counter
from datetime import datetime, timezone
from difflib import SequenceMatcher
from pathlib import Path


ALGORITHMS = ("sequence", "levenshtein", "compression", "vector")


def normalize_name(value: str) -> str:
"""Normalize actor/alias names for comparison."""
value = value.lower().strip()
Expand Down Expand Up @@ -56,62 +62,160 @@ def load_threat_actor_names(cluster_path: Path):
return names_to_actors


def find_similar_name_pairs(names_to_actors, min_similarity=0.88, max_results=200):
"""Compute similar name pairs using difflib.SequenceMatcher.

Numeric-heavy variants (same non-numeric stem, different numeric tokens) are
intentionally skipped to reduce noisy matches like "apt 28" vs "apt 29".
"""
names = sorted(names_to_actors)
results = []
def should_compare_pair(left: str, right: str, min_similarity: float) -> bool:
"""Fast filters to skip noisy/clearly impossible candidates."""
if left == right:
return False

left_non_numeric = strip_numeric_tokens(left)
right_non_numeric = strip_numeric_tokens(right)
left_numbers = numeric_tokens(left)
right_numbers = numeric_tokens(right)

# Threat-actor names often reuse a textual stem with a different numeric id.
# Treat these as distinct identifiers to avoid over-reporting false positives.
if (
left_numbers
and right_numbers
and left_non_numeric
and right_non_numeric
and left_non_numeric == right_non_numeric
and left_numbers != right_numbers
):
return False

# A cheap pre-filter: similarity cannot pass threshold if string lengths differ too much.
max_len_ratio_delta = (1.0 - min_similarity) / max(min_similarity, 1e-9)
longer = max(len(left), len(right))
shorter = min(len(left), len(right))
if shorter == 0:
return False
if (longer - shorter) / shorter > max_len_ratio_delta:
return False

return True


def sequence_similarity(left: str, right: str) -> float:
"""difflib.SequenceMatcher ratio."""
matcher = SequenceMatcher(None, left, right)
return matcher.ratio()


def levenshtein_similarity(left: str, right: str) -> float:
"""Normalized Levenshtein similarity: 1 - distance / max_len."""
if left == right:
return 1.0
if not left or not right:
return 0.0

if len(left) < len(right):
left, right = right, left

previous_row = list(range(len(right) + 1))
for i, left_ch in enumerate(left, start=1):
current_row = [i]
for j, right_ch in enumerate(right, start=1):
insertions = previous_row[j] + 1
deletions = current_row[j - 1] + 1
substitutions = previous_row[j - 1] + (left_ch != right_ch)
current_row.append(min(insertions, deletions, substitutions))
previous_row = current_row

distance = previous_row[-1]
return 1.0 - (distance / max(len(left), len(right)))


def compression_similarity(left: str, right: str) -> float:
"""Compression-based similarity derived from Normalized Compression Distance."""

def clen(text: str) -> int:
return len(zlib.compress(text.encode("utf-8")))

c_left = clen(left)
c_right = clen(right)
c_joined = clen(f"{left}|{right}")
ncd = (c_joined - min(c_left, c_right)) / max(c_left, c_right)
return max(0.0, min(1.0, 1.0 - ncd))


def char_ngram_counter(text: str, n: int = 3) -> Counter:
"""Character n-gram bag for vector comparison."""
padded = f" {text} "
if len(padded) < n:
return Counter({padded: 1})
return Counter(padded[i : i + n] for i in range(len(padded) - n + 1))


def vector_similarity(left: str, right: str) -> float:
"""Cosine similarity over character n-gram count vectors."""
vec_left = char_ngram_counter(left)
vec_right = char_ngram_counter(right)

common_keys = set(vec_left) & set(vec_right)
numerator = sum(vec_left[k] * vec_right[k] for k in common_keys)
if numerator == 0:
return 0.0

norm_left = math.sqrt(sum(v * v for v in vec_left.values()))
norm_right = math.sqrt(sum(v * v for v in vec_right.values()))
if not norm_left or not norm_right:
return 0.0

return numerator / (norm_left * norm_right)


def get_similarity(algorithm: str, left: str, right: str) -> float:
"""Dispatch algorithm scorer."""
if algorithm == "sequence":
return sequence_similarity(left, right)
if algorithm == "levenshtein":
return levenshtein_similarity(left, right)
if algorithm == "compression":
return compression_similarity(left, right)
if algorithm == "vector":
return vector_similarity(left, right)
raise ValueError(f"Unknown algorithm: {algorithm}")


def find_similar_name_pairs(
names_to_actors,
algorithms,
min_similarity=0.88,
max_results=200,
combine_mode="union",
):
"""Compute similar name pairs using one or more similarity algorithms."""
names = sorted(names_to_actors)
results = []

for left, right in itertools.combinations(names, 2):
# Skip identical forms and aliases of exactly the same actor only.
if left == right:
continue

left_actors = names_to_actors[left]
right_actors = names_to_actors[right]
if left_actors == right_actors:
continue

left_non_numeric = strip_numeric_tokens(left)
right_non_numeric = strip_numeric_tokens(right)
left_numbers = numeric_tokens(left)
right_numbers = numeric_tokens(right)

# Threat-actor names often reuse a textual stem with a different numeric id.
# Treat these as distinct identifiers to avoid over-reporting false positives.
if (
left_numbers
and right_numbers
and left_non_numeric
and right_non_numeric
and left_non_numeric == right_non_numeric
and left_numbers != right_numbers
):
if not should_compare_pair(left, right, min_similarity):
continue

longer = max(len(left), len(right))
shorter = min(len(left), len(right))
if shorter == 0:
continue
if (longer - shorter) / shorter > max_len_ratio_delta:
continue
algorithm_scores = {}
for algorithm in algorithms:
score = get_similarity(algorithm, left, right)
if score >= min_similarity:
algorithm_scores[algorithm] = round(score, 4)

matcher = SequenceMatcher(None, left, right)
if matcher.quick_ratio() < min_similarity:
continue
score = matcher.ratio()
if score < min_similarity:
if combine_mode == "intersection":
include = len(algorithm_scores) == len(algorithms)
else:
include = bool(algorithm_scores)

if not include:
continue

aggregate_score = sum(algorithm_scores.values()) / len(algorithm_scores)
results.append(
{
"score": round(score, 4),
"score": round(aggregate_score, 4),
"algorithm_scores": algorithm_scores,
"name_1": left,
"actors_1": sorted(left_actors.items()),
"name_2": right,
Expand All @@ -123,9 +227,17 @@ def find_similar_name_pairs(names_to_actors, min_similarity=0.88, max_results=20
return results[:max_results]


def build_markdown_report(results, source_path: Path, min_similarity: float, max_results: int):
def build_markdown_report(
results,
source_path: Path,
min_similarity: float,
max_results: int,
algorithms,
combine_mode,
):
"""Build a markdown report containing potential similar threat-actor names."""
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

def format_actors(actors):
return ", ".join(f"`{name}` ({uuid or 'N/A'})" for name, uuid in actors)

Expand All @@ -134,27 +246,30 @@ def format_actors(actors):
"",
f"- Generated: {generated_at}",
f"- Source cluster: `{source_path}`",
f"- Similarity method: `difflib.SequenceMatcher.ratio()`",
f"- Similarity algorithms: `{', '.join(algorithms)}`",
f"- Combine mode: `{combine_mode}`",
f"- Threshold: `{min_similarity}`",
f"- Max results: `{max_results}`",
f"- Matches returned: `{len(results)}`",
"",
]

if not results:
lines.append("No potential similar names found with current threshold.")
lines.append("No potential similar names found with current threshold/settings.")
return "\n".join(lines) + "\n"

lines.extend(
[
"| score | name_1 | actor(s)_1 | name_2 | actor(s)_2 |",
"|---:|---|---|---|---|",
"| score | algorithm_scores | name_1 | actor(s)_1 | name_2 | actor(s)_2 |",
"|---:|---|---|---|---|---|",
]
)
for item in results:
score_parts = ", ".join(f"{k}:{v:.4f}" for k, v in sorted(item["algorithm_scores"].items()))
lines.append(
"| {score:.4f} | `{name_1}` | {actors_1} | `{name_2}` | {actors_2} |".format(
"| {score:.4f} | `{scores}` | `{name_1}` | {actors_1} | `{name_2}` | {actors_2} |".format(
score=item["score"],
scores=score_parts,
name_1=item["name_1"],
actors_1=format_actors(item["actors_1"]),
name_2=item["name_2"],
Expand All @@ -165,11 +280,28 @@ def format_actors(actors):
return "\n".join(lines) + "\n"


def parse_algorithms(value: str):
"""Parse comma-separated algorithms; special keyword: all."""
raw = [part.strip().lower() for part in value.split(",") if part.strip()]
if not raw:
raise ValueError("--algorithms cannot be empty")
if "all" in raw:
return list(ALGORITHMS)

invalid = [name for name in raw if name not in ALGORITHMS]
if invalid:
allowed = ", ".join(ALGORITHMS) + ", all"
raise ValueError(f"Unknown algorithm(s): {', '.join(invalid)}. Allowed: {allowed}")

# Preserve user order while removing duplicates.
return list(dict.fromkeys(raw))


def main():
parser = argparse.ArgumentParser(
description=(
"Find potential similar threat-actor names and aliases in a MISP galaxy "
"cluster using SequenceMatcher."
"cluster using configurable similarity algorithms."
)
)
parser.add_argument(
Expand All @@ -189,6 +321,23 @@ def main():
default=200,
help="Maximum number of similar pairs to report (default: 200)",
)
parser.add_argument(
"--algorithms",
default="sequence",
help=(
"Comma-separated algorithms to use: sequence, levenshtein, compression, "
"vector, or all (default: sequence)"
),
)
parser.add_argument(
"--combine-mode",
choices=("union", "intersection"),
default="union",
help=(
"How to combine multi-algorithm results: union (any algorithm passes) or "
"intersection (all selected algorithms must pass). Default: union"
),
)
parser.add_argument(
"--markdown-output",
default="threat_actor_similarity_report.md",
Expand All @@ -204,19 +353,25 @@ def main():
if not 0.0 <= args.threshold <= 1.0:
raise ValueError("--threshold must be in [0.0, 1.0]")

algorithms = parse_algorithms(args.algorithms)

cluster_path = Path(args.cluster)
names_to_actors = load_threat_actor_names(cluster_path)
results = find_similar_name_pairs(
names_to_actors,
algorithms=algorithms,
min_similarity=args.threshold,
max_results=args.max_results,
combine_mode=args.combine_mode,
)

markdown = build_markdown_report(
results,
source_path=cluster_path,
min_similarity=args.threshold,
max_results=args.max_results,
algorithms=algorithms,
combine_mode=args.combine_mode,
)
output_path = Path(args.markdown_output)
output_path.write_text(markdown, encoding="utf-8")
Expand All @@ -227,6 +382,8 @@ def main():

print(f"Analyzed normalized names: {len(names_to_actors)}")
print(f"Potential similar name pairs: {len(results)}")
print(f"Algorithms: {', '.join(algorithms)}")
print(f"Combine mode: {args.combine_mode}")
print(f"Markdown report written to: {output_path}")
if args.json_output:
print(f"JSON report written to: {args.json_output}")
Expand Down
Loading