diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md index 4f4e211..16d8886 100644 --- a/.claude/CLAUDE.md +++ b/.claude/CLAUDE.md @@ -218,12 +218,35 @@ breakpilot-core/ ├── gitea/ # Gitea Config ├── docs-src/ # MkDocs Quellen ├── mkdocs.yml # MkDocs Config +├── control-pipeline/ # RAG/Control Pipeline (Port 8098) ├── scripts/ # Helper Scripts └── docker-compose.yml # Haupt-Compose (28+ Services) ``` --- +## Control Pipeline (WICHTIG) + +**Seit 2026-04-09 liegt die gesamte RAG/Control-Pipeline im Core-Repo** (`control-pipeline/`), NICHT mehr im Compliance-Repo. Alle Arbeiten an der Pipeline (Pass 0a/0b, BatchDedup, Control Generator, Enrichment) finden ausschliesslich hier statt. + +- **Port:** 8098 +- **Container:** bp-core-control-pipeline +- **DB:** Schreibt ins `compliance`-Schema der shared PostgreSQL +- **Das Compliance-Repo wird NICHT fuer Pipeline-Aenderungen benutzt** + +```bash +# Container auf Mac Mini +ssh macmini "cd ~/Projekte/breakpilot-core && /usr/local/bin/docker compose build --no-cache control-pipeline && /usr/local/bin/docker compose up -d --no-deps control-pipeline" + +# Health +ssh macmini "/usr/local/bin/docker exec bp-core-control-pipeline curl -sf http://127.0.0.1:8098/health" + +# Logs +ssh macmini "/usr/local/bin/docker logs -f bp-core-control-pipeline" +``` + +--- + ## Haeufige Befehle ### Deployment (CI/CD — Standardweg) diff --git a/control-pipeline/api/canonical_control_routes.py b/control-pipeline/api/canonical_control_routes.py index 44ec0e5..525de9d 100644 --- a/control-pipeline/api/canonical_control_routes.py +++ b/control-pipeline/api/canonical_control_routes.py @@ -13,6 +13,7 @@ Endpoints: GET /v1/canonical/controls/{control_id}/traceability — Traceability chain GET /v1/canonical/controls/{control_id}/similar — Find similar controls POST /v1/canonical/controls — Create a control + POST /v1/canonical/controls/applicable — Applicability filter (C2) PUT /v1/canonical/controls/{control_id} — Update a control DELETE 
/v1/canonical/controls/{control_id} — Delete a control GET /v1/canonical/categories — Category list @@ -151,6 +152,15 @@ class ControlUpdateRequest(BaseModel): scope_conditions: Optional[dict] = None +class ApplicabilityRequest(BaseModel): + """Request body for POST /v1/canonical/controls/applicable.""" + industry: Optional[str] = None + company_size: Optional[str] = None + scope_signals: Optional[list] = None + limit: int = 100 + offset: int = 0 + + class SimilarityCheckRequest(BaseModel): source_text: str candidate_text: str @@ -321,6 +331,9 @@ async def list_controls( search: Optional[str] = Query(None, description="Full-text search in control_id, title, objective"), control_type: Optional[str] = Query(None, description="Filter: atomic, rich, or all"), exclude_duplicates: bool = Query(False, description="Exclude controls with release_state='duplicate'"), + industry: Optional[str] = Query(None, description="Filter by applicable industry (e.g. Telekommunikation, Energie)"), + company_size: Optional[str] = Query(None, description="Filter by company size: micro/small/medium/large/enterprise"), + scope_signal: Optional[str] = Query(None, description="Filter by scope signal: uses_ai, third_country_transfer, etc."), sort: Optional[str] = Query("control_id", description="Sort field: control_id, created_at, severity"), order: Optional[str] = Query("asc", description="Sort order: asc or desc"), limit: Optional[int] = Query(None, ge=1, le=5000, description="Max results"), @@ -386,6 +399,22 @@ async def list_controls( query += " AND (control_id ILIKE :q OR title ILIKE :q OR objective ILIKE :q)" params["q"] = f"%{search}%" + # Scoped Control Applicability filters (C1) + if industry: + query += """ AND (applicable_industries IS NULL + OR applicable_industries LIKE '%"all"%' + OR applicable_industries LIKE '%' || :industry || '%')""" + params["industry"] = industry + if company_size: + query += """ AND (applicable_company_size IS NULL + OR applicable_company_size LIKE 
'%"all"%' + OR applicable_company_size LIKE '%' || :company_size || '%')""" + params["company_size"] = company_size + if scope_signal: + query += """ AND (scope_conditions IS NULL + OR scope_conditions LIKE '%' || :scope_signal || '%')""" + params["scope_signal"] = scope_signal + # Sorting sort_col = "control_id" if sort in ("created_at", "updated_at", "severity", "control_id"): @@ -425,6 +454,9 @@ async def count_controls( search: Optional[str] = Query(None), control_type: Optional[str] = Query(None), exclude_duplicates: bool = Query(False, description="Exclude controls with release_state='duplicate'"), + industry: Optional[str] = Query(None, description="Filter by applicable industry"), + company_size: Optional[str] = Query(None, description="Filter by company size: micro/small/medium/large/enterprise"), + scope_signal: Optional[str] = Query(None, description="Filter by scope signal: uses_ai, third_country_transfer, etc."), ): """Count controls matching filters (for pagination).""" query = "SELECT count(*) FROM canonical_controls WHERE 1=1" @@ -482,6 +514,22 @@ async def count_controls( query += " AND (control_id ILIKE :q OR title ILIKE :q OR objective ILIKE :q)" params["q"] = f"%{search}%" + # Scoped Control Applicability filters (C1) + if industry: + query += """ AND (applicable_industries IS NULL + OR applicable_industries LIKE '%"all"%' + OR applicable_industries LIKE '%' || :industry || '%')""" + params["industry"] = industry + if company_size: + query += """ AND (applicable_company_size IS NULL + OR applicable_company_size LIKE '%"all"%' + OR applicable_company_size LIKE '%' || :company_size || '%')""" + params["company_size"] = company_size + if scope_signal: + query += """ AND (scope_conditions IS NULL + OR scope_conditions LIKE '%' || :scope_signal || '%')""" + params["scope_signal"] = scope_signal + with SessionLocal() as db: total = db.execute(text(query), params).scalar() @@ -499,6 +547,9 @@ async def controls_meta( target_audience: Optional[str] = 
@router.post("/controls/applicable")
async def get_applicable_controls_endpoint(body: ApplicabilityRequest):
    """Return the controls applicable to a given company profile.

    Filtering is deterministic (no LLM) and delegated to
    ``services.applicability_engine.get_applicable_controls``. Controls whose
    applicability fields are NULL, or contain ``"all"``, match every profile.

    Request body:
        - industry: e.g. "Telekommunikation", "Energie"
        - company_size: one of micro/small/medium/large/enterprise
        - scope_signals: e.g. ["uses_ai", "third_country_transfer"]
        - limit: max results (default 100)
        - offset: pagination offset (default 0)

    Returns:
        - total_applicable: count of matching controls
        - controls: paginated list
        - breakdown: stats by domain, severity, industry

    Raises:
        HTTPException: 400 when ``company_size`` is not a known size, or when
            ``limit`` / ``offset`` is negative.
    """
    # Imported lazily, exactly like the original endpoint did.
    from services.applicability_engine import VALID_SIZES, get_applicable_controls

    # Reuse the engine's whitelist so the API and the engine cannot drift apart.
    if body.company_size and body.company_size not in VALID_SIZES:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid company_size '{body.company_size}'. "
                   f"Must be one of: {', '.join(sorted(VALID_SIZES))}",
        )

    # The engine paginates with a list slice; negative numbers would be read
    # as "count from the end" and silently return the wrong page.
    if body.limit < 0 or body.offset < 0:
        raise HTTPException(
            status_code=400,
            detail="limit and offset must be non-negative",
        )

    with SessionLocal() as db:
        return get_applicable_controls(
            db=db,
            industry=body.industry,
            company_size=body.company_size,
            scope_signals=body.scope_signals or [],
            limit=body.limit,
            offset=body.offset,
        )
def fix_python_value(val: str) -> "str | None":
    """Convert one SQL literal holding a Python repr into JSON text.

    Args:
        val: A single value token from an INSERT statement, e.g. ``NULL``,
            ``42`` or ``'[''a'', ''b'']'`` (SQL-quoted, quotes doubled).

    Returns:
        ``None`` for SQL ``NULL``; the token unchanged when it is not a
        single-quoted literal; otherwise the unescaped text, re-serialised as
        JSON when it is a Python list/dict literal.
    """
    if val == "NULL":
        return None
    # Anything that is not a complete '...' literal is passed through as-is.
    if len(val) < 2 or not (val.startswith("'") and val.endswith("'")):
        return val

    # Undo SQL quote doubling.
    inner = val[1:-1].replace("''", "'")

    # Only container literals are converted (same rule process_line applies).
    # Plain text such as 'None', 'True' or '12' must stay untouched instead of
    # being turned into the JSON scalars null / true / 12.
    stripped = inner.strip()
    if not stripped.startswith(("[", "{")):
        return inner

    try:
        return json.dumps(ast.literal_eval(stripped), ensure_ascii=False)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # Already valid JSON (true/false/null), free text that merely starts
        # with a bracket, or a literal JSON cannot represent (e.g. a set).
        return inner
def _split_sql_values(vals_str):
    """Split the inside of ``VALUES (...)`` into raw value tokens.

    Commas inside single-quoted strings (with ``''`` as the escaped quote) or
    inside parentheses do not split. Tokens keep their quotes and are stripped
    of surrounding whitespace.
    """
    values = []
    token = []  # characters of the current value; joined once per value
    in_quote = False
    depth = 0
    i = 0
    n = len(vals_str)
    while i < n:
        ch = vals_str[i]
        if in_quote:
            if ch == "'":
                if i + 1 < n and vals_str[i + 1] == "'":
                    # Doubled quote = escaped quote, still inside the string.
                    token.append("''")
                    i += 2
                    continue
                in_quote = False
            token.append(ch)
        elif ch == "," and depth == 0:
            values.append("".join(token).strip())
            token = []
        else:
            if ch == "'":
                in_quote = True
            elif ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
            token.append(ch)
        i += 1
    values.append("".join(token).strip())
    return values


def _sql_token_to_param(val):
    """Turn one raw SQL value token into a Python query parameter."""
    if val == "NULL":
        return None
    if val in ("TRUE", "true"):
        return True
    if val in ("FALSE", "false"):
        return False
    if val.startswith("'") and val.endswith("'"):
        inner = val[1:-1].replace("''", "'")
        stripped = inner.strip()
        # Python-style list/dict reprs are re-serialised as JSON; empty
        # containers are already valid JSON and are left alone.
        if stripped and stripped[0] in ("[", "{") and stripped not in ("[]", "{}"):
            try:
                return json.dumps(ast.literal_eval(inner), ensure_ascii=False)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                # Valid JSON already, plain text, or a literal JSON cannot
                # encode (e.g. a set) -- keep the text instead of crashing.
                return inner
        return inner
    # Unquoted token: numeric if it parses, otherwise passed through as text.
    try:
        return float(val) if "." in val else int(val)
    except ValueError:
        return val


def _run_insert(conn, table, sql, params=None):
    """Execute one INSERT; report failures other than duplicate keys."""
    try:
        with conn.cursor() as cur:
            if params is None:
                # No parameter tuple: the driver must not interpret '%'.
                cur.execute(sql)
            else:
                cur.execute(sql, params)
        return True
    except Exception as exc:  # best-effort import: report and carry on
        conn.rollback()
        if "duplicate key" not in str(exc):
            print(f"  ERROR [{table}]: {str(exc)[:120]}", file=sys.stderr)
        return False


def process_line(line: str, conn) -> bool:
    """Process a single SQL statement from the backup.

    Only ``INSERT INTO "<table>" ...`` statements are handled. Rows of tables
    outside ``JSONB_TABLES`` are executed verbatim. Rows of JSONB tables are
    re-issued as a parameterised INSERT after converting Python-style
    list/dict literals to JSON. If the VALUES list cannot be matched to the
    column list, the statement is executed verbatim as a fallback.

    Args:
        line: One complete SQL statement.
        conn: An open DB-API connection (``cursor()`` / ``rollback()``).

    Returns:
        True if an INSERT was executed successfully, False otherwise
        (including every non-INSERT statement).
    """
    line = line.strip()
    if not line.startswith("INSERT INTO"):
        return False

    table_match = re.match(r'INSERT INTO "(\w+)"', line)
    if not table_match:
        return False
    table = table_match.group(1)

    if table not in JSONB_TABLES:
        return _run_insert(conn, table, line)

    cols_match = re.match(r'INSERT INTO "\w+" \(([^)]+)\) VALUES \(', line)
    if not cols_match:
        return False
    col_names = [c.strip().strip('"') for c in cols_match.group(1).split(",")]

    # Everything after "VALUES (" minus the statement terminator. Stripping
    # ';' and whitespace first tolerates ") ;" or a missing semicolon instead
    # of blindly chopping two characters.
    tail = line[cols_match.end():].rstrip(" \t\r\n;")
    if not tail.endswith(")"):
        return _run_insert(conn, table, line)

    values = _split_sql_values(tail[:-1])
    if len(values) != len(col_names):
        # Could not align values with columns: try the statement verbatim.
        return _run_insert(conn, table, line)

    params = [_sql_token_to_param(v) for v in values]
    col_list = ", ".join(f'"{c}"' for c in col_names)
    ph_list = ", ".join(["%s"] * len(params))
    sql = f'INSERT INTO "{table}" ({col_list}) VALUES ({ph_list})'
    return _run_insert(conn, table, sql, params)
def main():
    """Replay every INSERT statement of the gzipped backup into PostgreSQL.

    Connects to ``DB_URL`` in autocommit mode, sets the search path to
    ``compliance, public``, then streams ``BACKUP_PATH`` and hands each
    complete INSERT statement to ``process_line``. Progress is printed every
    10,000 statements and a summary at the end.

    A statement is considered complete when a physical line ends with ``;``.
    NOTE(review): a multi-line string value whose line happens to end with
    ``;`` would therefore be split early -- confirm the dump never contains
    such values.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        conn.autocommit = True

        with conn.cursor() as cur:
            cur.execute("SET search_path TO compliance, public")

        total = 0
        ok = 0
        errors = 0

        print(f"Reading {BACKUP_PATH}...")
        with gzip.open(BACKUP_PATH, "rt", encoding="utf-8") as f:
            pending = []  # physical lines of the statement being assembled
            for line in f:
                pending.append(line)
                # Only the newest line can complete the statement, so there is
                # no need to re-scan everything buffered so far.
                if not line.rstrip().endswith(";"):
                    continue

                stmt = "".join(pending).strip()
                pending.clear()

                if not stmt.startswith("INSERT"):
                    continue

                total += 1
                if process_line(stmt, conn):
                    ok += 1
                else:
                    errors += 1

                if total % 10000 == 0:
                    print(f"  {total:>8} processed, {ok} ok, {errors} errors")

        print(f"\nDONE: {total} total, {ok} ok, {errors} errors")
    finally:
        # Release the connection even when reading or processing fails.
        conn.close()
def _parse_json_text(value: Any) -> Any:
    """Decode a TEXT column that is expected to hold JSON.

    Lists and dicts are returned unchanged, strings are decoded with
    ``json.loads``, and everything else (``None``, numbers, undecodable or
    empty strings) yields ``None``.
    """
    if isinstance(value, (list, dict)):
        return value
    if not isinstance(value, str):
        return None
    try:
        decoded = json.loads(value)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None
    return decoded
Returns None if unparseable.""" + if value is None: + return None + if isinstance(value, (list, dict)): + return value + if isinstance(value, str): + try: + return json.loads(value) + except (json.JSONDecodeError, ValueError): + return None + return None + + +def _matches_industry(applicable_industries_raw: Any, industry: str) -> bool: + """Check if a control's applicable_industries matches the requested industry.""" + industries = _parse_json_text(applicable_industries_raw) + if industries is None: + return True # NULL = applies to everyone + if not isinstance(industries, list): + return True # malformed = include + if "all" in industries: + return True + return industry in industries + + +def _matches_company_size(applicable_company_size_raw: Any, company_size: str) -> bool: + """Check if a control's applicable_company_size matches the requested size.""" + sizes = _parse_json_text(applicable_company_size_raw) + if sizes is None: + return True # NULL = applies to everyone + if not isinstance(sizes, list): + return True # malformed = include + if "all" in sizes: + return True + return company_size in sizes + + +def _matches_scope_signals( + scope_conditions_raw: Any, scope_signals: list[str] +) -> bool: + """Check if a control's scope_conditions are satisfied by the given signals. + + A control with scope_conditions = {"requires_any": ["uses_ai", "processes_health_data"]} + matches if the company has at least one of those signals. + A control with NULL or empty scope_conditions always matches. 
def _matches_scope_signals(
    scope_conditions_raw: Any, scope_signals: list[str]
) -> bool:
    """Check whether a control's scope_conditions are met by the given signals.

    A control with ``{"requires_any": ["uses_ai", "processes_health_data"]}``
    matches if the company has at least one of those signals. NULL, malformed
    or empty conditions always match.

    Args:
        scope_conditions_raw: The column value (JSON text, dict or None).
        scope_signals: Signals the company has.

    Returns:
        True if the control applies, False otherwise.
    """
    conditions = _parse_json_text(scope_conditions_raw)
    if not isinstance(conditions, dict):
        return True  # NULL or malformed = applies to everyone

    requires_any = conditions.get("requires_any")
    if isinstance(requires_any, str):
        # A bare string is read as one required signal; iterating it would
        # otherwise compare individual characters.
        requires_any = [requires_any]
    if not isinstance(requires_any, list) or not requires_any:
        # Missing, empty or non-list requirement = no restriction. This also
        # avoids crashing on values such as a number.
        return True

    # List membership (==) instead of set intersection, so an unhashable entry
    # in the stored list cannot raise.
    return any(signal in requires_any for signal in scope_signals)
def get_applicable_controls(
    db,
    industry: Optional[str] = None,
    company_size: Optional[str] = None,
    scope_signals: Optional[list[str]] = None,
    limit: int = 100,
    offset: int = 0,
) -> dict[str, Any]:
    """Return the controls applicable to the given company profile.

    Industry and company size are pre-filtered in SQL with LIKE (the columns
    are TEXT, not JSONB) and then matched precisely in Python. Scope signals
    are matched in Python only.

    Args:
        db: SQLAlchemy session.
        industry: e.g. "Telekommunikation", "Energie", "Gesundheitswesen".
        company_size: e.g. "medium", "large", "enterprise".
        scope_signals: e.g. ["uses_ai", "third_country_transfer"].
        limit: Max results to return (applied after filtering); negative
            values are treated as 0.
        offset: Pagination offset (applied after filtering); negative values
            are treated as 0.

    Returns:
        Dict with ``total_applicable``, the effective ``limit`` / ``offset``,
        the paginated ``controls`` and ``breakdown`` counts by domain,
        severity and industry (computed over all applicable controls).
    """
    # Local import: this module does not import collections at file level.
    from collections import Counter

    signals = list(scope_signals) if scope_signals else []
    # A negative bound would make the slice below count from the end of the list.
    limit = max(limit, 0)
    offset = max(offset, 0)

    query = """
        SELECT id, framework_id, control_id, title, objective, rationale,
               scope, requirements, test_procedure, evidence,
               severity, risk_score, implementation_effort,
               evidence_confidence, open_anchors, release_state, tags,
               license_rule, source_original_text, source_citation,
               customer_visible, verification_method, category, evidence_type,
               target_audience, generation_metadata, generation_strategy,
               applicable_industries, applicable_company_size, scope_conditions,
               parent_control_uuid, decomposition_method, pipeline_version,
               created_at, updated_at
        FROM canonical_controls
        WHERE release_state NOT IN ('duplicate', 'deprecated', 'rejected')
    """
    params: dict[str, Any] = {}

    # SQL-level pre-filtering (broad; the Python pass below is authoritative).
    # NOTE(review): a stored value that contains neither the text nor "all"
    # (e.g. malformed JSON) is dropped here although the Python matcher would
    # include it -- confirm the columns only ever hold well-formed lists.
    if industry:
        query += """ AND (applicable_industries IS NULL
                      OR applicable_industries LIKE '%"all"%'
                      OR applicable_industries LIKE '%' || :industry || '%')"""
        params["industry"] = industry

    if company_size:
        query += """ AND (applicable_company_size IS NULL
                      OR applicable_company_size LIKE '%"all"%'
                      OR applicable_company_size LIKE '%' || :company_size || '%')"""
        params["company_size"] = company_size

    # No SQL pre-filter for scope signals: a text match on the signal name
    # would drop controls whose scope_conditions have no (or an empty)
    # "requires_any", which apply to everyone.

    query += " ORDER BY control_id"

    rows = db.execute(text(query), params).fetchall()

    # Python-level precise filtering.
    applicable = [
        r
        for r in rows
        if not (industry and not _matches_industry(r.applicable_industries, industry))
        and not (
            company_size
            and not _matches_company_size(r.applicable_company_size, company_size)
        )
        and not (signals and not _matches_scope_signals(r.scope_conditions, signals))
    ]

    page = applicable[offset : offset + limit]

    by_domain = Counter(
        r.control_id.split("-")[0].upper() if r.control_id else "UNKNOWN"
        for r in applicable
    )
    by_severity = Counter(r.severity or "unknown" for r in applicable)

    by_industry: Counter = Counter()
    for r in applicable:
        industries = _parse_json_text(r.applicable_industries)
        if isinstance(industries, list):
            # Non-string entries (malformed data) cannot serve as a stable
            # key; count them as unclassified instead of failing.
            by_industry.update(
                ind if isinstance(ind, str) else "unclassified" for ind in industries
            )
        else:
            by_industry["unclassified"] += 1

    return {
        "total_applicable": len(applicable),
        "limit": limit,
        "offset": offset,
        "controls": [_row_to_control(r) for r in page],
        "breakdown": {
            "by_domain": dict(by_domain),
            "by_severity": dict(by_severity),
            "by_industry": dict(by_industry),
        },
    }
def _row_to_control(r) -> dict[str, Any]:
    """Build the API representation of one canonical_controls row.

    ``id`` and ``framework_id`` are stringified, timestamps are rendered with
    ``isoformat()`` (or ``None`` when unset), and ``control_id_domain`` is the
    upper-cased part of ``control_id`` before the first ``-``.
    """

    def _iso(stamp):
        return stamp.isoformat() if stamp else None

    control_id = r.control_id
    control: dict[str, Any] = {
        "id": str(r.id),
        "framework_id": str(r.framework_id),
        "control_id": control_id,
    }

    # Columns copied through unchanged, in response order.
    for column in (
        "title",
        "objective",
        "rationale",
        "severity",
        "category",
        "verification_method",
    ):
        control[column] = getattr(r, column)

    # evidence_type may be missing on the row object; default to None.
    control["evidence_type"] = getattr(r, "evidence_type", None)

    for column in (
        "target_audience",
        "applicable_industries",
        "applicable_company_size",
        "scope_conditions",
        "release_state",
    ):
        control[column] = getattr(r, column)

    control["control_id_domain"] = (
        control_id.split("-")[0].upper() if control_id else None
    )
    control["created_at"] = _iso(r.created_at)
    control["updated_at"] = _iso(r.updated_at)
    return control
class TestParseJsonText:
    """_parse_json_text: decoding of JSON stored in TEXT columns."""

    def test_none_returns_none(self):
        parsed = _parse_json_text(None)
        assert parsed is None

    def test_valid_json_list(self):
        parsed = _parse_json_text('["all"]')
        assert parsed == ["all"]

    def test_valid_json_list_multiple(self):
        expected = ["Telekommunikation", "Energie"]
        assert _parse_json_text('["Telekommunikation", "Energie"]') == expected

    def test_valid_json_dict(self):
        expected = {"requires_any": ["uses_ai"]}
        assert _parse_json_text('{"requires_any": ["uses_ai"]}') == expected

    def test_invalid_json_returns_none(self):
        parsed = _parse_json_text("not json")
        assert parsed is None

    def test_empty_string_returns_none(self):
        parsed = _parse_json_text("")
        assert parsed is None

    def test_already_list_passthrough(self):
        already_list = ["all"]
        assert _parse_json_text(already_list) == ["all"]

    def test_already_dict_passthrough(self):
        already_dict = {"requires_any": ["uses_ai"]}
        assert _parse_json_text(already_dict) == already_dict

    def test_integer_returns_none(self):
        parsed = _parse_json_text(42)
        assert parsed is None
class TestMatchesIndustry:
    """_matches_industry: NULL / "all" / explicit industry lists."""

    TELCO_ENERGY = '["Telekommunikation", "Energie"]'

    def test_null_matches_any_industry(self):
        outcome = _matches_industry(None, "Telekommunikation")
        assert outcome is True

    def test_all_matches_any_industry(self):
        for requested in ("Telekommunikation", "Energie"):
            assert _matches_industry('["all"]', requested) is True

    def test_specific_industry_matches(self):
        outcome = _matches_industry(self.TELCO_ENERGY, "Telekommunikation")
        assert outcome is True

    def test_specific_industry_no_match(self):
        outcome = _matches_industry(self.TELCO_ENERGY, "Gesundheitswesen")
        assert outcome is False

    def test_malformed_json_matches(self):
        """Malformed data should be treated as 'applies to everyone'."""
        outcome = _matches_industry("not json", "anything")
        assert outcome is True

    def test_all_with_other_industries(self):
        outcome = _matches_industry('["all", "Telekommunikation"]', "Gesundheitswesen")
        assert outcome is True


class TestMatchesCompanySize:
    """_matches_company_size: NULL / "all" / explicit size lists."""

    MEDIUM_PLUS = '["medium", "large", "enterprise"]'

    def test_null_matches_any_size(self):
        outcome = _matches_company_size(None, "medium")
        assert outcome is True

    def test_all_matches_any_size(self):
        for requested in ("micro", "enterprise"):
            assert _matches_company_size('["all"]', requested) is True

    def test_specific_size_matches(self):
        outcome = _matches_company_size(self.MEDIUM_PLUS, "large")
        assert outcome is True

    def test_specific_size_no_match(self):
        outcome = _matches_company_size(self.MEDIUM_PLUS, "small")
        assert outcome is False

    def test_micro_excluded_from_nis2(self):
        """NIS2 typically requires medium+."""
        outcome = _matches_company_size(self.MEDIUM_PLUS, "micro")
        assert outcome is False

    def test_malformed_json_matches(self):
        outcome = _matches_company_size("broken", "medium")
        assert outcome is True
class TestMatchesScopeSignals:
    """_matches_scope_signals: requires_any semantics."""

    AI_ACT = '{"requires_any": ["uses_ai"], "description": "AI Act"}'

    def test_null_conditions_always_match(self):
        for provided in (["uses_ai"], []):
            assert _matches_scope_signals(None, provided) is True

    def test_empty_requires_any_matches(self):
        outcome = _matches_scope_signals('{"requires_any": []}', ["uses_ai"])
        assert outcome is True

    def test_no_requires_any_key_matches(self):
        outcome = _matches_scope_signals('{"description": "some text"}', ["uses_ai"])
        assert outcome is True

    def test_requires_any_with_matching_signal(self):
        outcome = _matches_scope_signals(self.AI_ACT, ["uses_ai"])
        assert outcome is True

    def test_requires_any_with_no_matching_signal(self):
        outcome = _matches_scope_signals(self.AI_ACT, ["third_country_transfer"])
        assert outcome is False

    def test_requires_any_with_one_of_multiple_matching(self):
        required = '{"requires_any": ["uses_ai", "processes_health_data"]}'
        provided = ["processes_health_data", "financial_data"]
        assert _matches_scope_signals(required, provided) is True

    def test_requires_any_with_no_signals_provided(self):
        required = '{"requires_any": ["uses_ai"]}'
        assert _matches_scope_signals(required, []) is False

    def test_malformed_json_matches(self):
        outcome = _matches_scope_signals("broken", ["uses_ai"])
        assert outcome is True

    def test_multiple_required_signals_any_match(self):
        """requires_any means at least ONE must match."""
        required = (
            '{"requires_any": ["uses_ai", "third_country_transfer", '
            '"processes_health_data"]}'
        )
        assert _matches_scope_signals(required, ["third_country_transfer"]) is True

    def test_multiple_required_signals_none_match(self):
        required = '{"requires_any": ["uses_ai", "third_country_transfer"]}'
        provided = ["financial_data", "employee_monitoring"]
        assert _matches_scope_signals(required, provided) is False
class TestCombinedFiltering:
    """Typical real-world filtering scenarios, combining the three matchers."""

    def test_dsgvo_art5_applies_to_everyone(self):
        """DSGVO Art. 5 = all industries, all sizes, no scope conditions."""
        everyone = '["all"]'
        assert _matches_industry(everyone, "Telekommunikation") is True
        assert _matches_company_size(everyone, "micro") is True
        assert _matches_scope_signals(None, []) is True

    def test_nis2_art21_kritis_medium_plus(self):
        """NIS2 Art. 21 = KRITIS sectors, medium+."""
        kritis = '["Energie", "Gesundheitswesen", "Digitale Infrastruktur", "Logistik / Transport"]'
        medium_plus = '["medium", "large", "enterprise"]'

        # Energy provider of size "large" is covered on both axes.
        assert _matches_industry(kritis, "Energie") is True
        assert _matches_company_size(medium_plus, "large") is True

        # An IT company is outside the listed sectors.
        assert _matches_industry(kritis, "Technologie / IT") is False

        # A small company is below the size threshold.
        assert _matches_company_size(medium_plus, "small") is False

    def test_ai_act_scope_condition(self):
        """AI Act = all industries, all sizes, but only if uses_ai."""
        ai_only = '{"requires_any": ["uses_ai"], "description": "Nur bei KI-Einsatz"}'

        # Company uses AI.
        assert _matches_scope_signals(ai_only, ["uses_ai"]) is True

        # Company does not use AI.
        assert _matches_scope_signals(ai_only, []) is False
        assert _matches_scope_signals(ai_only, ["third_country_transfer"]) is False

    def test_tkg_telekom_only(self):
        """TKG = only Telekommunikation, all sizes."""
        telco_only = '["Telekommunikation"]'

        assert _matches_industry(telco_only, "Telekommunikation") is True
        assert _matches_industry(telco_only, "Energie") is False