Add vulnerabilities capabilities to the mcp server#3732
Add vulnerabilities capabilities to the mcp server#3732jledoze wants to merge 4 commits intointuitem:mainfrom
Conversation
📝 WalkthroughWalkthroughAdds vulnerability management: a resolver for vulnerability identifiers and new MCP tools to list, get, create, update, and delete vulnerabilities; extends asset creation and update tools to accept security objectives and disaster recovery fields. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant MCP_Server
participant Resolver
participant ExternalAPI
User->>MCP_Server: get_vulnerability(identifier)
MCP_Server->>Resolver: resolve_vulnerability_id(identifier)
Resolver->>ExternalAPI: GET /vulnerabilities/?name=identifier
ExternalAPI-->>Resolver: 200 + results / non-200
alt one match
Resolver-->>MCP_Server: vulnerability_id
MCP_Server->>ExternalAPI: GET /vulnerabilities/{id}/
ExternalAPI-->>MCP_Server: 200 + vulnerability detail
MCP_Server-->>User: success_response (Markdown)
else no results or ambiguous
Resolver-->>MCP_Server: raise ValueError (no/ambiguous)
MCP_Server-->>User: error_response
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
cli/ca_mcp/tools/write_tools.py (1)
1100-1110: Consider passingfolder_idto resolver functions for scoped lookups.The
resolve_applied_control_idandresolve_asset_idcalls don't passfolder_id, unlikecreate_risk_scenariowhich scopes resolution by folder. In deployments with same-named controls/assets across folders, this could cause ambiguous resolution errors.♻️ Suggested change for scoped resolution
+ resolved_folder = resolve_folder_id(folder_id) if folder_id else None + if applied_controls: resolved_controls = [] for control in applied_controls: - resolved_controls.append(resolve_applied_control_id(control)) + resolved_controls.append(resolve_applied_control_id(control, folder_id=resolved_folder)) payload["applied_controls"] = resolved_controls if assets: resolved_assets = [] for asset in assets: - resolved_assets.append(resolve_asset_id(asset)) + resolved_assets.append(resolve_asset_id(asset, folder_id=resolved_folder)) payload["assets"] = resolved_assets🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cli/ca_mcp/tools/write_tools.py` around lines 1100 - 1110, The applied_controls and assets resolution is not scoped by folder_id; update the calls to resolve_applied_control_id and resolve_asset_id to pass the folder_id used by create_risk_scenario so lookups are folder-scoped (e.g., change resolve_applied_control_id(control) -> resolve_applied_control_id(control, folder_id) and resolve_asset_id(asset) -> resolve_asset_id(asset, folder_id)); ensure the surrounding function accepts or has access to folder_id and adjust any callers accordingly so ambiguity across same-named controls/assets is avoided.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@cli/ca_mcp/tools/write_tools.py`:
- Around line 1100-1110: The applied_controls and assets resolution is not
scoped by folder_id; update the calls to resolve_applied_control_id and
resolve_asset_id to pass the folder_id used by create_risk_scenario so lookups
are folder-scoped (e.g., change resolve_applied_control_id(control) ->
resolve_applied_control_id(control, folder_id) and resolve_asset_id(asset) ->
resolve_asset_id(asset, folder_id)); ensure the surrounding function accepts or
has access to folder_id and adjust any callers accordingly so ambiguity across
same-named controls/assets is avoided.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f5a141c1-8e4e-4e1f-9c2d-a4900be83341
📒 Files selected for processing (5)
cli/ca_mcp/resolvers.pycli/ca_mcp/server.pycli/ca_mcp/tools/read_tools.pycli/ca_mcp/tools/update_tools.pycli/ca_mcp/tools/write_tools.py
|
Thank you @jledoze ! |
a946f5b to
81346c9
Compare
|
The resolver have been fixed. |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cli/ca_mcp/tools/read_tools.py`:
- Around line 1306-1338: The summary currently uses len(vulnerabilities) and
undercounts paginated results; update the logic in the get_vulnerabilities path
to use the total count from the API response (data.get("count") if present) and
include a "showing first N" note when count > len(vulnerabilities). Use the
existing variables/data flow (data, vulnerabilities from get_paginated_results)
and change the result header construction (the result variable assembly) to
display the total count and the number shown (e.g., "Found {total}
vulnerabilities (showing first {shown})") while preserving the existing filters
and table output.
In `@cli/ca_mcp/tools/update_tools.py`:
- Around line 1159-1168: The payload construction currently resolves folder_id
then calls resolve_asset_id() and resolve_applied_control_id() without scoping;
update the calls so the resolved folder UUID from resolve_folder_id(folder_id)
is passed into resolve_asset_id(...) and resolve_applied_control_id(...).
Specifically, after assigning payload["folder"] = resolve_folder_id(folder_id),
store that returned UUID (e.g., resolved_folder) and use it as the
folder_id/namespace argument when building resolved_controls =
[resolve_applied_control_id(c, resolved_folder) for c in applied_controls] and
resolved_assets = [resolve_asset_id(a, resolved_folder) for a in assets] so
lookups are scoped to the resolved folder.
- Around line 156-193: When needs_objectives is true, the prefetch via
make_get_request -> fetch_res must be validated and the function must fail fast
if fetch_res.status_code != 200 instead of proceeding with current_asset = {}
(which causes silent overwrite of CIA/DRO to zeros); update the block that
builds current_asset/cur_sec/cur_dro so that if fetch_res.status_code is not 200
you either raise/return an error or skip modifying objectives (i.e., do not set
payload["security_objectives"] or payload["disaster_recovery_objectives"]), and
ensure any downstream code that calls this path (symbols: make_get_request,
current_asset, cur_sec, cur_dro, _merge_cia, payload) handles the early
failure/skip accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 91e3946a-0949-44b8-8d34-6db0a6c80843
📒 Files selected for processing (5)
cli/ca_mcp/resolvers.pycli/ca_mcp/server.pycli/ca_mcp/tools/read_tools.pycli/ca_mcp/tools/update_tools.pycli/ca_mcp/tools/write_tools.py
✅ Files skipped from review due to trivial changes (1)
- cli/ca_mcp/resolvers.py
🚧 Files skipped from review as they are similar to previous changes (2)
- cli/ca_mcp/server.py
- cli/ca_mcp/tools/write_tools.py
| data = res.json() | ||
| vulnerabilities = get_paginated_results(data) | ||
|
|
||
| if not vulnerabilities: | ||
| return empty_response("vulnerabilities", filters) | ||
|
|
||
| result = f"Found {len(vulnerabilities)} vulnerabilities" | ||
| if filters: | ||
| result += f" ({', '.join(f'{k}={v}' for k, v in filters.items())})" | ||
| result += "\n\n" | ||
| result += "|ID|Name|Ref ID|Status|Severity|Folder|\n" | ||
| result += "|---|---|---|---|---|---|\n" | ||
|
|
||
| for vuln in vulnerabilities: | ||
| vuln_id = vuln.get("id", "N/A") | ||
| name = vuln.get("name", "N/A") | ||
| ref_id = vuln.get("ref_id") or "-" | ||
| vuln_status = STATUS_LABELS.get(vuln.get("status", "--"), vuln.get("status", "--")) | ||
| sev_val = vuln.get("severity", -1) | ||
| vuln_severity = SEVERITY_LABELS.get(sev_val, str(sev_val)) | ||
| vuln_folder = (vuln.get("folder") or {}) | ||
| if isinstance(vuln_folder, dict): | ||
| vuln_folder = vuln_folder.get("str", vuln_folder.get("name", "-")) | ||
| else: | ||
| vuln_folder = str(vuln_folder) if vuln_folder else "-" | ||
|
|
||
| result += f"|{vuln_id}|{name}|{ref_id}|{vuln_status}|{vuln_severity}|{vuln_folder}|\n" | ||
|
|
||
| return success_response( | ||
| result, | ||
| "get_vulnerabilities", | ||
| "Use get_vulnerability with a specific ID to retrieve full details, or create_vulnerability to add a new one", | ||
| ) |
There was a problem hiding this comment.
Surface truncated vulnerability lists.
This summary is built from len(vulnerabilities) only. On a paginated /vulnerabilities/ response, the tool undercounts matches and makes the first page look exhaustive. get_audits_progress() already handles this with data["count"] plus a "showing first N" note; this path should do the same.
Suggested minimal fix
data = res.json()
vulnerabilities = get_paginated_results(data)
@@
- result = f"Found {len(vulnerabilities)} vulnerabilities"
+ total_count = (
+ data.get("count", len(vulnerabilities))
+ if isinstance(data, dict)
+ else len(vulnerabilities)
+ )
+ result = f"Found {total_count} vulnerabilities"
+ if total_count > len(vulnerabilities):
+ result += (
+ f" (showing first {len(vulnerabilities)}, use filters to narrow down)"
+ )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cli/ca_mcp/tools/read_tools.py` around lines 1306 - 1338, The summary
currently uses len(vulnerabilities) and undercounts paginated results; update
the logic in the get_vulnerabilities path to use the total count from the API
response (data.get("count") if present) and include a "showing first N" note
when count > len(vulnerabilities). Use the existing variables/data flow (data,
vulnerabilities from get_paginated_results) and change the result header
construction (the result variable assembly) to display the total count and the
number shown (e.g., "Found {total} vulnerabilities (showing first {shown})")
while preserving the existing filters and table output.
| if needs_objectives: | ||
| fetch_res = make_get_request(f"/assets/{resolved_asset_id}/") | ||
| current_asset = fetch_res.json() if fetch_res.status_code == 200 else {} | ||
|
|
||
| raw_sec = current_asset.get("security_objectives") or {} | ||
| cur_sec = raw_sec.get("objectives", {}) if isinstance(raw_sec, dict) else {} | ||
|
|
||
| raw_dro = current_asset.get("disaster_recovery_objectives") or {} | ||
| cur_dro = raw_dro.get("objectives", {}) if isinstance(raw_dro, dict) else {} | ||
|
|
||
| if any(p is not None for p in [ | ||
| sec_confidentiality, sec_confidentiality_enabled, | ||
| sec_integrity, sec_integrity_enabled, | ||
| sec_availability, sec_availability_enabled, | ||
| ]): | ||
| def _merge_cia(key, new_val, new_enabled): | ||
| existing = cur_sec.get(key) or {"value": 0, "is_enabled": False} | ||
| return { | ||
| "value": new_val if new_val is not None else existing.get("value", 0), | ||
| "is_enabled": new_enabled if new_enabled is not None else existing.get("is_enabled", False), | ||
| } | ||
| payload["security_objectives"] = { | ||
| "objectives": { | ||
| "confidentiality": _merge_cia("confidentiality", sec_confidentiality, sec_confidentiality_enabled), | ||
| "integrity": _merge_cia("integrity", sec_integrity, sec_integrity_enabled), | ||
| "availability": _merge_cia("availability", sec_availability, sec_availability_enabled), | ||
| } | ||
| } | ||
|
|
||
| if any(p is not None for p in [dro_rto, dro_rpo, dro_mtd]): | ||
| payload["disaster_recovery_objectives"] = { | ||
| "objectives": { | ||
| "rto": {"value": dro_rto if dro_rto is not None else (cur_dro.get("rto") or {}).get("value", 0)}, | ||
| "rpo": {"value": dro_rpo if dro_rpo is not None else (cur_dro.get("rpo") or {}).get("value", 0)}, | ||
| "mtd": {"value": dro_mtd if dro_mtd is not None else (cur_dro.get("mtd") or {}).get("value", 0)}, | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Fail fast when the objective prefetch misses.
If /assets/{id}/ returns non-200 here, current_asset becomes {} and the merge code writes 0/False back for untouched CIA/DRO fields. That turns a transient read failure into silent data loss on the subsequent PATCH.
Suggested minimal fix
if needs_objectives:
fetch_res = make_get_request(f"/assets/{resolved_asset_id}/")
- current_asset = fetch_res.json() if fetch_res.status_code == 200 else {}
+ if fetch_res.status_code != 200:
+ return (
+ "Error fetching current asset for objective merge: "
+ f"{fetch_res.status_code} - {fetch_res.text}"
+ )
+ current_asset = fetch_res.json()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if needs_objectives: | |
| fetch_res = make_get_request(f"/assets/{resolved_asset_id}/") | |
| current_asset = fetch_res.json() if fetch_res.status_code == 200 else {} | |
| raw_sec = current_asset.get("security_objectives") or {} | |
| cur_sec = raw_sec.get("objectives", {}) if isinstance(raw_sec, dict) else {} | |
| raw_dro = current_asset.get("disaster_recovery_objectives") or {} | |
| cur_dro = raw_dro.get("objectives", {}) if isinstance(raw_dro, dict) else {} | |
| if any(p is not None for p in [ | |
| sec_confidentiality, sec_confidentiality_enabled, | |
| sec_integrity, sec_integrity_enabled, | |
| sec_availability, sec_availability_enabled, | |
| ]): | |
| def _merge_cia(key, new_val, new_enabled): | |
| existing = cur_sec.get(key) or {"value": 0, "is_enabled": False} | |
| return { | |
| "value": new_val if new_val is not None else existing.get("value", 0), | |
| "is_enabled": new_enabled if new_enabled is not None else existing.get("is_enabled", False), | |
| } | |
| payload["security_objectives"] = { | |
| "objectives": { | |
| "confidentiality": _merge_cia("confidentiality", sec_confidentiality, sec_confidentiality_enabled), | |
| "integrity": _merge_cia("integrity", sec_integrity, sec_integrity_enabled), | |
| "availability": _merge_cia("availability", sec_availability, sec_availability_enabled), | |
| } | |
| } | |
| if any(p is not None for p in [dro_rto, dro_rpo, dro_mtd]): | |
| payload["disaster_recovery_objectives"] = { | |
| "objectives": { | |
| "rto": {"value": dro_rto if dro_rto is not None else (cur_dro.get("rto") or {}).get("value", 0)}, | |
| "rpo": {"value": dro_rpo if dro_rpo is not None else (cur_dro.get("rpo") or {}).get("value", 0)}, | |
| "mtd": {"value": dro_mtd if dro_mtd is not None else (cur_dro.get("mtd") or {}).get("value", 0)}, | |
| } | |
| } | |
| if needs_objectives: | |
| fetch_res = make_get_request(f"/assets/{resolved_asset_id}/") | |
| if fetch_res.status_code != 200: | |
| return ( | |
| "Error fetching current asset for objective merge: " | |
| f"{fetch_res.status_code} - {fetch_res.text}" | |
| ) | |
| current_asset = fetch_res.json() | |
| raw_sec = current_asset.get("security_objectives") or {} | |
| cur_sec = raw_sec.get("objectives", {}) if isinstance(raw_sec, dict) else {} | |
| raw_dro = current_asset.get("disaster_recovery_objectives") or {} | |
| cur_dro = raw_dro.get("objectives", {}) if isinstance(raw_dro, dict) else {} | |
| if any(p is not None for p in [ | |
| sec_confidentiality, sec_confidentiality_enabled, | |
| sec_integrity, sec_integrity_enabled, | |
| sec_availability, sec_availability_enabled, | |
| ]): | |
| def _merge_cia(key, new_val, new_enabled): | |
| existing = cur_sec.get(key) or {"value": 0, "is_enabled": False} | |
| return { | |
| "value": new_val if new_val is not None else existing.get("value", 0), | |
| "is_enabled": new_enabled if new_enabled is not None else existing.get("is_enabled", False), | |
| } | |
| payload["security_objectives"] = { | |
| "objectives": { | |
| "confidentiality": _merge_cia("confidentiality", sec_confidentiality, sec_confidentiality_enabled), | |
| "integrity": _merge_cia("integrity", sec_integrity, sec_integrity_enabled), | |
| "availability": _merge_cia("availability", sec_availability, sec_availability_enabled), | |
| } | |
| } | |
| if any(p is not None for p in [dro_rto, dro_rpo, dro_mtd]): | |
| payload["disaster_recovery_objectives"] = { | |
| "objectives": { | |
| "rto": {"value": dro_rto if dro_rto is not None else (cur_dro.get("rto") or {}).get("value", 0)}, | |
| "rpo": {"value": dro_rpo if dro_rpo is not None else (cur_dro.get("rpo") or {}).get("value", 0)}, | |
| "mtd": {"value": dro_mtd if dro_mtd is not None else (cur_dro.get("mtd") or {}).get("value", 0)}, | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cli/ca_mcp/tools/update_tools.py` around lines 156 - 193, When
needs_objectives is true, the prefetch via make_get_request -> fetch_res must be
validated and the function must fail fast if fetch_res.status_code != 200
instead of proceeding with current_asset = {} (which causes silent overwrite of
CIA/DRO to zeros); update the block that builds current_asset/cur_sec/cur_dro so
that if fetch_res.status_code is not 200 you either raise/return an error or
skip modifying objectives (i.e., do not set payload["security_objectives"] or
payload["disaster_recovery_objectives"]), and ensure any downstream code that
calls this path (symbols: make_get_request, current_asset, cur_sec, cur_dro,
_merge_cia, payload) handles the early failure/skip accordingly.
| if folder_id is not None: | ||
| payload["folder"] = resolve_folder_id(folder_id) | ||
| if filtering_labels is not None: | ||
| payload["filtering_labels"] = filtering_labels | ||
| if applied_controls is not None: | ||
| resolved_controls = [resolve_applied_control_id(c) for c in applied_controls] | ||
| payload["applied_controls"] = resolved_controls | ||
| if assets is not None: | ||
| resolved_assets = [resolve_asset_id(a) for a in assets] | ||
| payload["assets"] = resolved_assets |
There was a problem hiding this comment.
Scope asset/control names to the resolved folder.
resolve_asset_id() and resolve_applied_control_id() already accept an optional folder_id specifically to avoid ambiguous matches. After resolving folder_id, pass that UUID into these lookups so same-named assets or controls in other folders cannot be linked accidentally.
Suggested minimal fix
- if folder_id is not None:
- payload["folder"] = resolve_folder_id(folder_id)
+ resolved_folder_id = None
+ if folder_id is not None:
+ resolved_folder_id = resolve_folder_id(folder_id)
+ payload["folder"] = resolved_folder_id
@@
- resolved_controls = [resolve_applied_control_id(c) for c in applied_controls]
+ resolved_controls = [
+ resolve_applied_control_id(c, resolved_folder_id)
+ for c in applied_controls
+ ]
payload["applied_controls"] = resolved_controls
if assets is not None:
- resolved_assets = [resolve_asset_id(a) for a in assets]
+ resolved_assets = [
+ resolve_asset_id(a, resolved_folder_id)
+ for a in assets
+ ]
payload["assets"] = resolved_assets🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cli/ca_mcp/tools/update_tools.py` around lines 1159 - 1168, The payload
construction currently resolves folder_id then calls resolve_asset_id() and
resolve_applied_control_id() without scoping; update the calls so the resolved
folder UUID from resolve_folder_id(folder_id) is passed into
resolve_asset_id(...) and resolve_applied_control_id(...). Specifically, after
assigning payload["folder"] = resolve_folder_id(folder_id), store that returned
UUID (e.g., resolved_folder) and use it as the folder_id/namespace argument when
building resolved_controls = [resolve_applied_control_id(c, resolved_folder) for
c in applied_controls] and resolved_assets = [resolve_asset_id(a,
resolved_folder) for a in assets] so lookups are scoped to the resolved folder.
Add Vulnerability Management & Expand Asset/Risk Scenario MCP Tools
Summary
This PR adds full CRUD support for vulnerabilities in the MCP server and extends
the existing asset and risk scenario tools to cover previously missing API fields,
including security objectives and disaster recovery objectives.
New: Vulnerability tools
Five new MCP tools covering the full
/api/vulnerabilities/REST API surface:get_vulnerabilitiesfolder,status,severity,searchget_vulnerabilitycreate_vulnerabilityupdate_vulnerabilitydelete_vulnerabilityAll tools support name-based resolution (UUID or human name) via a new
resolve_vulnerability_idresolver added toresolvers.py.Supported fields for create/update:
name,description,ref_id,status,severity,folder,filtering_labels,applied_controls,assets,security_exceptionsExtended:
update_risk_scenarioAdded
vulnerabilitiesparameter — a list of vulnerability IDs/names exploitedby the scenario, resolved to UUIDs before sending to the API.
Extended:
update_assetThe existing tool only covered 5 fields. It now covers the full
PATCH /api/assets/{id}/schema:ref_id,observation,reference_link,is_published,is_business_functiondora_licenced_activity,dora_criticality_assessment,dora_criticality_justification,dora_discontinuing_impactapplied_controls,vulnerabilitiessupport_assets,security_exceptions,filtering_labels,owner,folder_idsec_confidentiality,sec_integrity,sec_availability+_enabledflags per objective)dro_rto,dro_rpo,dro_mtdin seconds)Extended:
create_assetAdded security and disaster recovery objective parameters matching
update_asset.On create,
is_enableddefaults totrueautomatically when a value is provided.Objectives format sent to the API:
{ "security_objectives": { "objectives": { "confidentiality": { "value": 4, "is_enabled": true }, "integrity": { "value": 4, "is_enabled": true }, "availability": { "value": 4, "is_enabled": true } } }, "disaster_recovery_objectives": { "objectives": { "rto": { "value": 1860 }, "rpo": { "value": 14400 }, "mtd": { "value": 0 } } } } <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Full vulnerability management: create, list, view details, update, and delete vulnerabilities via new endpoints. * Assets can include security objectives (confidentiality/integrity/availability) and disaster-recovery targets when creating or updating. * Risk scenarios can be linked to vulnerabilities. * **Bug Fixes / Improvements** * Improved vulnerability lookup with clearer ambiguity and error messages when resolution fails. <!-- end of auto-generated comment: release notes by coderabbit.ai -->