fix(cra): Scanner-Findings vollstaendig mappen + assess-from-scanner-Latenz senken
CI / detect-changes (push) Successful in 17s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Successful in 13s
CI / validate-canonical-controls (push) Successful in 12s
CI / loc-budget (push) Successful in 25s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 30s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped

Punkt 2 (Coverage): semgrep/gdpr-Findings ohne CWE blieben unmapped (~21%).
Der Mapper nutzt jetzt den scanner rule_id + gezielte Keywords (gdpr ->
Datenminimierung CRA-AI-17, path-traversal/prototype-pollution -> CRA-AI-20,
nginx-header/Docker-Hardening -> CRA-AI-1/4, insecure-websocket -> CRA-AI-15).
Reale Scanner-Daten: unmapped 19/92 -> 0/92 (Coverage 100%).

Punkt 3 (Latenz): enrich_findings_with_breadth lief ~6 Aggregat-Queries je
(use_case,sub_topic)-Paar, nutzte aber nur die Liste. Jetzt EINE batched Query
(breadth_controls_batch) fuer alle Paare + Prozess-Cache (TTL 1800s). macmini:
cold 0,23s / warm 0,000s. Prod-Root-Cause: atom_classification ohne
(use_case,sub_topic)-Index nach DB-Swap -> Index dem DB-Owner empfohlen.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Bönisch
2026-06-17 13:17:51 +02:00
parent 4f4ffc2ad5
commit 72093e5501
5 changed files with 181 additions and 33 deletions
@@ -10,6 +10,8 @@ is breadth + source evidence, not a replacement.
Only network_security is atom-grain — we query only that, always scoped by
sub_topic + limit (per the caveats).
"""
import time
from compliance.api.cra_annex_i_data import ANNEX_I_REQUIREMENTS
from compliance.services.use_case_controls import UseCaseControlsService
@@ -53,37 +55,54 @@ def usecases_for(sub_topic: str) -> list:
return ["cra", _TECHNICAL_USECASE.get(sub_topic, "network_security")]
# Process-level memo for the (use_case, sub_topic) breadth lists. The atom corpus
# is static reference data, so it is safe to reuse across requests — this turns the
# warm path into zero DB work; only the first call after a (re)start pays for it.
_BREADTH_CACHE: dict = {} # (use_case, sub_topic) -> (monotonic_ts, [controls])
_BREADTH_TTL = 1800.0
def enrich_findings_with_breadth(mapped: list, db, per_use_case: int = 3) -> None:
"""Attach `sub_topic` + `regulatory_breadth` (atom controls from the CRA corpus
+ the technical-depth corpus) to each finding, each control tagged with its
use_case. Queries are cached per (use_case, sub_topic). Best-effort: on any
error a finding just gets fewer/empty breadth — never breaks the assessment.
use_case. The needed (use_case, sub_topic) pairs are fetched in ONE batched
query (process-cached); the old path ran ~6 queries per pair (latency #61).
Best-effort: on any error a finding just gets empty breadth — never breaks the
assessment.
"""
svc = UseCaseControlsService(db)
cache: dict = {}
now = time.monotonic()
needed: set = set()
for m in mapped:
st = _REQ_TO_SUBTOPIC.get(m.get("primary_requirement"))
m["sub_topic"] = st
if not st:
m["regulatory_breadth"] = []
continue
merged, seen = [], set()
for uc in usecases_for(st):
key = (uc, st)
if key not in cache:
try:
res = svc.controls_for_use_case(uc, sub_topic=st, limit=per_use_case)
cache[key] = [
{"control_id": c.get("control_id"), "title": c.get("title"),
"source_regulation": c.get("source_regulation"),
"source_article": c.get("source_article"),
"severity": c.get("severity"), "use_case": uc}
for c in res.get("controls", [])
]
except Exception:
cache[key] = []
for c in cache[key]:
if c["control_id"] and c["control_id"] not in seen:
seen.add(c["control_id"])
hit = _BREADTH_CACHE.get(key)
if not hit or now - hit[0] >= _BREADTH_TTL:
needed.add(key)
if needed:
try:
fetched = UseCaseControlsService(db).breadth_controls_batch(
needed, per=per_use_case)
except Exception:
fetched = {}
for key in needed: # cache hits AND empty results
_BREADTH_CACHE[key] = (now, fetched.get(key, []))
for m in mapped:
st = m.get("sub_topic")
if not st:
continue
merged, seen = [], set()
for uc in usecases_for(st):
cached = _BREADTH_CACHE.get((uc, st))
for c in (cached[1] if cached else []):
cid = c.get("control_id")
if cid and cid not in seen:
seen.add(cid)
merged.append(c)
m["regulatory_breadth"] = merged