diff --git a/backend-compliance/compliance/services/cra_finding_mapper.py b/backend-compliance/compliance/services/cra_finding_mapper.py index 7a40c866..01a64c58 100644 --- a/backend-compliance/compliance/services/cra_finding_mapper.py +++ b/backend-compliance/compliance/services/cra_finding_mapper.py @@ -53,7 +53,10 @@ _CWE_TO_REQ = { 79: ["CRA-AI-20"], 89: ["CRA-AI-20"], 77: ["CRA-AI-20"], 78: ["CRA-AI-20"], 22: ["CRA-AI-20"], 20: ["CRA-AI-20"], } -# Substring fallback (lowercase) against category + title + description, primary first. +# Substring fallback (lowercase) against category + rule_id + title + description, +# primary first (scan_type only counts as a fallback source for category). The semgrep +# rule_id (e.g. `express-path-join-resolve` / `header-redefinition`) is included because +# semgrep findings often carry NO cwe, but the rule_id pins the vulnerability class. _KEYWORD_TO_REQ = [ ("default password", "CRA-AI-8"), ("hardcoded", "CRA-AI-9"), ("secret", "CRA-AI-9"), ("credential", "CRA-AI-9"), ("password", "CRA-AI-8"), ("mfa", "CRA-AI-7"), @@ -64,8 +67,23 @@ _KEYWORD_TO_REQ = [ ("rate limit", "CRA-AI-11"), ("sbom", "CRA-AI-23"), ("dependency", "CRA-AI-22"), ("outdated", "CRA-AI-22"), ("known vuln", "CRA-AI-22"), ("cve", "CRA-AI-22"), ("injection", "CRA-AI-20"), ("xss", "CRA-AI-20"), ("sql", "CRA-AI-20"), ("traversal", "CRA-AI-20"), + # path traversal / prototype pollution (semgrep, usually no cwe) + ("path-join", "CRA-AI-20"), ("outside of the destination", "CRA-AI-20"), + ("prototype pollut", "CRA-AI-20"), ("prototype-pollution", "CRA-AI-20"), + ("object.assign", "CRA-AI-20"), ("path traversal", "CRA-AI-20"), + # insecure HTTP/security-header config (nginx add_header redefinition) + ("add_header", "CRA-AI-1"), ("header-redefinition", "CRA-AI-1"), ("header redefinition", "CRA-AI-1"), + # insecure transport (unencrypted WebSocket -> use wss) + ("websocket", "CRA-AI-15"), ("insecure ws", "CRA-AI-15"), + # container hardening: writable root fs / missing USER (image
least-privilege + config) + ("writable-filesys", "CRA-AI-1"), ("writable filesystem", "CRA-AI-1"), ("read-only filesystem", "CRA-AI-1"), + ("missing-user", "CRA-AI-4"), ("not specifying a user", "CRA-AI-4"), ("logging", "CRA-AI-24"), ("update", "CRA-AI-28"), ("signature", "CRA-AI-29"), ("integrity", "CRA-AI-6"), ("debug", "CRA-AI-1"), ("config", "CRA-AI-1"), + # data protection (gdpr-pattern scanner) -> CRA Annex I data minimisation + ("gdpr", "CRA-AI-17"), ("consent", "CRA-AI-17"), ("personal data", "CRA-AI-17"), + ("data collection", "CRA-AI-17"), ("data deletion", "CRA-AI-17"), + ("data minim", "CRA-AI-17"), ("data retention", "CRA-AI-17"), ] @@ -76,6 +94,7 @@ class ScannerFinding: title: str = "" description: str = "" category: str = "" + rule_id: str = "" # scanner rule id (e.g. semgrep rule) — class signal cwe: str = "" severity: str = "" # critical | high | medium | low (scanner's rating) cvss: Optional[float] = None @@ -98,6 +117,7 @@ class ScannerFinding: title=d.get("title", "") or d.get("name", ""), description=d.get("description", "") or d.get("detail", ""), category=d.get("category", "") or d.get("type", "") or d.get("scan_type", "") or d.get("scanner", ""), + rule_id=str(d.get("rule_id", "") or d.get("check_id", "") or ""), cwe=str(d.get("cwe", "") or ""), severity=sev, cvss=d.get("cvss") if d.get("cvss") is not None else d.get("cvss_score"), @@ -178,7 +198,7 @@ def _candidate_reqs(f: ScannerFinding) -> list: num = _cwe_num(f.cwe) if num in _CWE_TO_REQ: out.extend(_CWE_TO_REQ[num]) - haystack = " ".join([f.category, f.title, f.description]).lower() + haystack = " ".join([f.category, f.rule_id, f.title, f.description]).lower() for kw, rid in _KEYWORD_TO_REQ: if kw in haystack: out.append(rid) diff --git a/backend-compliance/compliance/services/cra_use_case_controls.py b/backend-compliance/compliance/services/cra_use_case_controls.py index 6bfcde22..34d2bd38 100644 --- a/backend-compliance/compliance/services/cra_use_case_controls.py +++
b/backend-compliance/compliance/services/cra_use_case_controls.py @@ -10,6 +10,8 @@ is breadth + source evidence, not a replacement. Only network_security is atom-grain — we query only that, always scoped by sub_topic + limit (per the caveats). """ +import time + from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS from compliance.services.use_case_controls import UseCaseControlsService @@ -53,37 +55,59 @@ def usecases_for(sub_topic: str) -> list: return ["cra", _TECHNICAL_USECASE.get(sub_topic, "network_security")] +# Process-level memo for the (use_case, sub_topic) breadth lists. The atom corpus +# is static reference data, so it is safe to reuse across requests — the warm path +# does zero DB work; a pair is re-fetched only after a (re)start or once its entry +# is older than _BREADTH_TTL seconds. +_BREADTH_CACHE: dict = {} # (use_case, sub_topic, per_use_case) -> (monotonic_ts, [controls]) +_BREADTH_TTL = 1800.0 + + def enrich_findings_with_breadth(mapped: list, db, per_use_case: int = 3) -> None: """Attach `sub_topic` + `regulatory_breadth` (atom controls from the CRA corpus + the technical-depth corpus) to each finding, each control tagged with its - use_case. Queries are cached per (use_case, sub_topic). Best-effort: on any - error a finding just gets fewer/empty breadth — never breaks the assessment. + use_case. The needed (use_case, sub_topic) pairs are fetched in ONE batched + query (process-cached); the old path ran ~6 queries per pair (latency #61). + Best-effort: an empty/failed fetch is NOT cached (the next call retries) and findings + fall back to an expired entry or empty breadth — never breaks the assessment.
""" - svc = UseCaseControlsService(db) - cache: dict = {} + now = time.monotonic() + needed: set = set() for m in mapped: st = _REQ_TO_SUBTOPIC.get(m.get("primary_requirement")) m["sub_topic"] = st if not st: m["regulatory_breadth"] = [] continue - merged, seen = [], set() for uc in usecases_for(st): key = (uc, st) - if key not in cache: - try: - res = svc.controls_for_use_case(uc, sub_topic=st, limit=per_use_case) - cache[key] = [ - {"control_id": c.get("control_id"), "title": c.get("title"), - "source_regulation": c.get("source_regulation"), - "source_article": c.get("source_article"), - "severity": c.get("severity"), "use_case": uc} - for c in res.get("controls", []) - ] - except Exception: - cache[key] = [] - for c in cache[key]: - if c["control_id"] and c["control_id"] not in seen: - seen.add(c["control_id"]) - merged.append(c) + hit = _BREADTH_CACHE.get(key + (per_use_case,)) + if not hit or now - hit[0] >= _BREADTH_TTL: + needed.add(key) + + if needed: + try: + fetched = UseCaseControlsService(db).breadth_controls_batch( + needed, per=per_use_case) + except Exception: + fetched = {} + # breadth_controls_batch signals failure / a missing atom table with {} — + # memoizing that would blank every request's breadth for a whole TTL after + # one transient DB error, so only a non-empty answer is cached. + if fetched: + for key in needed: # cache hits AND pairs that came back empty + _BREADTH_CACHE[key + (per_use_case,)] = (now, fetched.get(key, [])) + + for m in mapped: + st = m.get("sub_topic") + if not st: + continue + merged, seen = [], set() + for uc in usecases_for(st): + cached = _BREADTH_CACHE.get((uc, st, per_use_case)) + for c in (cached[1] if cached else []): + cid = c.get("control_id") + if cid and cid not in seen: + seen.add(cid) + merged.append(dict(c)) # copy: never hand out the shared cache entry m["regulatory_breadth"] = merged diff --git a/backend-compliance/compliance/services/use_case_controls.py b/backend-compliance/compliance/services/use_case_controls.py index c758686b..2fcd8f1a 100644 --- a/backend-compliance/compliance/services/use_case_controls.py +++ b/backend-compliance/compliance/services/use_case_controls.py @@ -14,7 +14,7 @@ from __future__ import annotations from typing import Any, Optional -from sqlalchemy import text +from sqlalchemy import bindparam, text from sqlalchemy.orm import Session
from compliance.data.use_case_registry import REGISTRY, is_valid_use_case @@ -148,12 +148,88 @@ _ATOM_COUNT_SQL = text(""" """) +# Breadth fast-path: top-N atom controls for MANY (use_case, sub_topic) pairs in +# ONE query. The CRA enrichment only needs this list — NOT the counts/facets/total +# that controls_for_use_case also computes (those are 5 extra aggregate scans per +# call, discarded by the caller). On prod (atom_classification currently lacks the +# (use_case, sub_topic) index after the DB swap) collapsing ~6 queries × N pairs +# into one scan is the difference between ~38s and a few seconds. +_ATOM_BREADTH_BATCH_SQL = text(""" + SELECT q.use_case, q.sub_topic, q.control_id, q.title, q.severity, + q.source_regulation, q.source_article + FROM ( + SELECT ac.use_case, ac.sub_topic, cc.control_id, cc.title, cc.severity, + cpl.source_regulation, cpl.source_article, + row_number() OVER ( + PARTITION BY ac.use_case, ac.sub_topic + ORDER BY CASE cc.severity WHEN 'critical' THEN 0 WHEN 'high' THEN 1 + WHEN 'medium' THEN 2 ELSE 3 END, cc.title + ) AS rn + FROM atom_classification ac + JOIN canonical_controls cc ON cc.id = ac.control_uuid + LEFT JOIN LATERAL ( + SELECT cpl.source_regulation, cpl.source_article + FROM control_parent_links cpl + WHERE cpl.control_uuid = ac.control_uuid LIMIT 1 + ) cpl ON true + WHERE ac.relevant = true + AND (ac.addressee IS NULL OR ac.addressee NOT IN + ('aufsichtsbefugnis','staat_eu','dritter','meta')) + AND (ac.use_case, ac.sub_topic) IN :pairs + ) q + WHERE q.rn <= :per +""").bindparams(bindparam("pairs", expanding=True)) + +# Process-level memo: does the atom table exist?
(never changes at runtime) +_ATOM_TABLE_EXISTS: dict[str, Optional[bool]] = {"v": None} + + class UseCaseControlsService: """Topic → controls retrieval over the seeded use-case mappings.""" def __init__(self, db: Session) -> None: self.db = db + def _atom_table_exists(self) -> bool: + if _ATOM_TABLE_EXISTS["v"] is None: + _ATOM_TABLE_EXISTS["v"] = self.db.execute( + text("SELECT to_regclass('compliance.atom_classification')") + ).scalar() is not None + return bool(_ATOM_TABLE_EXISTS["v"]) + + def breadth_controls_batch( + self, pairs, per: int = 3, + ) -> dict[tuple[str, str], list[dict[str, Any]]]: + """Top-``per`` atom controls for each (use_case, sub_topic) pair, in ONE + query. Returns {(use_case, sub_topic): [control dicts]}. Best-effort: + empty dict on any error or when the atom table is absent; the queries run + inside a SAVEPOINT so a failure leaves the caller's transaction usable.""" + uniq = sorted({(uc, st) for uc, st in pairs if uc and st}) + if not uniq: + return {} + try: + # SAVEPOINT around both statements: on PostgreSQL a failed statement + # aborts the enclosing transaction, so swallowing the error without + # rolling back would make every later query on this Session fail. + with self.db.begin_nested(): + if not self._atom_table_exists(): + return {} + rows = self.db.execute( + _ATOM_BREADTH_BATCH_SQL, + {"pairs": uniq, "per": min(max(int(per), 1), 50)}, + ).fetchall() + except Exception: + return {} + out: dict[tuple[str, str], list[dict[str, Any]]] = {} + for r in rows: + out.setdefault((r.use_case, r.sub_topic), []).append({ + "control_id": r.control_id, "title": r.title, + "source_regulation": r.source_regulation, + "source_article": r.source_article, + "severity": r.severity, "use_case": r.use_case, + }) + return out + def list_use_cases(self) -> list[dict[str, Any]]: """Registry use-cases with live counts — atom-grain (Haiku classification) plus the legacy master seed.
Backs the coverage overview so every topic is diff --git a/backend-compliance/tests/test_cra_finding_mapper.py b/backend-compliance/tests/test_cra_finding_mapper.py index 7ea64772..889004e0 100644 --- a/backend-compliance/tests/test_cra_finding_mapper.py +++ b/backend-compliance/tests/test_cra_finding_mapper.py @@ -56,6 +56,39 @@ def test_unmapped_finding_is_flagged_not_invented(): assert m.requirement_ids == [] +def test_gdpr_scanner_finding_maps_to_data_minimisation(): + # gdpr-pattern scanner finding (no cwe) -> CRA Annex I data minimisation + m = map_finding(ScannerFinding( + id="g1", category="gdpr", title="Data collection without apparent consent mechanism", + severity="medium")) + assert m.primary_requirement == "CRA-AI-17" + assert not m.unmapped + + +def test_semgrep_path_traversal_via_rule_id_maps_to_secure_testing(): + # semgrep finding with NO cwe — the rule_id pins it as path traversal + m = map_finding(ScannerFinding( + id="s1", category="sast", rule_id="javascript.express.security.express-path-join-resolve", + title="Possible writing outside of the destination", severity="medium")) + assert m.primary_requirement == "CRA-AI-20" + assert not m.unmapped + + +def test_semgrep_prototype_pollution_maps_to_secure_testing(): + m = map_finding(ScannerFinding( + id="s2", category="sast", title="Possibility of prototype polluting function detected", + severity="medium")) + assert m.primary_requirement == "CRA-AI-20" + + +def test_nginx_header_redefinition_maps_to_secure_config(): + m = map_finding(ScannerFinding( + id="s3", category="sast", rule_id="generic.nginx.security.header-redefinition", + title="The 'add_header' directive is called in a 'location' block", severity="medium")) + assert m.primary_requirement == "CRA-AI-1" + assert not m.unmapped + + def test_assessment_aggregates_and_coverage(): findings = [ ScannerFinding(id="a", cwe="CWE-259", severity="critical"), # CRA-AI-8 diff --git a/backend-compliance/tests/test_cra_use_case_controls.py
b/backend-compliance/tests/test_cra_use_case_controls.py index 984fc5dc..a164c8e3 100644 --- a/backend-compliance/tests/test_cra_use_case_controls.py +++ b/backend-compliance/tests/test_cra_use_case_controls.py @@ -21,23 +21,29 @@ def test_every_requirement_maps_to_a_valid_subtopic(): class _FakeControlsService: - """Stands in for UseCaseControlsService: returns one atom control per call, - carrying the legal anchor (source_article) the real atom query now selects.""" + """Stands in for UseCaseControlsService: returns one atom control per + (use_case, sub_topic) pair, carrying the legal anchor (source_article) the + real batched atom query now selects.""" def __init__(self, db): pass - def controls_for_use_case(self, use_case, sub_topic=None, limit=3): - return {"controls": [{ - "control_id": "AI-{}-{}".format(use_case, sub_topic), - "title": "Test obligation", - "source_regulation": "Cyber Resilience Act (CRA)", - "source_article": "Artikel 13", - "severity": "high", - }]} + def breadth_controls_batch(self, pairs, per=3): + return { + (uc, st): [{ + "control_id": "AI-{}-{}".format(uc, st), + "title": "Test obligation", + "source_regulation": "Cyber Resilience Act (CRA)", + "source_article": "Artikel 13", + "severity": "high", + "use_case": uc, + }] + for uc, st in pairs + } def test_breadth_carries_source_article(monkeypatch): + monkeypatch.setattr(cra_use_case_controls, "_BREADTH_CACHE", {}) # fresh process cache; restored on teardown so fake controls never leak into other tests monkeypatch.setattr( cra_use_case_controls, "UseCaseControlsService", _FakeControlsService, )