fix: Filter controls by test_procedure content — eliminates governance false positives
Only use controls whose test_procedure mentions document-type-specific terms:

- DSI (`dse` filter): test_procedure must contain 'datenschutzerkl', 'informationspflicht', 'art. 13' or 'art. 14'
- Cookie: must contain 'cookie', 'einwilligung', 'consent' or 'banner'
- Impressum: must contain 'impressum' or 'anbieterkennzeichnung'
- Widerruf: must contain 'widerruf' or 'fernabsatz'
- AGB: must contain 'geschäftsbedingung', 'agb' or 'vertragsbedingung'

This filters out internal governance controls (e.g. Datenmodelle, Infrastruktur) that are irrelevant for public document checks.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -30,24 +30,28 @@ OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen3.5:35b-a3b")
|
|||||||
DOC_TYPE_FILTERS = {
|
DOC_TYPE_FILTERS = {
|
||||||
"dse": {
|
"dse": {
|
||||||
"category": "data_protection",
|
"category": "data_protection",
|
||||||
"keywords": ["informationspflicht", "datenschutzerkl", "art. 13", "art. 14",
|
"keywords": ["informationspflicht"],
|
||||||
"betroffenenrecht", "verantwortlich", "datenschutzbeauftrag"],
|
"test_proc_must_contain": ["datenschutzerkl", "informationspflicht", "art. 13", "art. 14"],
|
||||||
},
|
},
|
||||||
"cookie": {
|
"cookie": {
|
||||||
"category": "data_protection",
|
"category": "data_protection",
|
||||||
"keywords": ["cookie", "einwilligung", "tracking", "consent"],
|
"keywords": ["cookie", "einwilligung"],
|
||||||
|
"test_proc_must_contain": ["cookie", "einwilligung", "consent", "banner"],
|
||||||
},
|
},
|
||||||
"impressum": {
|
"impressum": {
|
||||||
"category": "compliance",
|
"category": "compliance",
|
||||||
"keywords": ["impressum", "anbieterkennzeichnung", "telemedien", "tmg"],
|
"keywords": ["impressum", "anbieterkennzeichnung"],
|
||||||
|
"test_proc_must_contain": ["impressum", "anbieterkennzeichnung"],
|
||||||
},
|
},
|
||||||
"widerruf": {
|
"widerruf": {
|
||||||
"category": "compliance",
|
"category": "compliance",
|
||||||
"keywords": ["widerruf", "verbraucher", "fernabsatz"],
|
"keywords": ["widerruf", "verbraucher"],
|
||||||
|
"test_proc_must_contain": ["widerruf", "fernabsatz"],
|
||||||
},
|
},
|
||||||
"agb": {
|
"agb": {
|
||||||
"category": "compliance",
|
"category": "compliance",
|
||||||
"keywords": ["geschäftsbedingung", "agb", "vertragsklausel"],
|
"keywords": ["geschäftsbedingung", "agb"],
|
||||||
|
"test_proc_must_contain": ["geschäftsbedingung", "agb", "vertragsbedingung"],
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -68,7 +72,8 @@ async def check_document_with_controls(
|
|||||||
keywords = filters.get("keywords", [])
|
keywords = filters.get("keywords", [])
|
||||||
|
|
||||||
# Query relevant controls from DB
|
# Query relevant controls from DB
|
||||||
controls = _query_controls(db_session, category, keywords, max_controls)
|
test_proc_kw = filters.get("test_proc_must_contain")
|
||||||
|
controls = _query_controls(db_session, category, keywords, max_controls, test_proc_kw)
|
||||||
if not controls:
|
if not controls:
|
||||||
logger.info("No canonical controls found for '%s' (%s)", doc_title, doc_type)
|
logger.info("No canonical controls found for '%s' (%s)", doc_title, doc_type)
|
||||||
return []
|
return []
|
||||||
@@ -85,13 +90,23 @@ async def check_document_with_controls(
|
|||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
def _query_controls(db_session, category: str, keywords: list[str], limit: int) -> list[dict]:
|
def _query_controls(db_session, category: str, keywords: list[str], limit: int,
|
||||||
"""Query canonical_controls by category + title keywords."""
|
test_proc_keywords: list[str] | None = None) -> list[dict]:
|
||||||
|
"""Query canonical_controls by category + title + test_procedure keywords."""
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
|
|
||||||
# Build keyword filter
|
# Build keyword filter for title
|
||||||
keyword_clauses = " OR ".join([f"title ILIKE :kw{i}" for i in range(len(keywords))])
|
keyword_clauses = " OR ".join([f"title ILIKE :kw{i}" for i in range(len(keywords))])
|
||||||
params = {f"kw{i}": f"%{kw}%" for i, kw in enumerate(keywords)}
|
params = {f"kw{i}": f"%{kw}%" for i, kw in enumerate(keywords)}
|
||||||
|
|
||||||
|
# Build test_procedure filter (ensures controls are relevant to document type)
|
||||||
|
proc_filter = ""
|
||||||
|
if test_proc_keywords:
|
||||||
|
proc_clauses = " OR ".join([f"test_procedure::text ILIKE :tp{i}" for i in range(len(test_proc_keywords))])
|
||||||
|
for i, tp in enumerate(test_proc_keywords):
|
||||||
|
params[f"tp{i}"] = f"%{tp}%"
|
||||||
|
proc_filter = f"AND ({proc_clauses})"
|
||||||
|
|
||||||
params["cat"] = category
|
params["cat"] = category
|
||||||
params["limit"] = limit
|
params["limit"] = limit
|
||||||
|
|
||||||
@@ -101,6 +116,8 @@ def _query_controls(db_session, category: str, keywords: list[str], limit: int)
|
|||||||
WHERE category = :cat
|
WHERE category = :cat
|
||||||
AND release_state != 'deleted'
|
AND release_state != 'deleted'
|
||||||
AND ({keyword_clauses})
|
AND ({keyword_clauses})
|
||||||
|
{proc_filter}
|
||||||
|
AND test_procedure::text != '[]'
|
||||||
ORDER BY risk_score DESC NULLS LAST
|
ORDER BY risk_score DESC NULLS LAST
|
||||||
LIMIT :limit
|
LIMIT :limit
|
||||||
""")
|
""")
|
||||||
|
|||||||
Reference in New Issue
Block a user