feat(canonical-controls): Canonical Control Library — rechtssichere Security Controls
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 40s
CI/CD / test-python-backend-compliance (push) Successful in 41s
CI/CD / test-python-document-crawler (push) Successful in 26s
CI/CD / test-python-dsms-gateway (push) Successful in 23s
CI/CD / validate-canonical-controls (push) Successful in 18s
CI/CD / deploy-hetzner (push) Successful in 2m26s
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 40s
CI/CD / test-python-backend-compliance (push) Successful in 41s
CI/CD / test-python-document-crawler (push) Successful in 26s
CI/CD / test-python-dsms-gateway (push) Successful in 23s
CI/CD / validate-canonical-controls (push) Successful in 18s
CI/CD / deploy-hetzner (push) Successful in 2m26s
Eigenstaendig formulierte Security Controls mit unabhaengiger Taxonomie und Open-Source-Verankerung (OWASP, NIST, ENISA). Keine BSI-Nomenklatur. - Migration 044: 5 DB-Tabellen (frameworks, controls, sources, licenses, mappings) - 10 Seed Controls mit 39 Open-Source-Referenzen - License Gate: Quellen-Berechtigungspruefung (analysis/excerpt/embeddings/product) - Too-Close-Detektor: 5 Metriken (exact-phrase, token-overlap, ngram, embedding, LCS) - REST API: 8 Endpoints unter /v1/canonical/ - Go Loader mit Multi-Index (ID, domain, severity, framework) - Frontend: Control Library Browser + Provenance Wiki - CI/CD: validate-controls.py Job (schema, no-leak, open-anchors) - 67 Tests (8 Go + 59 Python), alle PASS - MkDocs Dokumentation Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -33,6 +33,7 @@ from .change_request_routes import router as change_request_router
|
||||
from .generation_routes import router as generation_router
|
||||
from .project_routes import router as project_router
|
||||
from .wiki_routes import router as wiki_router
|
||||
from .canonical_control_routes import router as canonical_control_router
|
||||
|
||||
# Include sub-routers
|
||||
router.include_router(audit_router)
|
||||
@@ -67,6 +68,7 @@ router.include_router(change_request_router)
|
||||
router.include_router(generation_router)
|
||||
router.include_router(project_router)
|
||||
router.include_router(wiki_router)
|
||||
router.include_router(canonical_control_router)
|
||||
|
||||
__all__ = [
|
||||
"router",
|
||||
@@ -101,4 +103,5 @@ __all__ = [
|
||||
"generation_router",
|
||||
"project_router",
|
||||
"wiki_router",
|
||||
"canonical_control_router",
|
||||
]
|
||||
|
||||
332
backend-compliance/compliance/api/canonical_control_routes.py
Normal file
332
backend-compliance/compliance/api/canonical_control_routes.py
Normal file
@@ -0,0 +1,332 @@
|
||||
"""
|
||||
FastAPI routes for the Canonical Control Library.
|
||||
|
||||
Provides read-only access to independently authored security controls.
|
||||
All controls are formulated without proprietary nomenclature and anchored
|
||||
in open-source frameworks (OWASP, NIST, ENISA).
|
||||
|
||||
Endpoints:
|
||||
GET /v1/canonical/frameworks — All frameworks
|
||||
GET /v1/canonical/frameworks/{framework_id} — Framework details
|
||||
GET /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework
|
||||
GET /v1/canonical/controls — All controls (filterable)
|
||||
GET /v1/canonical/controls/{control_id} — Single control by control_id
|
||||
GET /v1/canonical/sources — Source registry
|
||||
GET /v1/canonical/licenses — License matrix
|
||||
POST /v1/canonical/controls/{control_id}/similarity-check — Too-close check
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any, Optional
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Query
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import text
|
||||
|
||||
from database import SessionLocal
|
||||
from compliance.services.license_gate import get_license_matrix, get_source_permissions
|
||||
from compliance.services.similarity_detector import check_similarity
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/v1/canonical", tags=["canonical-controls"])
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# RESPONSE MODELS
|
||||
# =============================================================================
|
||||
|
||||
class FrameworkResponse(BaseModel):
    """Serialized control framework as returned by the /frameworks endpoints.

    Timestamps are ISO-8601 strings; ``id`` is the stringified DB UUID.
    NOTE(review): the route handlers below return plain dicts and do not bind
    this model via ``response_model`` — confirm whether that is intended.
    """

    id: str                                # DB primary key (UUID) as string
    framework_id: str                      # human-readable framework identifier
    name: str
    version: str
    description: Optional[str] = None
    owner: Optional[str] = None
    policy_version: Optional[str] = None
    release_state: str                     # presumably draft/released — TODO confirm value set
    created_at: str                        # ISO-8601 timestamp
    updated_at: str                        # ISO-8601 timestamp
|
||||
|
||||
|
||||
class ControlResponse(BaseModel):
    """Serialized canonical control (product-facing shape).

    Mirrors the columns selected from ``canonical_controls`` by the route
    handlers; structured fields (scope, requirements, ...) are passed through
    as stored.
    """

    id: str                                     # DB primary key (UUID) as string
    framework_id: str                           # owning framework UUID as string
    control_id: str                             # human-readable ID, e.g. AUTH-001
    title: str
    objective: str
    rationale: str
    scope: dict
    requirements: list
    test_procedure: list
    evidence: list
    severity: str
    risk_score: Optional[float] = None
    implementation_effort: Optional[str] = None
    evidence_confidence: Optional[float] = None
    open_anchors: list                          # open-source framework references (OWASP/NIST/ENISA)
    release_state: str
    tags: list
    created_at: str                             # ISO-8601 timestamp
    updated_at: str                             # ISO-8601 timestamp
|
||||
|
||||
|
||||
class SimilarityCheckRequest(BaseModel):
    """Request body for the too-close similarity check."""

    source_text: str       # protected source text
    candidate_text: str    # independently authored candidate text
|
||||
|
||||
|
||||
class SimilarityCheckResponse(BaseModel):
    """Response shape of the too-close similarity check (5 metrics + verdict)."""

    max_exact_run: int        # longest identical token run (token count)
    token_overlap: float      # Jaccard similarity of token sets
    ngram_jaccard: float      # Jaccard similarity of character 3-grams
    embedding_cosine: float   # cosine similarity of embeddings
    lcs_ratio: float          # longest common subsequence ratio
    status: str               # aggregate verdict: PASS, WARN or FAIL
    details: dict             # per-metric PASS/WARN/FAIL classification
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# HELPERS
|
||||
# =============================================================================
|
||||
|
||||
def _row_to_dict(row, columns: list[str]) -> dict[str, Any]:
|
||||
"""Generic row → dict converter."""
|
||||
return {col: (getattr(row, col).isoformat() if hasattr(getattr(row, col, None), 'isoformat') else getattr(row, col)) for col in columns}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# FRAMEWORKS
|
||||
# =============================================================================
|
||||
|
||||
@router.get("/frameworks")
async def list_frameworks():
    """List all registered control frameworks, ordered by name."""
    statement = text("""
        SELECT id, framework_id, name, version, description,
               owner, policy_version, release_state,
               created_at, updated_at
        FROM canonical_control_frameworks
        ORDER BY name
    """)
    with SessionLocal() as db:
        frameworks = db.execute(statement).fetchall()

    def serialize(fw):
        # Timestamps become ISO-8601 strings; UUID primary key is stringified.
        return {
            "id": str(fw.id),
            "framework_id": fw.framework_id,
            "name": fw.name,
            "version": fw.version,
            "description": fw.description,
            "owner": fw.owner,
            "policy_version": fw.policy_version,
            "release_state": fw.release_state,
            "created_at": fw.created_at.isoformat() if fw.created_at else None,
            "updated_at": fw.updated_at.isoformat() if fw.updated_at else None,
        }

    return [serialize(fw) for fw in frameworks]
|
||||
|
||||
|
||||
@router.get("/frameworks/{framework_id}")
async def get_framework(framework_id: str):
    """Get a single framework by its framework_id."""
    statement = text("""
        SELECT id, framework_id, name, version, description,
               owner, policy_version, release_state,
               created_at, updated_at
        FROM canonical_control_frameworks
        WHERE framework_id = :fid
    """)
    with SessionLocal() as db:
        fw = db.execute(statement, {"fid": framework_id}).fetchone()

    if not fw:
        raise HTTPException(status_code=404, detail="Framework not found")

    payload = {
        "id": str(fw.id),
        "framework_id": fw.framework_id,
        "name": fw.name,
        "version": fw.version,
        "description": fw.description,
        "owner": fw.owner,
        "policy_version": fw.policy_version,
        "release_state": fw.release_state,
    }
    # Serialize timestamps last; None stays None for never-set columns.
    payload["created_at"] = fw.created_at.isoformat() if fw.created_at else None
    payload["updated_at"] = fw.updated_at.isoformat() if fw.updated_at else None
    return payload
|
||||
|
||||
|
||||
@router.get("/frameworks/{framework_id}/controls")
async def list_framework_controls(
    framework_id: str,
    severity: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
):
    """List controls belonging to a framework.

    Args:
        framework_id: Human-readable framework identifier (not the UUID).
        severity: Optional exact-match filter on control severity.
        release_state: Optional exact-match filter on release state.

    Raises:
        HTTPException: 404 if the framework_id is unknown.
    """
    with SessionLocal() as db:
        # Resolve the human-readable framework_id to its DB UUID first.
        fw = db.execute(
            text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"),
            {"fid": framework_id},
        ).fetchone()
        if not fw:
            raise HTTPException(status_code=404, detail="Framework not found")

        # Dynamic WHERE assembly: only fixed literal fragments are appended
        # to the query string; user input is always bound as parameters.
        query = """
            SELECT id, framework_id, control_id, title, objective, rationale,
                   scope, requirements, test_procedure, evidence,
                   severity, risk_score, implementation_effort,
                   evidence_confidence, open_anchors, release_state, tags,
                   created_at, updated_at
            FROM canonical_controls
            WHERE framework_id = :fw_id
        """
        params: dict[str, Any] = {"fw_id": str(fw.id)}

        if severity:
            query += " AND severity = :sev"
            params["sev"] = severity
        if release_state:
            query += " AND release_state = :rs"
            params["rs"] = release_state

        query += " ORDER BY control_id"
        rows = db.execute(text(query), params).fetchall()

    return [_control_row(r) for r in rows]
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# CONTROLS
|
||||
# =============================================================================
|
||||
|
||||
@router.get("/controls")
async def list_controls(
    severity: Optional[str] = Query(None),
    domain: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
):
    """List all canonical controls, with optional filters.

    Args:
        severity: Optional exact-match filter on control severity.
        domain: Optional control-ID prefix (e.g. "AUTH" matches AUTH-001);
            uppercased before matching.
        release_state: Optional exact-match filter on release state.
    """
    # Dynamic WHERE assembly: only fixed literal fragments are appended to
    # the query string; user input is always bound as parameters.
    query = """
        SELECT id, framework_id, control_id, title, objective, rationale,
               scope, requirements, test_procedure, evidence,
               severity, risk_score, implementation_effort,
               evidence_confidence, open_anchors, release_state, tags,
               created_at, updated_at
        FROM canonical_controls
        WHERE 1=1
    """
    params: dict[str, Any] = {}

    if severity:
        query += " AND severity = :sev"
        params["sev"] = severity
    if domain:
        # Prefix match on control_id implemented portably via LEFT/LENGTH.
        query += " AND LEFT(control_id, LENGTH(:dom)) = :dom"
        params["dom"] = domain.upper()
    if release_state:
        query += " AND release_state = :rs"
        params["rs"] = release_state

    query += " ORDER BY control_id"

    with SessionLocal() as db:
        rows = db.execute(text(query), params).fetchall()

    return [_control_row(r) for r in rows]
|
||||
|
||||
|
||||
@router.get("/controls/{control_id}")
async def get_control(control_id: str):
    """Get a single canonical control by its control_id (e.g. AUTH-001)."""
    statement = text("""
        SELECT id, framework_id, control_id, title, objective, rationale,
               scope, requirements, test_procedure, evidence,
               severity, risk_score, implementation_effort,
               evidence_confidence, open_anchors, release_state, tags,
               created_at, updated_at
        FROM canonical_controls
        WHERE control_id = :cid
    """)
    with SessionLocal() as db:
        # control_ids are stored uppercase, so normalise the path parameter.
        record = db.execute(statement, {"cid": control_id.upper()}).fetchone()

    if record is None:
        raise HTTPException(status_code=404, detail="Control not found")

    return _control_row(record)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# SIMILARITY CHECK
|
||||
# =============================================================================
|
||||
|
||||
@router.post("/controls/{control_id}/similarity-check")
async def similarity_check(control_id: str, body: SimilarityCheckRequest):
    """Run the too-close detector against a source/candidate text pair."""
    report = await check_similarity(body.source_text, body.candidate_text)
    # SimilarityReport is a plain dataclass, so vars() yields its fields in
    # declaration order (max_exact_run ... details), prefixed by the control id.
    payload = {"control_id": control_id.upper()}
    payload.update(vars(report))
    return payload
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# SOURCES & LICENSES
|
||||
# =============================================================================
|
||||
|
||||
@router.get("/sources")
async def list_sources():
    """List all registered sources with permission flags."""
    db = SessionLocal()
    try:
        return get_source_permissions(db)
    finally:
        db.close()
|
||||
|
||||
|
||||
@router.get("/licenses")
async def list_licenses():
    """Return the license matrix."""
    db = SessionLocal()
    try:
        return get_license_matrix(db)
    finally:
        db.close()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# INTERNAL HELPERS
|
||||
# =============================================================================
|
||||
|
||||
def _control_row(r) -> dict:
|
||||
return {
|
||||
"id": str(r.id),
|
||||
"framework_id": str(r.framework_id),
|
||||
"control_id": r.control_id,
|
||||
"title": r.title,
|
||||
"objective": r.objective,
|
||||
"rationale": r.rationale,
|
||||
"scope": r.scope,
|
||||
"requirements": r.requirements,
|
||||
"test_procedure": r.test_procedure,
|
||||
"evidence": r.evidence,
|
||||
"severity": r.severity,
|
||||
"risk_score": float(r.risk_score) if r.risk_score is not None else None,
|
||||
"implementation_effort": r.implementation_effort,
|
||||
"evidence_confidence": float(r.evidence_confidence) if r.evidence_confidence is not None else None,
|
||||
"open_anchors": r.open_anchors,
|
||||
"release_state": r.release_state,
|
||||
"tags": r.tags or [],
|
||||
"created_at": r.created_at.isoformat() if r.created_at else None,
|
||||
"updated_at": r.updated_at.isoformat() if r.updated_at else None,
|
||||
}
|
||||
116
backend-compliance/compliance/services/license_gate.py
Normal file
116
backend-compliance/compliance/services/license_gate.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""
|
||||
License Gate — checks whether a given source may be used for a specific purpose.
|
||||
|
||||
Usage types:
|
||||
- analysis: Read + analyse internally (TDM under UrhG 44b)
|
||||
- store_excerpt: Store verbatim excerpt in vault
|
||||
- ship_embeddings: Ship embeddings in product
|
||||
- ship_in_product: Ship text/content in product
|
||||
|
||||
Policy is driven by the canonical_control_sources table columns:
|
||||
allowed_analysis, allowed_store_excerpt, allowed_ship_embeddings, allowed_ship_in_product
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Whitelist mapping usage type -> boolean permission column in
# canonical_control_sources. This map is also the only legal set of column
# names ever interpolated into SQL by check_source_allowed below.
USAGE_COLUMN_MAP = {
    "analysis": "allowed_analysis",                # read + analyse internally
    "store_excerpt": "allowed_store_excerpt",      # store verbatim excerpt in vault
    "ship_embeddings": "allowed_ship_embeddings",  # ship embeddings in product
    "ship_in_product": "allowed_ship_in_product",  # ship text/content in product
}
|
||||
|
||||
|
||||
def check_source_allowed(db: Session, source_id: str, usage_type: str) -> bool:
    """Check whether *source_id* may be used for *usage_type*.

    Args:
        db: Open SQLAlchemy session.
        source_id: Key into ``canonical_control_sources.source_id``.
        usage_type: One of the keys of ``USAGE_COLUMN_MAP``.

    Returns:
        False if the usage type is unknown, the source is not registered,
        or the corresponding permission flag is not set (fail-closed).
    """
    col = USAGE_COLUMN_MAP.get(usage_type)
    if col is None:
        logger.warning("Unknown usage_type=%s", usage_type)
        return False

    # The f-string interpolation is safe: `col` comes from the fixed
    # USAGE_COLUMN_MAP whitelist above, never from caller input; the
    # source_id value is bound as a SQL parameter.
    row = db.execute(
        text(f"SELECT {col} FROM canonical_control_sources WHERE source_id = :sid"),
        {"sid": source_id},
    ).fetchone()

    if row is None:
        logger.warning("Source %s not found in registry", source_id)
        return False

    return bool(row[0])
|
||||
|
||||
|
||||
def get_license_matrix(db: Session) -> list[dict[str, Any]]:
    """Return the full license matrix with allowed usages per license."""
    statement = text("""
        SELECT license_id, name, terms_url, commercial_use,
               ai_training_restriction, tdm_allowed_under_44b,
               deletion_required, notes
        FROM canonical_control_licenses
        ORDER BY license_id
    """)
    # Output keys mirror the selected columns, in SELECT order.
    columns = (
        "license_id",
        "name",
        "terms_url",
        "commercial_use",
        "ai_training_restriction",
        "tdm_allowed_under_44b",
        "deletion_required",
        "notes",
    )
    matrix: list[dict[str, Any]] = []
    for record in db.execute(statement).fetchall():
        matrix.append({name: getattr(record, name) for name in columns})
    return matrix
|
||||
|
||||
|
||||
def get_source_permissions(db: Session) -> list[dict[str, Any]]:
    """Return all sources with their permission flags."""
    statement = text("""
        SELECT s.source_id, s.title, s.publisher, s.url, s.version_label,
               s.language, s.license_id,
               s.allowed_analysis, s.allowed_store_excerpt,
               s.allowed_ship_embeddings, s.allowed_ship_in_product,
               s.vault_retention_days, s.vault_access_tier,
               l.name AS license_name, l.commercial_use
        FROM canonical_control_sources s
        JOIN canonical_control_licenses l ON l.license_id = s.license_id
        ORDER BY s.source_id
    """)
    # Output keys in response order; license_name / commercial_use come from
    # the joined license row (the former via the AS alias).
    columns = (
        "source_id",
        "title",
        "publisher",
        "url",
        "version_label",
        "language",
        "license_id",
        "license_name",
        "commercial_use",
        "allowed_analysis",
        "allowed_store_excerpt",
        "allowed_ship_embeddings",
        "allowed_ship_in_product",
        "vault_retention_days",
        "vault_access_tier",
    )
    permissions: list[dict[str, Any]] = []
    for record in db.execute(statement).fetchall():
        permissions.append({name: getattr(record, name) for name in columns})
    return permissions
|
||||
223
backend-compliance/compliance/services/similarity_detector.py
Normal file
223
backend-compliance/compliance/services/similarity_detector.py
Normal file
@@ -0,0 +1,223 @@
|
||||
"""
|
||||
Too-Close Similarity Detector — checks whether a candidate text is too similar
|
||||
to a protected source text (copyright / license compliance).
|
||||
|
||||
Five metrics:
|
||||
1. Exact-phrase — longest identical token sequence
|
||||
2. Token overlap — Jaccard similarity of token sets
|
||||
3. 3-gram Jaccard — Jaccard similarity of character 3-grams
|
||||
4. Embedding cosine — via bge-m3 (Ollama or embedding-service)
|
||||
5. LCS ratio — Longest Common Subsequence / max(len_a, len_b)
|
||||
|
||||
Decision:
|
||||
PASS — no fail + max 1 warn
|
||||
WARN — max 2 warn, no fail → human review
|
||||
FAIL — any fail threshold → block, rewrite required
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Thresholds
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Per-metric WARN/FAIL cut-offs for the too-close decision.
# max_exact_run is a token count; all other metrics are ratios in [0, 1].
# A value >= "fail" fails that metric; >= "warn" flags it for review.
THRESHOLDS = {
    "max_exact_run": {"warn": 8, "fail": 12},
    "token_overlap": {"warn": 0.20, "fail": 0.30},
    "ngram_jaccard": {"warn": 0.10, "fail": 0.18},
    "embedding_cosine": {"warn": 0.86, "fail": 0.92},
    "lcs_ratio": {"warn": 0.35, "fail": 0.50},
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tokenisation helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_WORD_RE = re.compile(r"\w+", re.UNICODE)
|
||||
|
||||
|
||||
def _tokenize(text: str) -> list[str]:
|
||||
return [t.lower() for t in _WORD_RE.findall(text)]
|
||||
|
||||
|
||||
def _char_ngrams(text: str, n: int = 3) -> set[str]:
|
||||
text = text.lower()
|
||||
return {text[i : i + n] for i in range(len(text) - n + 1)} if len(text) >= n else set()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Metric implementations
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def max_exact_run(tokens_a: list[str], tokens_b: list[str]) -> int:
    """Longest contiguous identical token sequence shared by *a* and *b*.

    Uses the classic common-suffix DP: ``curr[j]`` is the length of the
    matching run ending at the current token of *a* and ``tokens_b[j-1]``.
    This is O(len_a * len_b) in the worst case, whereas the original
    restarted a scan at every matching pair (O(len_a * len_b * run_len)).
    Results are identical.

    Args:
        tokens_a: First token sequence.
        tokens_b: Second token sequence.

    Returns:
        Length of the longest shared contiguous run; 0 if either is empty.
    """
    if not tokens_a or not tokens_b:
        return 0

    best = 0
    prev = [0] * (len(tokens_b) + 1)
    for tok in tokens_a:
        curr = [0] * (len(tokens_b) + 1)
        for j, other in enumerate(tokens_b, start=1):
            if tok == other:
                curr[j] = prev[j - 1] + 1
                if curr[j] > best:
                    best = curr[j]
        prev = curr
    return best
|
||||
|
||||
|
||||
def token_overlap_jaccard(tokens_a: list[str], tokens_b: list[str]) -> float:
    """Jaccard similarity of the two token sets (0.0 when both are empty)."""
    union = set(tokens_a) | set(tokens_b)
    if not union:
        return 0.0
    shared = set(tokens_a) & set(tokens_b)
    return len(shared) / len(union)
|
||||
|
||||
|
||||
def ngram_jaccard(text_a: str, text_b: str, n: int = 3) -> float:
    """Jaccard similarity of character n-grams (0.0 when both sets are empty)."""
    grams_a = _char_ngrams(text_a, n)
    grams_b = _char_ngrams(text_b, n)
    union = grams_a | grams_b
    return len(grams_a & grams_b) / len(union) if union else 0.0
|
||||
|
||||
|
||||
def lcs_ratio(tokens_a: list[str], tokens_b: list[str]) -> float:
    """Longest-common-subsequence length over max(len_a, len_b).

    Single-row DP variant: `diag` carries the value that was at row[j-1]
    before this iteration overwrote it (the i-1, j-1 cell).
    Returns 0.0 when either sequence is empty.
    """
    if not tokens_a or not tokens_b:
        return 0.0

    width = len(tokens_b)
    row = [0] * (width + 1)
    for tok in tokens_a:
        diag = 0
        for j in range(1, width + 1):
            saved = row[j]
            if tok == tokens_b[j - 1]:
                row[j] = diag + 1
            else:
                row[j] = max(row[j], row[j - 1])
            diag = saved
    return row[width] / max(len(tokens_a), width)
|
||||
|
||||
|
||||
async def embedding_cosine(text_a: str, text_b: str, embedding_url: str | None = None) -> float:
    """Cosine similarity of the two texts via the embedding service (bge-m3).

    POSTs both texts to ``{url}/embed`` and computes cosine similarity of
    the two returned vectors. Falls back to 0.0 — effectively skipping this
    metric — if the service is unreachable, responds with a non-2xx status,
    or returns fewer than two embeddings.

    Args:
        text_a: First text.
        text_b: Second text.
        embedding_url: Optional base-URL override; defaults to the
            in-cluster address ``http://embedding-service:8087``.
    """
    url = embedding_url or "http://embedding-service:8087"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"{url}/embed",
                json={"texts": [text_a, text_b]},
            )
            resp.raise_for_status()
            embeddings = resp.json().get("embeddings", [])
            if len(embeddings) < 2:
                return 0.0
            return _cosine(embeddings[0], embeddings[1])
    except Exception:
        # Broad catch is deliberate: the similarity check must degrade
        # gracefully (best-effort metric) when the service is down.
        logger.warning("Embedding service unreachable, skipping cosine check")
        return 0.0
|
||||
|
||||
|
||||
def _cosine(a: list[float], b: list[float]) -> float:
|
||||
dot = sum(x * y for x, y in zip(a, b))
|
||||
norm_a = sum(x * x for x in a) ** 0.5
|
||||
norm_b = sum(x * x for x in b) ** 0.5
|
||||
if norm_a == 0 or norm_b == 0:
|
||||
return 0.0
|
||||
return dot / (norm_a * norm_b)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Decision engine
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
class SimilarityReport:
    """Aggregate result of the five too-close similarity metrics."""

    max_exact_run: int        # longest identical token run (token count)
    token_overlap: float      # Jaccard of token sets, rounded to 4 decimals
    ngram_jaccard: float      # Jaccard of char 3-grams, rounded to 4 decimals
    embedding_cosine: float   # embedding cosine (0.0 when service unreachable)
    lcs_ratio: float          # LCS length / max token length, rounded
    status: str               # PASS, WARN, FAIL
    details: dict             # per-metric status
|
||||
|
||||
|
||||
def _classify(value: float | int, metric: str) -> str:
    """Map a metric value to PASS/WARN/FAIL via THRESHOLDS[metric]."""
    limits = THRESHOLDS[metric]
    if value >= limits["fail"]:
        return "FAIL"
    return "WARN" if value >= limits["warn"] else "PASS"
|
||||
|
||||
|
||||
async def check_similarity(
    source_text: str,
    candidate_text: str,
    embedding_url: str | None = None,
) -> SimilarityReport:
    """Run all 5 metrics and return an aggregate report.

    Args:
        source_text: Protected source text.
        candidate_text: Candidate text to check against it.
        embedding_url: Optional override for the embedding service URL.

    Returns:
        SimilarityReport with per-metric values (floats rounded to 4
        decimals), a per-metric PASS/WARN/FAIL map in ``details`` and the
        aggregate ``status``.
    """
    tok_src = _tokenize(source_text)
    tok_cand = _tokenize(candidate_text)

    m_exact = max_exact_run(tok_src, tok_cand)
    m_token = token_overlap_jaccard(tok_src, tok_cand)
    m_ngram = ngram_jaccard(source_text, candidate_text)
    m_embed = await embedding_cosine(source_text, candidate_text, embedding_url)
    m_lcs = lcs_ratio(tok_src, tok_cand)

    details = {
        "max_exact_run": _classify(m_exact, "max_exact_run"),
        "token_overlap": _classify(m_token, "token_overlap"),
        "ngram_jaccard": _classify(m_ngram, "ngram_jaccard"),
        "embedding_cosine": _classify(m_embed, "embedding_cosine"),
        "lcs_ratio": _classify(m_lcs, "lcs_ratio"),
    }

    fail_count = sum(1 for v in details.values() if v == "FAIL")
    warn_count = sum(1 for v in details.values() if v == "WARN")

    # Aggregate decision — behavior unchanged, but the original if/elif
    # chain had two redundant arms (`warn_count == 1` and the final `else`
    # both produced "PASS"):
    #   any FAIL or 3+ WARN -> FAIL; exactly 2 WARN -> WARN; else PASS.
    if fail_count > 0 or warn_count > 2:
        status = "FAIL"
    elif warn_count == 2:
        status = "WARN"
    else:
        status = "PASS"

    return SimilarityReport(
        max_exact_run=m_exact,
        token_overlap=round(m_token, 4),
        ngram_jaccard=round(m_ngram, 4),
        embedding_cosine=round(m_embed, 4),
        lcs_ratio=round(m_lcs, 4),
        status=status,
        details=details,
    )
|
||||
118
backend-compliance/compliance/tests/test_similarity_detector.py
Normal file
118
backend-compliance/compliance/tests/test_similarity_detector.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""Tests for the Too-Close Similarity Detector."""
|
||||
|
||||
import pytest
|
||||
from compliance.services.similarity_detector import (
|
||||
max_exact_run,
|
||||
token_overlap_jaccard,
|
||||
ngram_jaccard,
|
||||
lcs_ratio,
|
||||
check_similarity,
|
||||
_tokenize,
|
||||
)
|
||||
|
||||
|
||||
class TestTokenize:
    """Unit tests for _tokenize (lowercased Unicode word tokens)."""

    def test_basic(self):
        tokens = _tokenize("Hello World 123")
        assert tokens == ["hello", "world", "123"]

    def test_german_umlauts(self):
        # \w is Unicode-aware, so "Schutzmaßnahmen" stays a single token.
        tokens = _tokenize("Schutzmaßnahmen für Daten")
        assert len(tokens) == 3

    def test_empty(self):
        assert _tokenize("") == []
|
||||
|
||||
|
||||
class TestMaxExactRun:
    """Tests for max_exact_run (longest shared contiguous token run)."""

    def test_identical(self):
        tokens = _tokenize("the quick brown fox jumps over the lazy dog")
        assert max_exact_run(tokens, tokens) == len(tokens)

    def test_partial_match(self):
        a = _tokenize("the quick brown fox")
        b = _tokenize("a quick brown cat")
        assert max_exact_run(a, b) == 2  # "quick brown"

    def test_no_match(self):
        a = _tokenize("hello world")
        b = _tokenize("foo bar")
        assert max_exact_run(a, b) == 0

    def test_empty(self):
        # Empty input on either side must yield 0, never an exception.
        assert max_exact_run([], []) == 0
        assert max_exact_run(["a"], []) == 0
|
||||
|
||||
|
||||
class TestTokenOverlapJaccard:
    """Tests for token_overlap_jaccard (Jaccard similarity of token sets)."""

    def test_identical(self):
        tokens = _tokenize("hello world")
        assert token_overlap_jaccard(tokens, tokens) == 1.0

    def test_no_overlap(self):
        a = _tokenize("hello world")
        b = _tokenize("foo bar")
        assert token_overlap_jaccard(a, b) == 0.0

    def test_partial(self):
        a = _tokenize("hello world foo")
        b = _tokenize("hello bar baz")
        # intersection: {hello}, union: {hello, world, foo, bar, baz}
        assert abs(token_overlap_jaccard(a, b) - 0.2) < 0.01
|
||||
|
||||
|
||||
class TestNgramJaccard:
    """Tests for ngram_jaccard (Jaccard similarity of character 3-grams)."""

    def test_identical(self):
        assert ngram_jaccard("hello", "hello") == 1.0

    def test_different(self):
        assert ngram_jaccard("abc", "xyz") == 0.0

    def test_short(self):
        # Inputs below n characters produce empty gram sets -> similarity 0.
        assert ngram_jaccard("ab", "cd") == 0.0  # too short for 3-grams
|
||||
|
||||
|
||||
class TestLcsRatio:
    """Tests for lcs_ratio (LCS length over max sequence length)."""

    def test_identical(self):
        tokens = _tokenize("multi factor authentication required")
        assert lcs_ratio(tokens, tokens) == 1.0

    def test_partial(self):
        a = _tokenize("multi factor authentication")
        b = _tokenize("single factor verification")
        # LCS: "factor" (length 1), max(3,3) = 3, ratio = 1/3
        result = lcs_ratio(a, b)
        assert 0.3 < result < 0.4

    def test_empty(self):
        assert lcs_ratio([], []) == 0.0
|
||||
|
||||
|
||||
class TestCheckSimilarity:
    """End-to-end tests for check_similarity.

    The bogus embedding_url (port 99999 is not a valid TCP port) makes the
    embedding request fail, so embedding_cosine falls back to 0.0 and the
    aggregate status depends only on the four lexical metrics.
    """

    @pytest.mark.asyncio
    async def test_identical_texts_fail(self):
        text = "Multi-factor authentication must be enforced for all administrative accounts."
        report = await check_similarity(text, text, embedding_url="http://localhost:99999")
        # Identical texts should have max overlap
        assert report.token_overlap == 1.0
        assert report.status == "FAIL"

    @pytest.mark.asyncio
    async def test_different_texts_pass(self):
        source = "Die Anwendung muss eine Zwei-Faktor-Authentisierung implementieren."
        candidate = "Network traffic should be encrypted using TLS 1.3 at minimum."
        report = await check_similarity(source, candidate, embedding_url="http://localhost:99999")
        assert report.token_overlap < 0.1
        assert report.status == "PASS"

    @pytest.mark.asyncio
    async def test_report_fields(self):
        # Shape check: the report exposes all five metrics plus verdict fields.
        report = await check_similarity("hello world", "foo bar", embedding_url="http://localhost:99999")
        assert hasattr(report, "max_exact_run")
        assert hasattr(report, "token_overlap")
        assert hasattr(report, "ngram_jaccard")
        assert hasattr(report, "embedding_cosine")
        assert hasattr(report, "lcs_ratio")
        assert hasattr(report, "status")
        assert hasattr(report, "details")
        assert report.status in ("PASS", "WARN", "FAIL")
|
||||
204
backend-compliance/migrations/044_canonical_control_library.sql
Normal file
204
backend-compliance/migrations/044_canonical_control_library.sql
Normal file
@@ -0,0 +1,204 @@
|
||||
-- Migration 044: Canonical Control Library
-- Provides a legally defensible, independently authored security control library.
-- Controls are formulated independently (no BSI/proprietary nomenclature).
-- Every control MUST have open-source anchors (OWASP, NIST, ENISA).
-- Source provenance is tracked internally for audit, never shipped in product.
--
-- Tables:
-- 1. canonical_control_licenses — License metadata for source materials
-- 2. canonical_control_sources — Source registry (internal, not product-facing)
-- 3. canonical_control_frameworks — Registered control frameworks
-- 4. canonical_controls — The actual controls (product-facing)
-- 5. canonical_control_mappings — Provenance trail (internal audit)
--
-- All statements are idempotent (IF NOT EXISTS / ON CONFLICT DO NOTHING) and
-- run inside a single transaction, so re-applying the migration is safe.

BEGIN;

-- =============================================================================
-- 1. License Metadata
-- =============================================================================

CREATE TABLE IF NOT EXISTS canonical_control_licenses (
    license_id VARCHAR(50) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    terms_url TEXT,
    -- Whether the licensed material may be used commercially at all.
    commercial_use VARCHAR(20) NOT NULL
        CHECK (commercial_use IN ('allowed', 'restricted', 'prohibited', 'unclear')),
    ai_training_restriction VARCHAR(20),
    -- Text-and-data-mining permissibility under German copyright (UrhG §44b);
    -- see the seeded BSI_TOS_2025 notes below.
    tdm_allowed_under_44b VARCHAR(10)
        CHECK (tdm_allowed_under_44b IN ('yes', 'no', 'unclear')),
    -- True when working copies must be deleted after analysis.
    deletion_required BOOLEAN DEFAULT false,
    notes TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- =============================================================================
-- 2. Source Registry (internal — never shipped in product)
-- =============================================================================

CREATE TABLE IF NOT EXISTS canonical_control_sources (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    source_id VARCHAR(50) UNIQUE NOT NULL,
    title VARCHAR(500) NOT NULL,
    publisher VARCHAR(100) NOT NULL,
    url TEXT,
    version_label VARCHAR(50),
    language VARCHAR(5) DEFAULT 'de',
    license_id VARCHAR(50) NOT NULL
        REFERENCES canonical_control_licenses(license_id),
    -- Per-usage permission flags; all default to the most restrictive state
    -- (false) so a new source is denied everything until explicitly granted.
    allowed_analysis BOOLEAN DEFAULT false,
    allowed_store_excerpt BOOLEAN DEFAULT false,
    allowed_ship_embeddings BOOLEAN DEFAULT false,
    allowed_ship_in_product BOOLEAN DEFAULT false,
    vault_retention_days INTEGER DEFAULT 30,
    vault_access_tier VARCHAR(20) DEFAULT 'restricted'
        CHECK (vault_access_tier IN ('restricted', 'internal', 'public')),
    retrieved_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_ccs_license ON canonical_control_sources(license_id);

-- =============================================================================
-- 3. Control Frameworks
-- =============================================================================

CREATE TABLE IF NOT EXISTS canonical_control_frameworks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    framework_id VARCHAR(50) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    version VARCHAR(20) NOT NULL,
    description TEXT,
    owner VARCHAR(100) DEFAULT 'security-platform',
    policy_version VARCHAR(20),
    release_state VARCHAR(20) DEFAULT 'draft'
        CHECK (release_state IN ('draft', 'review', 'approved', 'deprecated')),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- =============================================================================
-- 4. Canonical Controls (product-facing)
-- =============================================================================

CREATE TABLE IF NOT EXISTS canonical_controls (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    framework_id UUID NOT NULL
        REFERENCES canonical_control_frameworks(id),
    -- Human-readable control ID, e.g. 'AUTH-001'; unique per framework (below).
    control_id VARCHAR(20) NOT NULL,
    title VARCHAR(255) NOT NULL,
    objective TEXT NOT NULL,
    rationale TEXT NOT NULL,
    scope JSONB NOT NULL DEFAULT '{}',
    requirements JSONB NOT NULL DEFAULT '[]',
    test_procedure JSONB NOT NULL DEFAULT '[]',
    evidence JSONB NOT NULL DEFAULT '[]',
    severity VARCHAR(20) NOT NULL
        CHECK (severity IN ('low', 'medium', 'high', 'critical')),
    risk_score NUMERIC(3,1) CHECK (risk_score >= 0 AND risk_score <= 10),
    implementation_effort VARCHAR(2)
        CHECK (implementation_effort IN ('s', 'm', 'l', 'xl')),
    evidence_confidence NUMERIC(3,2) CHECK (evidence_confidence >= 0 AND evidence_confidence <= 1),
    -- Open-source references (OWASP/NIST/ENISA); every control must have >= 1.
    open_anchors JSONB NOT NULL DEFAULT '[]',
    release_state VARCHAR(20) DEFAULT 'draft'
        CHECK (release_state IN ('draft', 'review', 'approved', 'deprecated')),
    tags JSONB DEFAULT '[]',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (framework_id, control_id)
);

-- Expression index on the first 4 characters of control_id; presumably the
-- prefix encodes the domain (e.g. 'AUTH' in 'AUTH-001') — confirm with loader.
CREATE INDEX IF NOT EXISTS idx_canonical_controls_domain
    ON canonical_controls ((LEFT(control_id, 4)));
CREATE INDEX IF NOT EXISTS idx_canonical_controls_severity
    ON canonical_controls (severity);
CREATE INDEX IF NOT EXISTS idx_canonical_controls_release
    ON canonical_controls (release_state);
CREATE INDEX IF NOT EXISTS idx_canonical_controls_framework
    ON canonical_controls (framework_id);

-- =============================================================================
-- 5. Control Mappings / Provenance (internal audit trail)
-- =============================================================================

CREATE TABLE IF NOT EXISTS canonical_control_mappings (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    control_id UUID NOT NULL
        REFERENCES canonical_controls(id) ON DELETE CASCADE,
    source_id UUID NOT NULL
        REFERENCES canonical_control_sources(id),
    mapping_type VARCHAR(30) NOT NULL
        CHECK (mapping_type IN ('inspired_by_internal', 'corroborated_by_open', 'derived_only_open')),
    -- 'internal_only' provenance must never surface in the shipped product.
    attribution_class VARCHAR(20) NOT NULL
        CHECK (attribution_class IN ('internal_only', 'product_ok')),
    source_locator VARCHAR(100),
    paraphrase_note TEXT,
    excerpt_hashes JSONB DEFAULT '[]',
    similarity_report JSONB,
    reviewed_by VARCHAR(100),
    reviewed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_ccm_control ON canonical_control_mappings(control_id);
CREATE INDEX IF NOT EXISTS idx_ccm_source ON canonical_control_mappings(source_id);

-- =============================================================================
-- SEED: Licenses
-- =============================================================================

INSERT INTO canonical_control_licenses (license_id, name, terms_url, commercial_use, ai_training_restriction, tdm_allowed_under_44b, deletion_required, notes)
VALUES
('BSI_TOS_2025', 'BSI Nutzungsbedingungen', 'https://www.bsi.bund.de/impressum', 'restricted', 'unclear', 'yes', true,
 'Kommerziell nur mit Zustimmung. TDM unter UrhG 44b erlaubt, Kopien danach loeschen.'),
('OWASP_CC_BY_SA', 'Creative Commons BY-SA 4.0', 'https://creativecommons.org/licenses/by-sa/4.0/', 'allowed', null, 'yes', false,
 'Offen, Attribution + ShareAlike. Kommerziell erlaubt.'),
('NIST_PUBLIC_DOMAIN', 'US Government Public Domain', 'https://www.nist.gov/open/copyright-fair-use-and-licensing-statements-srd-data-software-and-technical-series-publications', 'allowed', null, 'yes', false,
 'US-Regierungswerke sind gemeinfrei. Keine Einschraenkungen.'),
('ENISA_CC_BY', 'Creative Commons BY 4.0', 'https://creativecommons.org/licenses/by/4.0/', 'allowed', null, 'yes', false,
 'Offen, nur Attribution. Kommerziell erlaubt.'),
('ETSI_RESTRICTIVE', 'ETSI Terms of Use', 'https://www.etsi.org/intellectual-property-rights', 'prohibited', 'prohibited', 'no', true,
 'Kommerzielle Nutzung und AI-Training ausdruecklich verboten.'),
('ISO_PAYWALLED', 'ISO Copyright', 'https://www.iso.org/privacy-and-copyright.html', 'prohibited', 'prohibited', 'unclear', true,
 'Kostenpflichtig. Kein Recht auf Reproduktion, Paraphrase muss hinreichend abstrahiert sein.'),
('IEC_AI_PROHIBITED', 'IEC Terms of Use', 'https://www.iec.ch/terms-conditions', 'prohibited', 'prohibited', 'no', true,
 'AI-Training explizit verboten.'),
('CSA_NC', 'CSA Noncommercial', 'https://cloudsecurityalliance.org/license/', 'restricted', null, 'unclear', false,
 'Noncommercial license. Kommerziell nur mit separater Vereinbarung.'),
('CIS_CC_BY_NC_ND', 'Creative Commons BY-NC-ND 4.0', 'https://creativecommons.org/licenses/by-nc-nd/4.0/', 'prohibited', null, 'yes', false,
 'Kein kommerzieller Gebrauch, keine Ableitungen.')
ON CONFLICT (license_id) DO NOTHING;

-- =============================================================================
-- SEED: Sources
-- =============================================================================

-- Boolean column order matches the table: analysis, store_excerpt,
-- ship_embeddings, ship_in_product. BSI and CIS sources are analysis-only.
INSERT INTO canonical_control_sources (source_id, title, publisher, url, version_label, language, license_id, allowed_analysis, allowed_store_excerpt, allowed_ship_embeddings, allowed_ship_in_product)
VALUES
('BSI_TR03161_1', 'BSI TR-03161 Teil 1 — Mobile Anwendungen', 'BSI', 'https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-1.html', '1.0', 'de', 'BSI_TOS_2025', true, false, false, false),
('BSI_TR03161_2', 'BSI TR-03161 Teil 2 — Web-Anwendungen', 'BSI', 'https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-2.html', '1.0', 'de', 'BSI_TOS_2025', true, false, false, false),
('BSI_TR03161_3', 'BSI TR-03161 Teil 3 — Hintergrunddienste', 'BSI', 'https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-3.html', '1.0', 'de', 'BSI_TOS_2025', true, false, false, false),
('OWASP_ASVS', 'OWASP Application Security Verification Standard', 'OWASP Foundation', 'https://owasp.org/www-project-application-security-verification-standard/', '4.0.3', 'en', 'OWASP_CC_BY_SA', true, true, true, true),
('OWASP_MASVS', 'OWASP Mobile Application Security Verification Standard', 'OWASP Foundation', 'https://mas.owasp.org/', '2.1.0', 'en', 'OWASP_CC_BY_SA', true, true, true, true),
('OWASP_TOP10', 'OWASP Top 10', 'OWASP Foundation', 'https://owasp.org/www-project-top-ten/', '2021', 'en', 'OWASP_CC_BY_SA', true, true, true, true),
('NIST_SP800_53', 'NIST SP 800-53 Rev. 5 — Security and Privacy Controls', 'NIST', 'https://csrc.nist.gov/publications/detail/sp/800-53/rev-5/final', 'Rev.5', 'en', 'NIST_PUBLIC_DOMAIN', true, true, true, true),
('NIST_SP800_63B', 'NIST SP 800-63B — Digital Identity Guidelines (Authentication)', 'NIST', 'https://pages.nist.gov/800-63-3/sp800-63b.html', 'Rev.3', 'en', 'NIST_PUBLIC_DOMAIN', true, true, true, true),
('ENISA_GOOD_PRACTICES', 'ENISA Good Practices for Security of IoT/Mobile', 'ENISA', 'https://www.enisa.europa.eu/publications', null, 'en', 'ENISA_CC_BY', true, true, true, true),
('CIS_CONTROLS', 'CIS Critical Security Controls', 'Center for Internet Security', 'https://www.cisecurity.org/controls', 'v8', 'en', 'CIS_CC_BY_NC_ND', true, false, false, false)
ON CONFLICT (source_id) DO NOTHING;

-- =============================================================================
-- SEED: Default Framework
-- =============================================================================

INSERT INTO canonical_control_frameworks (framework_id, name, version, description, owner, release_state)
VALUES (
'bp_security_v1',
'BreakPilot Security Controls',
'1.0',
'Eigenstaendig formulierte Security Controls basierend auf offenem Wissen (OWASP, NIST, ENISA). Unabhaengige Taxonomie und Nomenklatur.',
'security-platform',
'draft'
)
ON CONFLICT (framework_id) DO NOTHING;

COMMIT;
|
||||
225
backend-compliance/tests/test_canonical_control_routes.py
Normal file
225
backend-compliance/tests/test_canonical_control_routes.py
Normal file
@@ -0,0 +1,225 @@
|
||||
"""Tests for Canonical Control Library routes (canonical_control_routes.py)."""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from compliance.api.canonical_control_routes import (
|
||||
FrameworkResponse,
|
||||
ControlResponse,
|
||||
SimilarityCheckRequest,
|
||||
SimilarityCheckResponse,
|
||||
_control_row,
|
||||
)
|
||||
|
||||
|
||||
class TestFrameworkResponse:
    """Tests for the FrameworkResponse model."""

    _TS = "2026-03-12T00:00:00+00:00"

    def test_basic_creation(self):
        """Required fields round-trip through the model unchanged."""
        resp = FrameworkResponse(
            id="uuid-1",
            framework_id="bp_security_v1",
            name="BreakPilot Security Controls",
            version="1.0",
            release_state="draft",
            created_at=self._TS,
            updated_at=self._TS,
        )
        assert resp.framework_id == "bp_security_v1"
        assert resp.version == "1.0"

    def test_optional_fields(self):
        """Fields not supplied at construction default to None."""
        resp = FrameworkResponse(
            id="uuid-1",
            framework_id="test",
            name="Test",
            version="1.0",
            release_state="draft",
            created_at=self._TS,
            updated_at=self._TS,
        )
        assert resp.description is None
        assert resp.owner is None
        assert resp.policy_version is None
|
||||
|
||||
|
||||
class TestControlResponse:
    """Tests for the ControlResponse model."""

    _TS = "2026-03-12T00:00:00+00:00"

    def test_full_control(self):
        """A fully populated control retains all product-facing fields."""
        anchor = {"framework": "OWASP ASVS", "ref": "V2.8", "url": "https://owasp.org"}
        resp = ControlResponse(
            id="uuid-1",
            framework_id="uuid-fw",
            control_id="AUTH-001",
            title="Multi-Factor Authentication",
            objective="Require MFA for privileged access.",
            rationale="Passwords alone are insufficient.",
            scope={"platforms": ["web"]},
            requirements=["MFA for admin accounts"],
            test_procedure=["Test admin login without MFA"],
            evidence=[{"type": "config", "description": "MFA config"}],
            severity="high",
            open_anchors=[anchor],
            release_state="draft",
            tags=["mfa", "auth"],
            created_at=self._TS,
            updated_at=self._TS,
        )
        assert resp.control_id == "AUTH-001"
        assert resp.severity == "high"
        assert len(resp.open_anchors) == 1

    def test_optional_numeric_fields(self):
        """Numeric scoring fields are optional and default to None."""
        resp = ControlResponse(
            id="uuid-1",
            framework_id="uuid-fw",
            control_id="NET-001",
            title="TLS",
            objective="Encrypt traffic.",
            rationale="Prevent eavesdropping.",
            scope={},
            requirements=[],
            test_procedure=[],
            evidence=[],
            severity="high",
            open_anchors=[],
            release_state="draft",
            tags=[],
            created_at=self._TS,
            updated_at=self._TS,
        )
        assert resp.risk_score is None
        assert resp.implementation_effort is None
        assert resp.evidence_confidence is None
|
||||
|
||||
|
||||
class TestSimilarityCheckRequest:
    """Tests for the SimilarityCheckRequest model."""

    def test_valid_request(self):
        """Both text fields are stored verbatim."""
        source = "Die Anwendung muss MFA implementieren."
        candidate = "Multi-factor authentication is required."
        req = SimilarityCheckRequest(source_text=source, candidate_text=candidate)
        assert req.source_text == source
        assert req.candidate_text == candidate

    def test_empty_strings(self):
        """Empty strings are accepted by the model."""
        req = SimilarityCheckRequest(source_text="", candidate_text="")
        assert req.source_text == ""
|
||||
|
||||
|
||||
class TestSimilarityCheckResponse:
    """Tests for the SimilarityCheckResponse model."""

    _METRICS = (
        "max_exact_run",
        "token_overlap",
        "ngram_jaccard",
        "embedding_cosine",
        "lcs_ratio",
    )

    @classmethod
    def _build(cls, status, **metric_values):
        """Construct a response whose per-metric details all equal *status*."""
        details = {name: status for name in cls._METRICS}
        return SimilarityCheckResponse(status=status, details=details, **metric_values)

    def test_pass_status(self):
        """Low metric values with all-PASS details yield a PASS response."""
        resp = self._build(
            "PASS",
            max_exact_run=2,
            token_overlap=0.05,
            ngram_jaccard=0.03,
            embedding_cosine=0.45,
            lcs_ratio=0.12,
        )
        assert resp.status == "PASS"

    def test_fail_status(self):
        """High metric values with all-FAIL details yield a FAIL response."""
        resp = self._build(
            "FAIL",
            max_exact_run=15,
            token_overlap=0.35,
            ngram_jaccard=0.20,
            embedding_cosine=0.95,
            lcs_ratio=0.55,
        )
        assert resp.status == "FAIL"
|
||||
|
||||
|
||||
class TestControlRowConversion:
    """Tests for the _control_row row-to-dict helper."""

    def _make_row(self, **overrides):
        """Build a DB-row stub with realistic defaults; *overrides* replace fields.

        Uses types.SimpleNamespace rather than MagicMock: a MagicMock fabricates
        any attribute on access, so _control_row reading a column this stub does
        not define would silently pass. SimpleNamespace raises AttributeError
        instead, making missing or renamed columns visible in these tests.
        """
        from types import SimpleNamespace

        now = datetime.now(timezone.utc)
        defaults = {
            "id": "uuid-ctrl-1",
            "framework_id": "uuid-fw-1",
            "control_id": "AUTH-001",
            "title": "Multi-Factor Authentication",
            "objective": "Require MFA.",
            "rationale": "Passwords insufficient.",
            "scope": {"platforms": ["web", "mobile"]},
            "requirements": ["Req 1", "Req 2"],
            "test_procedure": ["Test 1"],
            "evidence": [{"type": "config", "description": "MFA config"}],
            "severity": "high",
            "risk_score": 8.5,
            "implementation_effort": "m",
            "evidence_confidence": 0.85,
            "open_anchors": [
                {"framework": "OWASP ASVS", "ref": "V2.8", "url": "https://owasp.org"},
            ],
            "release_state": "draft",
            "tags": ["mfa"],
            "created_at": now,
            "updated_at": now,
        }
        defaults.update(overrides)
        return SimpleNamespace(**defaults)

    def test_basic_conversion(self):
        """All scalar and JSON fields are copied into the result dict."""
        row = self._make_row()
        result = _control_row(row)
        assert result["control_id"] == "AUTH-001"
        assert result["severity"] == "high"
        assert result["risk_score"] == 8.5
        assert result["implementation_effort"] == "m"
        assert result["evidence_confidence"] == 0.85
        assert len(result["open_anchors"]) == 1

    def test_null_numeric_fields(self):
        """NULL numeric columns pass through as None."""
        row = self._make_row(risk_score=None, evidence_confidence=None, implementation_effort=None)
        result = _control_row(row)
        assert result["risk_score"] is None
        assert result["evidence_confidence"] is None
        assert result["implementation_effort"] is None

    def test_empty_tags(self):
        """A NULL tags column is normalized to an empty list."""
        row = self._make_row(tags=None)
        result = _control_row(row)
        assert result["tags"] == []

    def test_empty_tags_list(self):
        """An already-empty tags list stays empty."""
        row = self._make_row(tags=[])
        result = _control_row(row)
        assert result["tags"] == []

    def test_timestamp_format(self):
        """Timestamps are rendered as ISO-like strings containing date and time."""
        now = datetime(2026, 3, 12, 10, 30, 0, tzinfo=timezone.utc)
        row = self._make_row(created_at=now, updated_at=now)
        result = _control_row(row)
        assert "2026-03-12" in result["created_at"]
        assert "10:30" in result["created_at"]

    def test_none_timestamps(self):
        """NULL timestamps pass through as None."""
        row = self._make_row(created_at=None, updated_at=None)
        result = _control_row(row)
        assert result["created_at"] is None
        assert result["updated_at"] is None
|
||||
161
backend-compliance/tests/test_license_gate.py
Normal file
161
backend-compliance/tests/test_license_gate.py
Normal file
@@ -0,0 +1,161 @@
|
||||
"""Tests for License Gate service (license_gate.py)."""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from collections import namedtuple
|
||||
|
||||
from compliance.services.license_gate import (
|
||||
check_source_allowed,
|
||||
get_license_matrix,
|
||||
get_source_permissions,
|
||||
USAGE_COLUMN_MAP,
|
||||
)
|
||||
|
||||
|
||||
class TestUsageColumnMap:
    """Tests for the usage-type -> DB-column mapping."""

    def test_all_usage_types_mapped(self):
        """Exactly the four known usage types are present — no more, no fewer."""
        assert set(USAGE_COLUMN_MAP) == {
            "analysis",
            "store_excerpt",
            "ship_embeddings",
            "ship_in_product",
        }

    def test_column_names(self):
        """Each usage type maps to its corresponding 'allowed_*' column."""
        expected = {
            "analysis": "allowed_analysis",
            "store_excerpt": "allowed_store_excerpt",
            "ship_embeddings": "allowed_ship_embeddings",
            "ship_in_product": "allowed_ship_in_product",
        }
        for usage, column in expected.items():
            assert USAGE_COLUMN_MAP[usage] == column
|
||||
|
||||
|
||||
class TestCheckSourceAllowed:
    """Tests for check_source_allowed()."""

    @staticmethod
    def _mock_db(value):
        """DB stub whose execute().fetchone() yields (*value*,), or no row for None."""
        db = MagicMock()
        row = None if value is None else (value,)
        db.execute.return_value.fetchone.return_value = row
        return db

    def test_allowed_analysis(self):
        db = self._mock_db(True)
        assert check_source_allowed(db, "OWASP_ASVS", "analysis") is True

    def test_denied_ship_in_product(self):
        db = self._mock_db(False)
        assert check_source_allowed(db, "BSI_TR03161_1", "ship_in_product") is False

    def test_unknown_source(self):
        """A source_id with no DB row is treated as denied."""
        db = self._mock_db(None)
        assert check_source_allowed(db, "NONEXISTENT", "analysis") is False

    def test_unknown_usage_type(self):
        """An invalid usage type short-circuits to False without touching the DB."""
        db = MagicMock()
        assert check_source_allowed(db, "OWASP_ASVS", "invalid_type") is False
        db.execute.assert_not_called()

    def test_allowed_store_excerpt(self):
        db = self._mock_db(True)
        assert check_source_allowed(db, "OWASP_ASVS", "store_excerpt") is True

    def test_denied_store_excerpt(self):
        db = self._mock_db(False)
        assert check_source_allowed(db, "BSI_TR03161_1", "store_excerpt") is False
|
||||
|
||||
|
||||
class TestGetLicenseMatrix:
    """Tests for get_license_matrix()."""

    # Column order of a row from the licenses table.
    _LIC_FIELDS = (
        "license_id", "name", "terms_url", "commercial_use",
        "ai_training_restriction", "tdm_allowed_under_44b",
        "deletion_required", "notes",
    )

    def test_returns_list(self):
        """Rows from the DB are converted to a list of plain dicts."""
        LicRow = namedtuple("LicRow", self._LIC_FIELDS)
        rows = [
            LicRow("OWASP_CC_BY_SA", "CC BY-SA 4.0", "https://example.com",
                   "allowed", None, "yes", False, "Open source"),
            LicRow("BSI_TOS_2025", "BSI ToS", "https://bsi.bund.de",
                   "restricted", "unclear", "yes", True, "Commercial restricted"),
        ]

        db = MagicMock()
        db.execute.return_value.fetchall.return_value = rows
        result = get_license_matrix(db)

        assert len(result) == 2
        first, second = result
        assert first["license_id"] == "OWASP_CC_BY_SA"
        assert first["commercial_use"] == "allowed"
        assert first["deletion_required"] is False
        assert second["license_id"] == "BSI_TOS_2025"
        assert second["commercial_use"] == "restricted"
        assert second["deletion_required"] is True

    def test_empty_result(self):
        """No rows in the DB yields an empty list."""
        db = MagicMock()
        db.execute.return_value.fetchall.return_value = []
        assert get_license_matrix(db) == []
|
||||
|
||||
|
||||
class TestGetSourcePermissions:
    """Tests for get_source_permissions()."""

    # Column order of the sources-with-license join returned by the query.
    # Hoisted to class level: the namedtuple was previously duplicated verbatim
    # in both test methods, which let the two copies drift apart silently.
    _SRC_FIELDS = (
        "source_id", "title", "publisher", "url", "version_label",
        "language", "license_id", "allowed_analysis", "allowed_store_excerpt",
        "allowed_ship_embeddings", "allowed_ship_in_product",
        "vault_retention_days", "vault_access_tier",
        "license_name", "commercial_use",
    )
    _SrcRow = namedtuple("SrcRow", _SRC_FIELDS)

    @staticmethod
    def _db_with(rows):
        """DB stub whose execute().fetchall() returns *rows*."""
        db = MagicMock()
        db.execute.return_value.fetchall.return_value = rows
        return db

    def test_returns_list_with_join(self):
        """A fully permissive source comes back with its joined license fields."""
        rows = [
            self._SrcRow(
                "OWASP_ASVS", "OWASP ASVS", "OWASP Foundation",
                "https://owasp.org", "4.0.3", "en", "OWASP_CC_BY_SA",
                True, True, True, True, 30, "public",
                "CC BY-SA 4.0", "allowed",
            ),
        ]
        result = get_source_permissions(self._db_with(rows))

        assert len(result) == 1
        src = result[0]
        assert src["source_id"] == "OWASP_ASVS"
        assert src["allowed_analysis"] is True
        assert src["allowed_ship_in_product"] is True
        assert src["license_name"] == "CC BY-SA 4.0"
        assert src["commercial_use"] == "allowed"

    def test_restricted_source(self):
        """An analysis-only source keeps all other permission flags False."""
        rows = [
            self._SrcRow(
                "BSI_TR03161_1", "BSI TR-03161 Teil 1", "BSI",
                "https://bsi.bund.de", "1.0", "de", "BSI_TOS_2025",
                True, False, False, False, 30, "restricted",
                "BSI Nutzungsbedingungen", "restricted",
            ),
        ]
        result = get_source_permissions(self._db_with(rows))

        src = result[0]
        assert src["allowed_analysis"] is True
        assert src["allowed_store_excerpt"] is False
        assert src["allowed_ship_embeddings"] is False
        assert src["allowed_ship_in_product"] is False
|
||||
142
backend-compliance/tests/test_validate_controls.py
Normal file
142
backend-compliance/tests/test_validate_controls.py
Normal file
@@ -0,0 +1,142 @@
|
||||
"""Tests for the CI/CD control validator script."""
|
||||
|
||||
import json
import re
import subprocess
import sys
from pathlib import Path

import pytest
|
||||
|
||||
# Repository root, resolved relative to this test file:
# tests/ -> backend-compliance/ -> repo root.
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
# The CI/CD validator script under test.
VALIDATOR = REPO_ROOT / "scripts" / "validate-controls.py"
# The product-facing canonical controls JSON that the validator checks.
CONTROLS_FILE = REPO_ROOT / "ai-compliance-sdk" / "policies" / "canonical_controls_v1.json"
|
||||
|
||||
|
||||
class TestValidatorScript:
    """Integration tests for validate-controls.py, run as a subprocess."""

    @staticmethod
    def _run_validator():
        """Run the validator from the repo root and return the CompletedProcess.

        Extracted helper: the identical subprocess invocation was previously
        duplicated in both tests.
        """
        return subprocess.run(
            [sys.executable, str(VALIDATOR)],
            capture_output=True, text=True, cwd=str(REPO_ROOT),
        )

    def test_validator_passes_on_valid_controls(self):
        """The shipped controls file must pass all validator checks."""
        result = self._run_validator()
        assert result.returncode == 0, f"Validator failed:\n{result.stdout}\n{result.stderr}"
        assert "ALL CHECKS PASSED" in result.stdout

    def test_validator_reports_control_count(self):
        """The validator's summary reports the expected counts."""
        result = self._run_validator()
        assert "Controls: 10" in result.stdout
        assert "Open Anchors:" in result.stdout
|
||||
|
||||
|
||||
class TestControlsJsonStructure:
    """Direct validation of the canonical controls JSON file structure."""

    # Patterns compiled once at class level; `import re` was previously
    # repeated inside individual test methods and the patterns recompiled
    # on every call.
    _BSI_ID_RE = re.compile(r"O\.[A-Za-z]+_[0-9]+")        # BSI-proprietary IDs
    _BSI_TR_RE = re.compile(r"TR-03161|BSI-TR-")           # BSI TR document refs
    _CONTROL_ID_RE = re.compile(r"^[A-Z]{2,6}-[0-9]{3}$")  # e.g. AUTH-001
    # Product-facing text fields that must stay free of proprietary references.
    _PRODUCT_FIELDS = ("objective", "rationale", "title")

    @pytest.fixture
    def controls_data(self):
        """Parsed contents of canonical_controls_v1.json."""
        with open(CONTROLS_FILE) as f:
            return json.load(f)

    def test_top_level_keys(self, controls_data):
        """The document carries all required top-level sections."""
        for key in ("version", "schema", "framework", "domains", "controls"):
            assert key in controls_data

    def test_framework_metadata(self, controls_data):
        """Framework id/version match the registered default framework."""
        fw = controls_data["framework"]
        assert fw["id"] == "bp_security_v1"
        assert fw["version"] == "1.0"

    def test_all_controls_have_open_anchors(self, controls_data):
        """Every control must carry at least one open-source anchor."""
        for ctrl in controls_data["controls"]:
            anchors = ctrl.get("open_anchors", [])
            assert len(anchors) >= 1, (
                f"Control {ctrl['control_id']} has no open anchors"
            )

    def test_no_bsi_nomenclature_in_controls(self, controls_data):
        """Ensure no BSI-proprietary IDs leak into product-facing fields."""
        for ctrl in controls_data["controls"]:
            for field in self._PRODUCT_FIELDS:
                text = ctrl.get(field, "")
                match = self._BSI_ID_RE.search(text)
                assert match is None, (
                    f"Control {ctrl['control_id']}.{field} contains BSI pattern: {match.group()}"
                )

    def test_control_id_format(self, controls_data):
        """Control IDs follow the PREFIX-NNN convention."""
        for ctrl in controls_data["controls"]:
            assert self._CONTROL_ID_RE.match(ctrl["control_id"]), (
                f"Invalid control_id format: {ctrl['control_id']}"
            )

    def test_valid_severities(self, controls_data):
        """Severity values are drawn from the fixed four-level scale."""
        valid = {"low", "medium", "high", "critical"}
        for ctrl in controls_data["controls"]:
            assert ctrl["severity"] in valid, (
                f"Control {ctrl['control_id']} has invalid severity: {ctrl['severity']}"
            )

    def test_domains_referenced_by_controls(self, controls_data):
        """Every control references a domain declared in the document."""
        domain_ids = {d["id"] for d in controls_data["domains"]}
        for ctrl in controls_data["controls"]:
            assert ctrl["domain"] in domain_ids, (
                f"Control {ctrl['control_id']} references unknown domain: {ctrl['domain']}"
            )

    def test_open_anchor_structure(self, controls_data):
        """Each anchor has framework/ref/url, and the URL is HTTPS."""
        for ctrl in controls_data["controls"]:
            for i, anchor in enumerate(ctrl.get("open_anchors", [])):
                assert "framework" in anchor, (
                    f"Control {ctrl['control_id']}: anchor[{i}] missing 'framework'"
                )
                assert "ref" in anchor, (
                    f"Control {ctrl['control_id']}: anchor[{i}] missing 'ref'"
                )
                assert "url" in anchor, (
                    f"Control {ctrl['control_id']}: anchor[{i}] missing 'url'"
                )
                assert anchor["url"].startswith("https://"), (
                    f"Control {ctrl['control_id']}: anchor[{i}] URL not HTTPS"
                )

    def test_evidence_structure(self, controls_data):
        """Each evidence entry declares a type and a description."""
        for ctrl in controls_data["controls"]:
            for i, ev in enumerate(ctrl.get("evidence", [])):
                assert "type" in ev, (
                    f"Control {ctrl['control_id']}: evidence[{i}] missing 'type'"
                )
                assert "description" in ev, (
                    f"Control {ctrl['control_id']}: evidence[{i}] missing 'description'"
                )

    def test_risk_scores_in_range(self, controls_data):
        """Present risk scores lie within the 0..10 scale."""
        for ctrl in controls_data["controls"]:
            if ctrl.get("risk_score") is not None:
                assert 0 <= ctrl["risk_score"] <= 10, (
                    f"Control {ctrl['control_id']}: risk_score {ctrl['risk_score']} out of range"
                )

    def test_total_controls_matches(self, controls_data):
        """The declared total matches the actual number of controls."""
        assert controls_data["total_controls"] == len(controls_data["controls"])

    def test_independent_taxonomy_no_tr_reference(self, controls_data):
        """Verify controls don't reference BSI TR documents in product text."""
        for ctrl in controls_data["controls"]:
            for field in self._PRODUCT_FIELDS:
                text = ctrl.get(field, "")
                match = self._BSI_TR_RE.search(text)
                assert match is None, (
                    f"Control {ctrl['control_id']}.{field} references BSI TR: {match.group()}"
                )
|
||||
Reference in New Issue
Block a user