Compare commits

..

2 Commits

Author SHA1 Message Date
Benjamin Admin 1451873194 fix(audit): parse_flat_cookie_text fuer VW-Style Flat-Tabellen
CI / loc-budget (push) Failing after 19s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 3m4s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 43s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / detect-changes (push) Successful in 12s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 19s
VW Cookie-Doc liefert die Tabelle als FLACHEN Text ohne Spalten-Trenner:
'IDE Tracking Cookies (Marketing) Beschreibung 13 Monate Permanent
TAID Tracking Cookies (Marketing) ...'

parse_flat_cookie_text matched mit Regex:
  NAME [Tracking|Session|Funktional|...] Cookies ... [13 Monate|Session|Permanent]

Backend faellt bei parse_cookie_table=[] auf parse_flat zurueck. Damit
holen wir aus dem 65k VW Cookie-Doc ~30-50 Cookies + Vendors deterministisch,
auch wenn der HTML-Table-DOM-Extract leer ist (was passiert wenn die
Tabelle aus mehreren append-Code-Pfaden geladen wird).

Bonus: _extract_dom_tables Helper in dsi_discovery.py vorbereitet fuer
spaeteres Einhaengen an allen 7 DiscoveredDSI.append-Stellen.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 21:24:14 +02:00
Benjamin Admin dfac940272 feat(licenses): attribution renderer — Stufe 1 (overview) + Stufe 3 (SourceBadge)
Backend
- backend-compliance/compliance/api/licenses_routes.py: three endpoints
  built on the now-complete license_rule classification
  - GET  /api/compliance/licenses/overview
       global aggregation by rule + per-source breakdown (Stufe 1)
  - POST /api/compliance/licenses/aggregate
       per-control-set aggregation for PDF footer (Stufe 2) and
       tech-file appendix (Stufe 4) — consumed later
  - GET  /api/compliance/licenses/source-info/{control_uuid}
       single-control lookup for the inline source badge (Stufe 3)
- registered in api/__init__.py via the existing safe-import loader

Frontend
- app/sdk/licenses/page.tsx (Stufe 1): the /sdk/licenses overview page.
  Renders rule legend cards + per-rule source tables. Drives the
  /licenses footer link and gives auditors a one-page view of what
  licence classes the platform is operating under.
- components/sdk/SourceBadge.tsx (Stufe 3): reusable React component.
  Small R1/R2/R3 pill with click-expand tooltip showing source
  regulation + attribution string + render-full-text policy. Will be
  embedded into IACE hazards/mitigations, VVT items, DSFA controls in
  follow-up commits.

Two stages of the four-stage renderer are now ready. Stufe 2 (PDF
auto-footer) + Stufe 4 (tech-file appendix) follow once the existing
PDF generators are extended to call /licenses/aggregate.
2026-05-21 21:00:10 +02:00
7 changed files with 709 additions and 4 deletions
+160
View File
@@ -0,0 +1,160 @@
'use client'
import { useEffect, useState } from 'react'
// Stufe 1 of the Attribution Renderer (Task #23): the global
// "Quellen & Lizenzen" overview. Aggregates all 314k canonical_controls
// by their license_rule and shows the source regulations behind each
// bucket. Drives the footer link and gives auditors a one-page view of
// what licence classes the platform is operating under.
type SourceCount = {
regulation_id: string
regulation_name_de: string | null
license_rule: number
license_type: string | null
attribution: string | null
jurisdiction: string | null
source_type: string | null
n_controls: number
}
type RuleBucket = {
rule: number
label_de: string
label_en: string
attribution_required: boolean
render_full_text: boolean
total_controls: number
distinct_sources: number
sources: SourceCount[]
}
type Overview = {
total_controls: number
buckets: RuleBucket[]
}
const RULE_COLOR: Record<number, string> = {
1: 'border-emerald-200 bg-emerald-50',
2: 'border-amber-200 bg-amber-50',
3: 'border-slate-200 bg-slate-50',
}
const RULE_BADGE: Record<number, string> = {
1: 'bg-emerald-600 text-white',
2: 'bg-amber-600 text-white',
3: 'bg-slate-600 text-white',
}
export default function LicensesPage() {
const [data, setData] = useState<Overview | null>(null)
const [error, setError] = useState<string | null>(null)
useEffect(() => {
fetch('/api/sdk/v1/compliance/licenses/overview')
.then((r) => (r.ok ? r.json() : Promise.reject(`HTTP ${r.status}`)))
.then(setData)
.catch((e) => setError(String(e)))
}, [])
if (error) {
return (
<div className="p-6">
<h1 className="text-xl font-semibold mb-2">Quellen &amp; Lizenzen</h1>
<p className="text-red-600">Fehler beim Laden: {error}</p>
</div>
)
}
if (!data) {
return (
<div className="p-6">
<h1 className="text-xl font-semibold">Quellen &amp; Lizenzen</h1>
<p className="text-slate-500 mt-2">Lade </p>
</div>
)
}
return (
<div className="p-6 max-w-7xl">
<header className="mb-6">
<h1 className="text-2xl font-semibold">Quellen &amp; Lizenzen</h1>
<p className="text-sm text-slate-600 mt-1">
Diese Plattform stützt sich auf {data.total_controls.toLocaleString('de-DE')}{' '}
klassifizierte Compliance-Controls aus den unten genannten Quellen.
Jeder Control trägt eine deterministische Lizenzregel (R1R3), die das
Render-Verhalten in Berichten und im Frontend steuert.
</p>
</header>
<section className="mb-8">
<h2 className="text-lg font-medium mb-3">Klassifizierungs-Schema</h2>
<div className="grid grid-cols-1 md:grid-cols-3 gap-3 text-sm">
{data.buckets.map((b) => (
<div key={b.rule} className={`rounded border ${RULE_COLOR[b.rule] ?? 'border-slate-200'} p-3`}>
<div className="flex items-center gap-2 mb-2">
<span className={`inline-flex items-center justify-center w-7 h-7 rounded-full text-xs font-bold ${RULE_BADGE[b.rule] ?? 'bg-slate-600 text-white'}`}>
R{b.rule}
</span>
<span className="font-medium">{b.label_de}</span>
</div>
<ul className="text-xs text-slate-700 space-y-1">
<li>{b.total_controls.toLocaleString('de-DE')} Controls</li>
<li>{b.distinct_sources} Quellen</li>
<li>{b.render_full_text ? 'Volltext-Anzeige erlaubt' : 'Nur Identifier-Verweis'}</li>
<li>{b.attribution_required ? 'Attribution-Pflicht in Output' : 'keine Attribution-Pflicht'}</li>
</ul>
</div>
))}
</div>
</section>
{data.buckets.map((b) => (
<section key={b.rule} className="mb-8">
<h2 className="text-lg font-medium mb-3 flex items-center gap-2">
<span className={`inline-flex items-center justify-center w-7 h-7 rounded-full text-xs font-bold ${RULE_BADGE[b.rule] ?? 'bg-slate-600 text-white'}`}>
R{b.rule}
</span>
{b.label_de}{' '}
<span className="text-sm text-slate-500 font-normal">
({b.total_controls.toLocaleString('de-DE')} Controls aus {b.distinct_sources} Quellen)
</span>
</h2>
<div className="overflow-x-auto border rounded">
<table className="w-full text-sm">
<thead className="bg-slate-100 text-slate-700">
<tr>
<th className="text-left p-2">Quelle</th>
<th className="text-left p-2">Lizenztyp</th>
<th className="text-left p-2">Rechtsraum</th>
<th className="text-left p-2">Attribution</th>
<th className="text-right p-2">Controls</th>
</tr>
</thead>
<tbody>
{b.sources.map((s) => (
<tr key={`${b.rule}-${s.regulation_id}`} className="border-t">
<td className="p-2">{s.regulation_name_de ?? s.regulation_id}</td>
<td className="p-2 text-slate-600">{s.license_type ?? '—'}</td>
<td className="p-2 text-slate-600">{s.jurisdiction ?? '—'}</td>
<td className="p-2 text-slate-600">{s.attribution ?? '—'}</td>
<td className="p-2 text-right tabular-nums">{s.n_controls.toLocaleString('de-DE')}</td>
</tr>
))}
</tbody>
</table>
</div>
</section>
))}
<footer className="text-xs text-slate-500 border-t pt-4 mt-8">
Klassifizierung: deterministisch über parent_control_uuid-Vererbung,
control_parent_links regulation_registry, source_citation,
canonical_processed_chunks (Pipeline-Ground-Truth) und LLM-Aggregat-
Identifikation für eigene Werke. Audit-Skripte unter
breakpilot-core/control-pipeline/scripts/.
</footer>
</div>
)
}
@@ -0,0 +1,138 @@
'use client'
import { useEffect, useState } from 'react'
// Stufe 3 of the Attribution Renderer (Task #23): an inline source
// badge that any rendered control/hazard/measure can attach to itself.
//
// Visually a small license-rule pill (R1/R2/R3); on hover/click it
// reveals the underlying regulation, license type, and — for Rule 2 —
// the mandatory attribution string.
//
// Usage:
// <SourceBadge controlUuid={hazard.id} />
//
// The component lazily fetches /licenses/source-info/{uuid} on first
// expand so the surrounding list view stays cheap.
type SourceInfo = {
control_uuid: string
license_rule: number | null
license_label_de: string | null
attribution_required: boolean
render_full_text: boolean
regulation_id: string | null
regulation_name_de: string | null
license_type: string | null
attribution: string | null
source_url: string | null
}
const RULE_BADGE: Record<number, string> = {
1: 'bg-emerald-100 text-emerald-800 border-emerald-300',
2: 'bg-amber-100 text-amber-800 border-amber-300',
3: 'bg-slate-100 text-slate-700 border-slate-300',
}
const RULE_TITLE: Record<number, string> = {
1: 'R1 — wörtlich übernehmbar',
2: 'R2 — wörtlich mit Attribution',
3: 'R3 — nur Identifier zitieren',
}
interface SourceBadgeProps {
controlUuid: string
/** Optional: skip the fetch and render from already-known data. */
prefetched?: SourceInfo
/** Compact mode for tight UI rows (smaller pill). */
compact?: boolean
}
export function SourceBadge({ controlUuid, prefetched, compact }: SourceBadgeProps) {
const [data, setData] = useState<SourceInfo | null>(prefetched ?? null)
const [open, setOpen] = useState(false)
const [loading, setLoading] = useState(false)
const [error, setError] = useState<string | null>(null)
useEffect(() => {
if (!open || data) return
setLoading(true)
fetch(`/api/sdk/v1/compliance/licenses/source-info/${controlUuid}`)
.then((r) => (r.ok ? r.json() : Promise.reject(`HTTP ${r.status}`)))
.then(setData)
.catch((e) => setError(String(e)))
.finally(() => setLoading(false))
}, [open, data, controlUuid])
const rule = data?.license_rule ?? prefetched?.license_rule ?? null
const badgeClass = rule ? RULE_BADGE[rule] ?? RULE_BADGE[3] : 'bg-slate-100 text-slate-500 border-slate-200'
const sizeClass = compact ? 'text-[10px] px-1.5 py-0.5' : 'text-xs px-2 py-0.5'
return (
<span className="relative inline-block">
<button
type="button"
onClick={() => setOpen((v) => !v)}
className={`inline-flex items-center gap-1 rounded border font-medium ${sizeClass} ${badgeClass} hover:opacity-80 transition`}
title={rule ? RULE_TITLE[rule] : 'Lizenz unbekannt'}
aria-expanded={open}
>
<svg width="10" height="10" viewBox="0 0 16 16" fill="currentColor" aria-hidden>
<path d="M8 0a8 8 0 1 0 0 16A8 8 0 0 0 8 0Zm0 4.5a1 1 0 1 1 0 2 1 1 0 0 1 0-2ZM7 8h2v4.5H7V8Z" />
</svg>
{rule ? `R${rule}` : '?'}
</button>
{open && (
<div className="absolute left-0 mt-1 z-40 w-80 rounded-md border border-slate-200 bg-white shadow-lg p-3 text-xs">
{loading && <p className="text-slate-500">Lade Quellen-Info</p>}
{error && <p className="text-red-600">Fehler: {error}</p>}
{data && (
<div className="space-y-2">
<div className="font-semibold text-slate-800">
{data.license_label_de ?? 'Lizenz unbekannt'}
</div>
{data.regulation_name_de && (
<div>
<span className="text-slate-500">Quelle:</span>{' '}
<span className="text-slate-800">{data.regulation_name_de}</span>
</div>
)}
{data.license_type && (
<div>
<span className="text-slate-500">Lizenztyp:</span>{' '}
<span className="text-slate-700">{data.license_type}</span>
</div>
)}
{data.attribution && (
<div className="rounded bg-amber-50 border border-amber-200 px-2 py-1.5">
<div className="text-[10px] font-semibold text-amber-800 uppercase tracking-wide">
Attribution-Pflicht
</div>
<div className="text-amber-900">{data.attribution}</div>
</div>
)}
{!data.render_full_text && (
<div className="text-[10px] text-slate-500 italic">
Volltext wird im Output nicht gerendert nur Identifier-Verweis.
</div>
)}
{data.source_url && (
<a
href={data.source_url}
target="_blank"
rel="noopener noreferrer"
className="inline-block text-[10px] text-blue-600 hover:underline mt-1"
>
Originalquelle öffnen
</a>
)}
</div>
)}
</div>
)}
</span>
)
}
export default SourceBadge
@@ -72,6 +72,7 @@ _ROUTER_MODULES = [
"whistleblower_routes", "whistleblower_routes",
"tcf_routes", "tcf_routes",
"founding_wizard_routes", "founding_wizard_routes",
"licenses_routes",
] ]
_loaded_count = 0 _loaded_count = 0
@@ -862,16 +862,19 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
except Exception as e: except Exception as e:
logger.warning("html_table parse failed: %s", e) logger.warning("html_table parse failed: %s", e)
# B — cookies_table_parser auch auf gecrawltem Cookie-Text # B — cookies_table_parser auch auf gecrawltem Cookie-Text.
# (nicht nur bei User-Paste). Wenn der Crawler Tab/Pipe- # Erst Standard-Parse (Tab/Pipe-getrennt). Wenn der nichts
# getrennte Tabellen-Reihen erhalten hat, parsen wir sie # findet (kein Separator), Flat-Pattern-Parse fuer Sites wie
# deterministisch und mergen die Vendor-Records. # VW die ihre Tabelle als flachen Text liefern.
if cookie_text and len(cookie_text) >= 500: if cookie_text and len(cookie_text) >= 500:
try: try:
from compliance.services.cookies_table_parser import ( from compliance.services.cookies_table_parser import (
parse_cookie_table as _parse_ct, parse_cookie_table as _parse_ct,
parse_flat_cookie_text as _parse_flat,
) )
crawled_table_vendors = _parse_ct(cookie_text) crawled_table_vendors = _parse_ct(cookie_text)
if not crawled_table_vendors:
crawled_table_vendors = _parse_flat(cookie_text)
if crawled_table_vendors: if crawled_table_vendors:
existing = {(v.get("name") or "").strip().lower() existing = {(v.get("name") or "").strip().lower()
for v in cmp_vendors} for v in cmp_vendors}
@@ -0,0 +1,306 @@
"""License attribution endpoints — Task #23 Stufe 1-4.
The audit (Task #22) classified all 314,811 canonical_controls into
license_rule 1/2/3. The frontend, PDF renderer, and tech-file generator
now need to surface that classification in the form of:
- Stufe 1: a global /licenses overview page
- Stufe 2: an auto-footer in every exported PDF
- Stufe 3: an inline source badge on every rendered hazard/measure
- Stufe 4: a sources appendix in tech-file bundles
This module exposes three endpoints that all four stages consume:
GET /api/compliance/licenses/overview
Global aggregation by rule + per-source counts. Drives Stufe 1.
POST /api/compliance/licenses/aggregate
Body: {"control_uuids": ["uuid1", ...]}.
Returns per-rule grouping with source breakdown. Used by PDF
footer (Stufe 2) and tech-file appendix (Stufe 4) to build the
"sources used in this document" list.
GET /api/compliance/licenses/source-info/{control_uuid}
Single-control lookup for the inline source badge tooltip
(Stufe 3). Returns rule, source regulation, attribution text.
Why a new module instead of extending canonical_control_routes:
- canonical_control_routes serves the legacy SPDX-style license matrix
(canonical_control_licenses + canonical_control_sources, ~10 rows).
- This module is built on regulation_registry (252 rows) + the
license_rule on each control. Both schemas coexist; this module
doesn't disturb the legacy endpoints.
"""
from __future__ import annotations
import logging
from typing import Any, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
router = APIRouter(prefix="/licenses", tags=["licenses"])
logger = logging.getLogger(__name__)
# ============================================================================
# Rule labels — used by frontend renderer
# ============================================================================
RULE_LABELS = {
1: {
"code": "R1",
"label_de": "Wörtlich übernehmbar",
"label_en": "Verbatim, no attribution required",
"render_full_text": True,
"attribution_required": False,
},
2: {
"code": "R2",
"label_de": "Wörtlich mit Attribution",
"label_en": "Verbatim with attribution",
"render_full_text": True,
"attribution_required": True,
},
3: {
"code": "R3",
"label_de": "Nur Identifier zitieren",
"label_en": "Identifier citation only",
"render_full_text": False,
"attribution_required": False,
},
}
# ============================================================================
# Response Schemas
# ============================================================================
class SourceCount(BaseModel):
regulation_id: str
regulation_name_de: Optional[str]
license_rule: int
license_type: Optional[str]
attribution: Optional[str]
jurisdiction: Optional[str]
source_type: Optional[str]
n_controls: int
class RuleBucket(BaseModel):
rule: int
label_de: str
label_en: str
attribution_required: bool
render_full_text: bool
total_controls: int
distinct_sources: int
sources: list[SourceCount]
class OverviewResponse(BaseModel):
total_controls: int
buckets: list[RuleBucket]
class AggregateRequest(BaseModel):
control_uuids: list[UUID]
class AggregateResponse(BaseModel):
total_in_request: int
matched: int
buckets: list[RuleBucket]
class SourceInfo(BaseModel):
control_uuid: UUID
license_rule: Optional[int]
license_label_de: Optional[str]
attribution_required: bool
render_full_text: bool
regulation_id: Optional[str]
regulation_name_de: Optional[str]
license_type: Optional[str]
attribution: Optional[str]
source_url: Optional[str]
# ============================================================================
# Endpoints
# ============================================================================
def _bucket(rule: int, sources: list[SourceCount]) -> RuleBucket:
meta = RULE_LABELS.get(rule, RULE_LABELS[3])
return RuleBucket(
rule=rule,
label_de=meta["label_de"],
label_en=meta["label_en"],
attribution_required=meta["attribution_required"],
render_full_text=meta["render_full_text"],
total_controls=sum(s.n_controls for s in sources),
distinct_sources=len(sources),
sources=sources,
)
@router.get("/overview", response_model=OverviewResponse)
def licenses_overview(db: Session = Depends(get_db)) -> OverviewResponse:
"""Global aggregation: total controls by rule, with per-source breakdown.
Drives Stufe 1 (the /licenses page).
"""
rows = db.execute(text("""
SELECT
COALESCE(cpl.source_regulation, '(no source)') AS regulation_name,
cc.license_rule,
COUNT(DISTINCT cc.id) AS n
FROM compliance.canonical_controls cc
LEFT JOIN compliance.control_parent_links cpl ON cpl.control_uuid = cc.id
WHERE cc.license_rule IS NOT NULL
GROUP BY 1, 2
""")).fetchall()
reg_rows = db.execute(text("""
SELECT regulation_name_de, regulation_id, license_type, attribution,
jurisdiction, source_type
FROM compliance.regulation_registry
""")).fetchall()
reg_by_name = {r.regulation_name_de: r for r in reg_rows if r.regulation_name_de}
by_rule: dict[int, list[SourceCount]] = {1: [], 2: [], 3: []}
seen: dict[tuple[int, str], int] = {}
total = 0
for row in rows:
rule = int(row.license_rule)
name = row.regulation_name
n = int(row.n)
key = (rule, name)
# multiple cpl entries per control deduplicate via DISTINCT, but a
# control with several source_regulations still gets counted once
# per regulation — that's the design.
seen[key] = seen.get(key, 0) + n
total += n
for (rule, name), n in seen.items():
reg = reg_by_name.get(name)
by_rule.setdefault(rule, []).append(SourceCount(
regulation_id=reg.regulation_id if reg else name,
regulation_name_de=name,
license_rule=rule,
license_type=reg.license_type if reg else None,
attribution=reg.attribution if reg else None,
jurisdiction=reg.jurisdiction if reg else None,
source_type=reg.source_type if reg else None,
n_controls=n,
))
for r in by_rule.values():
r.sort(key=lambda s: -s.n_controls)
buckets = [_bucket(rule, sources) for rule, sources in sorted(by_rule.items())]
return OverviewResponse(total_controls=total, buckets=buckets)
@router.post("/aggregate", response_model=AggregateResponse)
def aggregate_for_controls(
body: AggregateRequest,
db: Session = Depends(get_db),
) -> AggregateResponse:
"""Per-control license aggregation for PDF footer (Stufe 2) and
tech-file sources appendix (Stufe 4).
Returns a per-rule breakdown of which sources contributed to the
supplied control set. The frontend renderer turns this into the
"Verwendete Quellen" footer.
"""
if not body.control_uuids:
return AggregateResponse(total_in_request=0, matched=0, buckets=[])
rows = db.execute(text("""
SELECT
COALESCE(cpl.source_regulation, '(unknown)') AS regulation_name,
cc.license_rule,
COUNT(DISTINCT cc.id) AS n
FROM compliance.canonical_controls cc
LEFT JOIN compliance.control_parent_links cpl ON cpl.control_uuid = cc.id
WHERE cc.id = ANY(:ids) AND cc.license_rule IS NOT NULL
GROUP BY 1, 2
"""), {"ids": [str(u) for u in body.control_uuids]}).fetchall()
reg_rows = db.execute(text("""
SELECT regulation_name_de, regulation_id, license_type, attribution,
jurisdiction, source_type
FROM compliance.regulation_registry
""")).fetchall()
reg_by_name = {r.regulation_name_de: r for r in reg_rows if r.regulation_name_de}
by_rule: dict[int, list[SourceCount]] = {1: [], 2: [], 3: []}
matched_total = 0
for row in rows:
rule = int(row.license_rule)
n = int(row.n)
matched_total += n
reg = reg_by_name.get(row.regulation_name)
by_rule.setdefault(rule, []).append(SourceCount(
regulation_id=reg.regulation_id if reg else row.regulation_name,
regulation_name_de=row.regulation_name,
license_rule=rule,
license_type=reg.license_type if reg else None,
attribution=reg.attribution if reg else None,
jurisdiction=reg.jurisdiction if reg else None,
source_type=reg.source_type if reg else None,
n_controls=n,
))
for r in by_rule.values():
r.sort(key=lambda s: -s.n_controls)
buckets = [_bucket(rule, sources) for rule, sources in sorted(by_rule.items()) if sources]
return AggregateResponse(
total_in_request=len(body.control_uuids),
matched=matched_total,
buckets=buckets,
)
@router.get("/source-info/{control_uuid}", response_model=SourceInfo)
def source_info_for_control(
control_uuid: UUID,
db: Session = Depends(get_db),
) -> SourceInfo:
"""Single-control source info for the inline source badge (Stufe 3).
Used by the React `<SourceBadge>` component to populate its tooltip.
"""
row = db.execute(text("""
SELECT cc.license_rule, cpl.source_regulation AS regulation_name,
r.regulation_id, r.license_type, r.attribution, r.url AS source_url
FROM compliance.canonical_controls cc
LEFT JOIN compliance.control_parent_links cpl ON cpl.control_uuid = cc.id
LEFT JOIN compliance.regulation_registry r ON r.regulation_name_de = cpl.source_regulation
WHERE cc.id = :uuid
LIMIT 1
"""), {"uuid": str(control_uuid)}).fetchone()
if row is None:
raise HTTPException(status_code=404, detail="control not found")
rule = int(row.license_rule) if row.license_rule is not None else None
meta = RULE_LABELS.get(rule, {}) if rule else {}
return SourceInfo(
control_uuid=control_uuid,
license_rule=rule,
license_label_de=meta.get("label_de"),
attribution_required=meta.get("attribution_required", False),
render_full_text=meta.get("render_full_text", False),
regulation_id=row.regulation_id,
regulation_name_de=row.regulation_name,
license_type=row.license_type,
attribution=row.attribution,
source_url=row.source_url,
)
@@ -189,6 +189,74 @@ def parse_cookie_table(text: str) -> list[dict]:
return out return out
_FLAT_ROW_RE = re.compile(
r"\b([A-Za-z_][A-Za-z0-9_\-\.]{1,40})\s+"
r"((?:Tracking|Session|Funktional|Marketing|Analytics|Performance|"
r"Notwendig|Strictly\s+Necessary|Statistik|Personalisierung)"
r"[A-Za-zäöüÄÖÜß \-\(\)]*?Cookies?[^A-Z]{0,400}?)"
r"(?:(\d+)\s*(Sekunde|Minute|Stunde|Tag|Woche|Monat|Jahr|day|month|year)|"
r"\b(Session|Permanent)\b)",
re.I | re.S,
)
def parse_flat_cookie_text(text: str) -> list[dict]:
"""Variante fuer Sites wie VW die ihre Cookie-Tabelle als flachen
Text liefern (Cookie-Name + Kategorie + Beschreibung + Dauer in
einem Block hintereinander, ohne klare Trenner).
Regex sucht nach 'NAME [Tracking|Session|Funktional...] Cookies
... [13 Monate|Session|Permanent]' und behandelt jeden Match als
eine Tabellen-Zeile.
"""
if not text or len(text) < 500:
return []
matches = list(_FLAT_ROW_RE.finditer(text))
if len(matches) < 3:
return []
by_vendor: dict[str, dict] = {}
seen_names: set[str] = set()
for m in matches:
name = m.group(1).strip()
nl = name.lower()
if nl in seen_names:
continue
if nl in ("dieser", "diese", "ein", "der", "die", "das",
"session", "permanent", "funktional", "notwendig",
"marketing", "analytics", "werbung", "anbieter",
"tracking", "cookie", "cookies", "und", "von",
"einer", "ist", "alle", "noch", "auch", "name",
"art", "zweck", "dauer"):
continue
if len(name) < 3 or len(name) > 60:
continue
seen_names.add(nl)
category = _normalize_category(m.group(2) or "")
persistence = ""
if m.group(3):
persistence = f"{m.group(3)} {m.group(4)}"
elif m.group(5):
persistence = m.group(5)
purpose = (m.group(2) or "").strip()[:300]
vendor = _guess_vendor(name) or "Unbekannter Anbieter"
entry = by_vendor.setdefault(vendor, {
"name": vendor, "country": "",
"purpose": purpose, "category": category,
"opt_out_url": "", "privacy_policy_url": "",
"persistence": persistence,
"cookies": [],
"source": "flat_pattern",
})
entry["cookies"].append({
"name": name, "purpose": purpose[:200],
"expiry": persistence, "is_third_party": True,
})
out = list(by_vendor.values())
logger.info("parse_flat_cookie_text: %d vendors / %d cookies",
len(out), sum(len(v["cookies"]) for v in out))
return out
_VENDOR_GUESS = ( _VENDOR_GUESS = (
("_ga", "Google"), ("_gid", "Google"), ("_gcl_", "Google"), ("_ga", "Google"), ("_gid", "Google"), ("_gcl_", "Google"),
("ANID", "Google"), ("AID", "Google"), ("FPGCLDC", "Google"), ("ANID", "Google"), ("AID", "Google"), ("FPGCLDC", "Google"),
+29
View File
@@ -182,6 +182,35 @@ class DSIDiscoveryResult:
# not the homepage navigation that DOM extraction returns. # not the homepage navigation that DOM extraction returns.
cmp_cookie_text: str = "" cmp_cookie_text: str = ""
async def _extract_dom_tables(page) -> list[list[str]]:
"""D — extrahiert alle <table>-Elemente aus dem aktuellen DOM als
list[list[str]] (jede Tabelle = Array von Tab-getrennten Zeilen).
Wird VOR der Navigation woandershin von jeder Document-Loading-
Funktion aufgerufen damit jede DiscoveredDSI ihre Tabellen behaelt.
"""
try:
return await page.evaluate("""
() => {
const out = [];
document.querySelectorAll('table').forEach(t => {
const rows = [];
t.querySelectorAll('tr').forEach(tr => {
const cells = [];
tr.querySelectorAll('th, td').forEach(c => {
cells.push((c.innerText || c.textContent || '').trim().replace(/\\s+/g, ' '));
});
if (cells.length >= 2) rows.push(cells.join('\\t'));
});
if (rows.length >= 3) out.push(rows);
});
return out.slice(0, 10);
}
""") or []
except Exception:
return []
def _matches_dsi_keyword(text: str) -> tuple[bool, str]: def _matches_dsi_keyword(text: str) -> tuple[bool, str]:
"""Check if text contains any DSI keyword. Returns (match, language).""" """Check if text contains any DSI keyword. Returns (match, language)."""
text_lower = text.lower().strip() text_lower = text.lower().strip()