Add YAML and Prolog rule engine implementations #137
Two new rule engines alongside the existing Hy engine, selectable
via [rules] engine = yaml|prolog|hy in config.
YAML engine:
- yaml_condition.py: condition DSL compiler mapping 26 helpers by name
with comparison operators (eq/neq/gt/gte/lt/lte), boolean combinators
(all/any/not), and action constructors (set_status/annotate/no_op)
- yaml_engine.py: YamlRuleLoader (.yaml/.yml glob, hot-reload) +
YamlRuleEngine (same evaluate loop as Hy engine)
- 4 example YAML rules in rules/yaml-examples/
Prolog engine:
- prolog_engine.py: PrologRuleLoader + PrologRuleEngine using pyswip
(SWI-Prolog bridge). Asserts hypothesis facts into Prolog KB before
each rule evaluation, queries when/2 and then/2 clauses, retracts
after. Graceful skip when pyswip not installed.
- prolog_helpers.pl: 15 pre-loaded predicates mirroring Python helpers
(evidence, confidence, reliability ordering, temporal, AI ceiling)
- 3 example Prolog rules in rules/prolog-examples/
Factory updated: _SUPPORTED_ENGINES = {hy, yaml, prolog} with lazy
imports per engine type. pyproject.toml: pyyaml added to [rules],
new [rules-prolog] extras group. 31 new tests (20 YAML condition,
11 YAML engine), 154 total passing, 1 skipped (Prolog without pyswip).
https://claude.ai/code/session_01H5UbjsuiiGya5n1eUCxoaR
Pull request overview
Adds two additional hypothesis rule-engine backends (YAML and Prolog) alongside the existing Hy engine, selectable via [rules] engine = hy|yaml|prolog, with example rules and unit tests.
Changes:
- Introduce YAML rule loader/engine plus a YAML condition/action compiler.
- Introduce Prolog rule loader/engine plus bundled Prolog helper predicates.
- Update engine factory/config example and extend optional dependency extras; add unit tests and example rule packs.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| gnat/analysis/rules/yaml_condition.py | Compiles YAML condition DSL into helper predicates + action constructors. |
| gnat/analysis/rules/yaml_engine.py | YAML rule discovery/loading + evaluation loop mirroring the Hy engine. |
| gnat/analysis/rules/prolog_engine.py | Prolog rule discovery/loading + evaluation via pyswip and asserted hypothesis facts. |
| gnat/analysis/rules/prolog_helpers.pl | Preloaded Prolog predicates intended to mirror Python helper semantics. |
| gnat/analysis/rules/factory.py | Adds yaml/prolog engine selection with lazy imports. |
| pyproject.toml | Adds pyyaml to rules extra and introduces rules-prolog extra. |
| config/config.ini.example | Documents the 3 rule engines and new engine config option. |
| tests/unit/analysis/rules/test_yaml_condition.py | Unit tests for YAML condition/action compilation. |
| tests/unit/analysis/rules/test_yaml_engine.py | Loader/evaluation tests for the YAML engine. |
| tests/unit/analysis/rules/test_prolog_engine.py | Minimal Prolog engine tests (skipped without pyswip). |
| rules/yaml-examples/*.yaml | Example YAML rule set for common promotion/refutation/guard patterns. |
| rules/prolog-examples/*.pl | Example Prolog rule set for common promotion/refutation/guard patterns. |
Comments suppressed due to low confidence (1)
gnat/analysis/rules/factory.py:45
- create_engine() returns Any even though the repo already defines RuleEngineProtocol for structural typing. Returning the protocol (and importing it) preserves type safety for callers. Also, consider normalizing the config value with .strip().lower() (as done elsewhere in config factories) so that engine = YAML or stray whitespace doesn't break selection.
def create_engine(
    config: Any,
    policy: RuleEnginePolicy | None = None,
    store: Any = None,
) -> Any:
    """
    Create a rule engine from INI configuration.
    Parameters
    ----------
    config : configparser.ConfigParser
        GNAT configuration.
    policy : RuleEnginePolicy, optional
        If not provided, built from ``config`` via ``RuleEnginePolicy.from_ini``.
    store : WorkspaceStore, optional
        For evidence resolution. Can be None if rules don't use source helpers.
    """
    if policy is None:
        policy = RuleEnginePolicy.from_ini(config)
    engine_name = "hy"
    if hasattr(config, "get") and hasattr(config, "has_section") and config.has_section("rules"):
        engine_name = config.get("rules", "engine", fallback="hy")
    if engine_name not in _SUPPORTED_ENGINES:
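
The normalization suggested in the comment above could look roughly like the following. This is a minimal sketch, not the factory's actual code: `resolve_engine_name` and `SUPPORTED_ENGINES` are hypothetical names, and raising `ValueError` for an unsupported value is an assumption, since the snippet above is cut off before that branch.

```python
import configparser

# Hypothetical stand-in for the factory's _SUPPORTED_ENGINES set.
SUPPORTED_ENGINES = frozenset({"hy", "yaml", "prolog"})


def resolve_engine_name(config: configparser.ConfigParser) -> str:
    """Return the normalized engine name, defaulting to 'hy'."""
    name = "hy"
    if config.has_section("rules"):
        name = config.get("rules", "engine", fallback="hy")
    # Normalize case and surrounding whitespace before the membership test.
    name = name.strip().lower()
    if name not in SUPPORTED_ENGINES:
        # Assumption: the real factory may handle this case differently.
        raise ValueError(f"Unsupported rule engine: {name!r}")
    return name
```

With this in place, `engine = YAML` and `engine = yaml` select the same backend.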
def compile_condition(spec: Any) -> Callable[[Any, Any], bool]:
    if isinstance(spec, dict):
        if "all" in spec:
            subs = [compile_condition(s) for s in spec["all"]]
            return lambda h, ctx: all(s(h, ctx) for s in subs)
        if "any" in spec:
            subs = [compile_condition(s) for s in spec["any"]]
            return lambda h, ctx: any(s(h, ctx) for s in subs)
        if "not" in spec:
            inner = compile_condition(spec["not"])
            return lambda h, ctx: not inner(h, ctx)

        for name, arg in spec.items():
            return _compile_leaf(name, arg)

compile_condition() silently ignores additional keys in a dict condition because it returns on the first for name, arg in spec.items() iteration. This can turn user mistakes into hard-to-debug partial conditions. Validate that leaf dicts have exactly one key (or explicitly support multi-key leaves via an implicit 'all'), and raise a ValueError otherwise.
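
One way to apply the single-key validation described above is sketched below. It is an illustration under stated assumptions: `_compile_leaf` here is a hypothetical stand-in that compares a dict field, rejecting non-dict specs is an assumption (the original handling of that case is not shown), and the sketch also requires combinator dicts to have exactly one key, which is stricter than the original.

```python
from typing import Any, Callable

Condition = Callable[[Any, Any], bool]


def _compile_leaf(name: str, arg: Any) -> Condition:
    # Hypothetical stand-in: the real _compile_leaf maps helper names to predicates.
    return lambda h, ctx: h.get(name) == arg


def compile_condition(spec: Any) -> Condition:
    if not isinstance(spec, dict):
        # Assumption: non-dict specs are rejected in this sketch.
        raise ValueError(f"Condition must be a mapping, got {spec!r}")
    if len(spec) != 1:
        # Extra keys are an error instead of being silently dropped.
        raise ValueError(f"Condition must have exactly one key, got {list(spec)!r}")
    ((name, arg),) = spec.items()
    if name == "all":
        subs = [compile_condition(s) for s in arg]
        return lambda h, ctx: all(s(h, ctx) for s in subs)
    if name == "any":
        subs = [compile_condition(s) for s in arg]
        return lambda h, ctx: any(s(h, ctx) for s in subs)
    if name == "not":
        inner = compile_condition(arg)
        return lambda h, ctx: not inner(h, ctx)
    return _compile_leaf(name, arg)
```

The alternative mentioned in the comment, treating a multi-key leaf as an implicit `all`, would replace the length check with a list comprehension over `spec.items()`.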
for op_name, threshold in arg.items():
    if op_name in _CMP_OPS:
        cmp = _CMP_OPS[op_name]
        return lambda h, ctx, _fn=fn, _cmp=cmp, _t=threshold: _cmp(_fn(h, ctx), _t)

_build_ctx_check(): when arg is a dict, unknown operator keys currently fall through to the default truthiness check instead of raising (unlike _build_no_ctx_check, which raises). This will silently accept typos in YAML like { gtte: 3 }. Raise a ValueError when no supported operator is found in the dict.
Suggested change:
-         return lambda h, ctx, _fn=fn, _cmp=cmp, _t=threshold: _cmp(_fn(h, ctx), _t)
+         return lambda h, ctx, _fn=fn, _cmp=cmp, _t=threshold: _cmp(_fn(h, ctx), _t)
+ raise ValueError(f"Unknown operator in {name}: {arg!r}")
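
As a self-contained illustration of the stricter behavior, the sketch below rejects any unrecognized operator key up front. The operator names come from the PR description; the mapping onto Python's `operator` module, the function name `build_ctx_check`, and the choice to AND together multiple operators in one dict are assumptions of this sketch (the original returns on the first recognized operator).

```python
import operator
from typing import Any, Callable

# Assumed mapping of the DSL's operator names to Python comparisons.
_CMP_OPS: dict[str, Callable[[Any, Any], bool]] = {
    "eq": operator.eq,
    "neq": operator.ne,
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
}


def build_ctx_check(
    name: str, fn: Callable[[Any, Any], Any], arg: Any
) -> Callable[[Any, Any], bool]:
    if isinstance(arg, dict):
        unknown = [op for op in arg if op not in _CMP_OPS]
        if unknown or not arg:
            # A typo such as {gtte: 3} fails at compile time.
            raise ValueError(f"Unknown operator in {name}: {arg!r}")
        checks = [(_CMP_OPS[op], threshold) for op, threshold in arg.items()]
        return lambda h, ctx: all(cmp(fn(h, ctx), t) for cmp, t in checks)
    # Non-dict arguments keep the default truthiness check.
    return lambda h, ctx: bool(fn(h, ctx))
```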
support_ratio(R) :- supporting_count(S), evidence_count(T), T1 is T + 1, R is S / T1.

%% Confidence predicates
has_confidence :- stix_confidence(C), C > 0.

prolog_helpers.pl has_confidence/0 is defined as stix_confidence(C), C > 0, which does not mirror the Python helper has_confidence() (it checks whether a ConfidenceScore object exists, not whether STIX confidence is > 0). This can cause YAML/Hy vs Prolog engines to disagree for hypotheses with a confidence object but a 0 score. Consider asserting a separate confidence_present/0 fact from Python (or changing has_confidence to check for that) to match semantics.
Suggested change:
- has_confidence :- stix_confidence(C), C > 0.
+ has_confidence :- stix_confidence(_).
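
On the Python side, the separate-fact option mentioned above might be sketched as follows. Everything here is hypothetical: `ConfidenceScore`, its `stix_confidence` field, and `confidence_facts` are illustrative names, not the project's API. The idea is only that presence and score are asserted as two distinct facts, so a Prolog `has_confidence :- confidence_present.` would agree with a presence check even when the score is 0.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConfidenceScore:
    """Hypothetical stand-in for the project's confidence object."""

    stix_confidence: int


def confidence_facts(confidence: Optional[ConfidenceScore]) -> list[str]:
    """Return Prolog fact strings describing the confidence state."""
    if confidence is None:
        return []
    return [
        "confidence_present",
        f"stix_confidence({confidence.stix_confidence})",
    ]


def python_has_confidence(confidence: Optional[ConfidenceScore]) -> bool:
    # Presence check, as the review comment describes the Python helper.
    return confidence is not None
```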
    self._mtimes[yaml_file] = yaml_file.stat().st_mtime

except Exception as exc:  # noqa: BLE001
    logger.error("Failed to load YAML rule file %s: %s", yaml_file, exc)

YamlRuleLoader._load_file() only records self._mtimes[yaml_file] after a successful parse. If YAML is invalid (or a rule spec raises during parsing), the mtime isn't tracked, so reload_if_changed() will treat the directory as changed forever and repeatedly re-attempt (and re-log) the same failure on each evaluation. To match RuleLoader behavior, update _mtimes even on failure so reload only happens when the file actually changes.
Suggested change:
-     self._mtimes[yaml_file] = yaml_file.stat().st_mtime
- except Exception as exc:  # noqa: BLE001
-     logger.error("Failed to load YAML rule file %s: %s", yaml_file, exc)
+ except Exception as exc:  # noqa: BLE001
+     logger.error("Failed to load YAML rule file %s: %s", yaml_file, exc)
+ finally:
+     try:
+         self._mtimes[yaml_file] = yaml_file.stat().st_mtime
+     except OSError:
+         self._mtimes.pop(yaml_file, None)
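
The effect of recording the mtime in a `finally` block can be seen in a stripped-down loader. This is a sketch under assumptions: `TrackingLoader`, its `parse` callable, and the `attempts` counter are hypothetical and exist only to show that a broken file is attempted once per edit rather than on every evaluation.

```python
import logging
from pathlib import Path
from typing import Callable

logger = logging.getLogger(__name__)


class TrackingLoader:
    """Hypothetical minimal loader; `parse` is any callable that may raise."""

    def __init__(self, parse: Callable[[str], object]) -> None:
        self._parse = parse
        self._mtimes: dict[Path, float] = {}
        self.attempts = 0

    def _load_file(self, path: Path) -> None:
        self.attempts += 1
        try:
            self._parse(path.read_text())
        except Exception as exc:  # noqa: BLE001
            logger.error("Failed to load rule file %s: %s", path, exc)
        finally:
            # Track the mtime whether or not parsing succeeded.
            try:
                self._mtimes[path] = path.stat().st_mtime
            except OSError:
                self._mtimes.pop(path, None)

    def reload_if_changed(self, path: Path) -> bool:
        if self._mtimes.get(path) == path.stat().st_mtime:
            return False
        self._load_file(path)
        return True
```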
def load(self) -> list[RegisteredRule]:
    if not _PYSWIP_AVAILABLE:
        logger.warning(
            "pyswip is not installed — Prolog rule loading skipped. "
            "Install with: pip install 'gnat[rules-prolog]'"
        )
        return []

    from pyswip import Prolog

    self._prolog = Prolog()

    if _HELPERS_PL.exists():
        self._prolog.consult(str(_HELPERS_PL))

    self._rules = []
    if not self._rules_dir.exists():
        logger.warning("Rules directory does not exist: %s", self._rules_dir)
        return []

PrologRuleLoader.load() resets self._rules but does not reset self._mtimes. If a .pl file is removed/renamed, reload_if_changed() will keep seeing current_files != tracked_files and reload on every evaluation. Clear self._mtimes at the start of load() so change detection can converge after a deletion.
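
The convergence problem can be reproduced without Prolog. The sketch below is a hypothetical skeleton (`DirectoryTracker` and `_snapshot` are illustrative names, and comparing mtime dicts is an assumption about how change detection works); it shows that clearing the tracking state at the start of `load()` lets `reload_if_changed()` settle after a file is deleted.

```python
from pathlib import Path


class DirectoryTracker:
    """Hypothetical loader skeleton that only tracks file mtimes."""

    def __init__(self, rules_dir: Path, pattern: str = "*.pl") -> None:
        self._rules_dir = rules_dir
        self._pattern = pattern
        self._mtimes: dict[Path, float] = {}
        self.load_count = 0

    def _snapshot(self) -> dict[Path, float]:
        if not self._rules_dir.exists():
            return {}
        return {f: f.stat().st_mtime for f in self._rules_dir.glob(self._pattern)}

    def load(self) -> None:
        self.load_count += 1
        # Reset tracking first so entries for deleted files do not linger.
        self._mtimes.clear()
        self._mtimes.update(self._snapshot())

    def reload_if_changed(self) -> bool:
        if self._snapshot() == self._mtimes:
            return False
        self.load()
        return True
```

Without the `clear()` call, the entry for a removed file would stay in `_mtimes` and every later comparison would report a change.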
if rule.phase is not None and status_val != rule.phase:
    continue

if not self._policy.allow_dirty_rules and rule.source_file and not git_file_is_clean(rule.source_file):

For consistency with AnalysisRuleEngine, consider logging a warning when skipping a rule due to allow_dirty_rules being false and the rule file being dirty. The Hy engine emits a warning including rule name and file path, which is useful for diagnosing why rules didn't fire.
Suggested change:
- if not self._policy.allow_dirty_rules and rule.source_file and not git_file_is_clean(rule.source_file):
+ if not self._policy.allow_dirty_rules and rule.source_file and not git_file_is_clean(rule.source_file):
+     logger.warning(
+         "Skipping YAML rule %s because allow_dirty_rules is false and rule file is dirty: %s",
+         rule.name,
+         str(rule.source_file),
+     )
if rule.phase is not None and status_val != rule.phase:
    continue

if not self._policy.allow_dirty_rules and rule.source_file and not git_file_is_clean(rule.source_file):

For consistency with AnalysisRuleEngine, consider logging a warning when skipping a rule due to allow_dirty_rules being false and the rule file being dirty. The Hy engine emits a warning including rule name and file path, which is useful for diagnosing why rules didn't fire.
Suggested change:
- if not self._policy.allow_dirty_rules and rule.source_file and not git_file_is_clean(rule.source_file):
+ if not self._policy.allow_dirty_rules and rule.source_file and not git_file_is_clean(rule.source_file):
+     logger.warning(
+         "Skipping Prolog rule %s because rule file is dirty and allow_dirty_rules is false: %s",
+         rule.name,
+         rule.source_file,
+     )

results = list(self._prolog.query("rule(Name, Attrs)"))
for result in results:
    name = str(result["Name"])
    attrs = result["Attrs"]
    self._register_rule(name, attrs, pl_file)

PrologRuleLoader._load_file() consults each .pl file and then queries rule(Name, Attrs) without restricting to rules defined in that file. Since the Prolog KB accumulates across consults, every subsequent file will re-return previously loaded rule/2 facts and re-register them, producing duplicate RegisteredRule entries. Consider consulting all files first and querying once, or tracking seen rule names and only registering newly discovered rules.
Suggested change:
- results = list(self._prolog.query("rule(Name, Attrs)"))
- for result in results:
-     name = str(result["Name"])
-     attrs = result["Attrs"]
-     self._register_rule(name, attrs, pl_file)
+ existing_rule_names = {rule.name for rule in self._rules}
+ results = list(self._prolog.query("rule(Name, Attrs)"))
+ for result in results:
+     name = str(result["Name"])
+     if name in existing_rule_names:
+         continue
+     attrs = result["Attrs"]
+     self._register_rule(name, attrs, pl_file)
+     existing_rule_names.add(name)
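
The name-tracking approach can be exercised without pyswip by feeding it plain dicts shaped like the query results above. This is a sketch only: `register_new_rules` and the `registry` dict are hypothetical, and the fake result lists simulate a knowledge base that accumulates rule/2 facts across consults.

```python
from typing import Any, Iterable


def register_new_rules(
    results: Iterable[dict[str, Any]],
    registry: dict[str, tuple[Any, str]],
    source_file: str,
) -> list[str]:
    """Register only rule names not seen before; return the names added."""
    added: list[str] = []
    for result in results:
        name = str(result["Name"])
        if name in registry:
            # Already registered from an earlier file; skip the re-returned fact.
            continue
        registry[name] = (result["Attrs"], source_file)
        added.append(name)
    return added
```

One trade-off of this sketch: a rule redefined under the same name in a later file keeps the attributes and source attribution of the first file that defined it.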
def load(self) -> list[RegisteredRule]:
    if not _YAML_AVAILABLE:
        logger.warning(
            "pyyaml is not installed — YAML rule loading skipped. "
            "Install with: pip install pyyaml"
        )
        return []

    self._rules = []
    if not self._rules_dir.exists():
        logger.warning("Rules directory does not exist: %s", self._rules_dir)
        return []

YamlRuleLoader.load() resets self._rules but does not reset self._mtimes. If a rules file is removed/renamed, reload_if_changed() will keep seeing current_files != tracked_files and reload on every evaluation. Clear self._mtimes (and any other tracking state) at the start of load() so the tracked set reflects the current directory contents.
def _load_file(self, pl_file: Path) -> None:
    try:
        self._prolog.consult(str(pl_file))
        self._mtimes[pl_file] = pl_file.stat().st_mtime

        results = list(self._prolog.query("rule(Name, Attrs)"))
        for result in results:
            name = str(result["Name"])
            attrs = result["Attrs"]
            self._register_rule(name, attrs, pl_file)

    except Exception as exc:  # noqa: BLE001
        logger.error("Failed to load Prolog rule file %s: %s", pl_file, exc)

PrologRuleLoader._load_file() only records self._mtimes[pl_file] after a successful consult+query. If a .pl file has a syntax error (consult fails), the mtime isn't tracked, so reload_if_changed() will treat the directory as changed forever and retry (and re-log) on every evaluation. Consider recording the mtime even on failure so reload only happens when the file is edited.
…s, and DCO Agent-Logs-Url: https://github.com/wrhalpin/GNAT/sessions/8da72773-f3d4-403a-9c49-8098904d8254 Co-authored-by: wrhalpin <[email protected]>