Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Failing after 43s
CI/CD / test-python-backend-compliance (push) Successful in 33s
CI/CD / test-python-document-crawler (push) Successful in 21s
CI/CD / test-python-dsms-gateway (push) Successful in 22s
CI/CD / validate-canonical-controls (push) Successful in 11s
CI/CD / Deploy (push) Has been skipped
- Added NIST 800-53, OWASP Top 10/ASVS/SAMM/API/MASVS, ENISA ICS PDFs - Improved normalize() for ligatures, smart quotes, dashes - Added OWASP-specific index builder (A01:2021, V1.1, MASVS-*) - 6,259 article assignments in DB (1,817 article, 1,355 preamble, 1,173 control, 790 annex, 666 section) - Remaining 1,651 unmatched: Blue Guide (EN text vs DE PDF), OWASP multilingual translations (PT/AR/ID/ES) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
527 lines
20 KiB
Python
527 lines
20 KiB
Python
"""
|
|
PDF-based QA: Match ALL controls' source_original_text against original PDFs.
|
|
Determine exact article/section/paragraph for each control.
|
|
Handle: EU regulations (Artikel), German laws (§), NIST sections, OWASP categories,
|
|
Erwägungsgründe (preamble), Anhänge (annexes).
|
|
"""
|
|
import os
|
|
import re
|
|
import json
|
|
import unicodedata
|
|
import psycopg2
|
|
import urllib.parse
|
|
from pathlib import Path
|
|
|
|
# PyMuPDF is optional: when it is not installed, HAS_FITZ stays False and
# read_file() silently skips PDF extraction (text-file fallbacks still work).
try:
    import fitz  # PyMuPDF
    HAS_FITZ = True
except ImportError:
    HAS_FITZ = False
|
|
|
|
# Root directories for the source documents: original PDFs, and pre-extracted
# plain-text fallbacks used when a PDF is missing (see read_file()).
PDF_DIR = Path(os.path.expanduser("~/rag-ingestion/pdfs"))
TEXT_DIR = Path(os.path.expanduser("~/rag-ingestion/texts"))
|
|
|
|
# ── Source name → file path mapping ──────────────────────────────────
# Maps the DB value of source_citation->>'source' to a local filename under
# PDF_DIR (TEXT_DIR/<stem>.txt is tried as a fallback by read_file()).
# A value of None marks a source with no local file; its controls are skipped.
SOURCE_FILE_MAP = {
    # EU Regulations (PDFs)
    "KI-Verordnung (EU) 2024/1689": "ai_act_2024_1689.pdf",
    "Maschinenverordnung (EU) 2023/1230": "machinery_regulation_2023_1230.pdf",
    "Cyber Resilience Act (CRA)": "cra_2024_2847.pdf",
    "EU Blue Guide 2022": "blue_guide_2022.pdf",
    "Markets in Crypto-Assets (MiCA)": "mica_2023_1114.pdf",
    "DSGVO (EU) 2016/679": "dsgvo_2016_679.pdf",
    "Batterieverordnung (EU) 2023/1542": "battery_2023_1542.pdf",
    "NIS2-Richtlinie (EU) 2022/2555": "nis2_2022_2555.pdf",
    "AML-Verordnung": "amlr_2024_1624.pdf",
    "Data Governance Act (DGA)": "dga_2022_868.pdf",
    "Data Act": "dataact_2023_2854.pdf",
    "GPSR (EU) 2023/988": "gpsr_2023_988.pdf",
    "IFRS-Übernahmeverordnung": "ifrs_regulation_2023_1803_de.pdf",

    # NIST (PDFs)
    "NIST SP 800-53 Rev. 5": "nist_sp_800_53_r5.pdf",
    "NIST SP 800-207 (Zero Trust)": "nist_sp_800_207.pdf",
    "NIST SP 800-63-3": "nist_sp_800_63_3.pdf",
    "NIST AI Risk Management Framework": "nist_ai_rmf.pdf",
    "NIST SP 800-218 (SSDF)": "nist_sp_800_218_ssdf.pdf",
    "NIST Cybersecurity Framework 2.0": "nist_csf_2_0.pdf",

    # OWASP (PDFs)
    "OWASP Top 10 (2021)": "owasp_top10_2021.pdf",
    "OWASP ASVS 4.0": "owasp_asvs_4_0.pdf",
    "OWASP SAMM 2.0": "owasp_samm_2_0.pdf",
    "OWASP API Security Top 10 (2023)": "owasp_api_top10_2023.pdf",
    "OWASP MASVS 2.0": "owasp_masvs_2_0.pdf",

    # ENISA (PDFs)
    "ENISA ICS/SCADA Dependencies": "enisa_ics_scada.pdf",
    # NOTE: the two supply-chain sources deliberately share one PDF.
    "ENISA Supply Chain Good Practices": "enisa_supply_chain_security.pdf",
    "ENISA Threat Landscape Supply Chain": "enisa_supply_chain_security.pdf",
    "ENISA Cybersecurity State 2024": None,
    "CISA Secure by Design": "enisa_secure_by_design.pdf",

    # German laws (PDFs or TXT)
    "Bundesdatenschutzgesetz (BDSG)": "bdsg.pdf",
    "Gewerbeordnung (GewO)": "gewo.pdf",
    "Handelsgesetzbuch (HGB)": "hgb.pdf",
    "Abgabenordnung (AO)": "ao.pdf",

    # Austrian DSG
    "Österreichisches Datenschutzgesetz (DSG)": None,  # ris HTML

    # EDPB Guidelines (PDFs)
    "EDPB Leitlinien 01/2022 (BCR)": "edpb_bcr_01_2022.pdf",
    "EDPB Leitlinien 05/2020 - Einwilligung": "edpb_consent_05_2020.pdf",
    "EDPB Leitlinien 08/2020 (Social Media)": "edpb_social_media_08_2020.pdf",
    "EDPB Leitlinien 01/2019 (Zertifizierung)": "edpb_certification_01_2019.pdf",
    "EDPB Leitlinien 07/2020 (Datentransfers)": "edpb_transfers_07_2020.pdf",
    "EDPB Leitlinien 09/2022 (Data Breach)": "edpb_breach_09_2022.pdf",
    # Several DB spellings of the same guideline map to one PDF.
    "EDPB Leitlinien - Berechtigtes Interesse (Art. 6(1)(f))": "edpb_legitimate_interest.pdf",
    "EDPB Leitlinien 01/2024 (Berechtigtes Interesse)": "edpb_legitimate_interest.pdf",
    "EDPB Leitlinien 04/2019 (Data Protection by Design)": "edpb_dpbd_04_2019.pdf",
    "EDPB Leitlinien 01/2020 (Vernetzte Fahrzeuge)": "edpb_connected_vehicles_01_2020.pdf",
    "EDPB Leitlinien 01/2020 (Datentransfers)": "edpb_transfers_07_2020.pdf",

    # WP (Working Party) Guidelines
    "WP244 Leitlinien (Profiling)": "edpb_wp251_profiling.pdf",
    "WP251 Leitlinien (Profiling)": "edpb_wp251_profiling.pdf",
    "WP260 Leitlinien (Transparenz)": "edpb_wp260_transparency.pdf",

    # OECD
    "OECD KI-Empfehlung": "oecd_ai_principles.pdf",
}
|
|
|
|
# ── Document type classification ─────────────────────────────────────
# Keyword lists per document type. classify_doc() matches them
# case-insensitively against a source name to pick an index builder;
# the comments note the heading style each type uses.
DOC_TYPE_MAP = {
    # EU regulations: "Artikel N"
    "eu_regulation": [
        "KI-Verordnung", "Maschinenverordnung", "Cyber Resilience",
        "Blue Guide", "MiCA", "DSGVO", "Batterieverordnung", "NIS2",
        "AML-Verordnung", "Data Governance", "Data Act", "GPSR",
        "IFRS", "Markets in Crypto",
    ],
    # German laws: "§ N"
    "de_law": [
        "BDSG", "GewO", "HGB", "Abgabenordnung",
    ],
    # NIST: "Section X.Y" or control families "AC-1"
    "nist": [
        "NIST SP", "NIST Cybersecurity", "NIST AI",
    ],
    # OWASP: "A01:2021" or "V1.1"
    "owasp": [
        "OWASP",
    ],
    # EDPB: numbered paragraphs or sections
    "edpb": [
        "EDPB", "WP244", "WP251", "WP260",
    ],
    # ENISA: sections
    "enisa": [
        "ENISA", "CISA",
    ],
}
|
|
|
|
|
|
def classify_doc(source_name):
    """Map a source name to a document type via keyword matching.

    Returns the first DOC_TYPE_MAP key (in dict order) whose keyword list
    contains a case-insensitive substring of *source_name*, or "unknown"
    when the name is empty/None or nothing matches.
    """
    if not source_name:
        return "unknown"
    haystack = source_name.lower()
    for doc_type, keywords in DOC_TYPE_MAP.items():
        if any(kw.lower() in haystack for kw in keywords):
            return doc_type
    return "unknown"
|
|
|
|
|
|
# Character fixes applied in a single C-level pass (str.translate) instead of
# a chain of .replace() calls. Maps soft hyphen / zero-width chars to nothing,
# typographic punctuation to ASCII, and ligatures to expanded letter pairs.
# (The original code replaced '\u00ad' and '\xad' separately — they are the
# same codepoint, so one entry suffices.)
_NORMALIZE_TABLE = str.maketrans({
    '\u00ad': '',   # soft hyphen
    '\u200b': '',   # zero-width space
    '\u00a0': ' ',  # non-breaking space
    '\ufb01': 'fi', '\ufb02': 'fl',                     # ligatures
    '\ufb00': 'ff', '\ufb03': 'ffi', '\ufb04': 'ffl',
    '\u2019': "'", '\u2018': "'",                       # smart quotes
    '\u201c': '"', '\u201d': '"',
    '\u2013': '-', '\u2014': '-',                       # en/em dash
    '\u2022': '-',  # bullet
    '\u00b7': '-',  # middle dot
})

# Compiled once at import time; normalize() runs on every control text and on
# entire documents, so hoisting these out of the call is a real saving.
_CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f]')
_WHITESPACE_RE = re.compile(r'\s+')


def normalize(s):
    """Normalize text for fuzzy matching between DB control text and PDF text.

    Removes soft hyphens and zero-width characters, expands ligatures,
    straightens smart quotes/dashes/bullets, strips control-character PDF
    artifacts, applies Unicode NFC composition, and collapses all whitespace
    runs to single spaces. Returns the stripped result.
    """
    s = s.translate(_NORMALIZE_TABLE)
    s = _CONTROL_CHARS_RE.sub('', s)       # common PDF extraction artifacts
    s = unicodedata.normalize('NFC', s)
    return _WHITESPACE_RE.sub(' ', s).strip()
|
|
|
|
|
|
def read_file(filename):
    """Return the full text of a source document, or None if unavailable.

    Looks for *filename* under PDF_DIR; when the file is missing, falls back
    to TEXT_DIR/<stem>.txt. PDFs are extracted page-by-page with PyMuPDF
    (returns None when PyMuPDF is not installed); .txt/.html files are read
    as UTF-8 with replacement of undecodable bytes.
    """
    path = PDF_DIR / filename
    if not path.exists():
        fallback = TEXT_DIR / (path.stem + ".txt")
        if fallback.exists():
            return fallback.read_text(encoding='utf-8', errors='replace')
        return None

    if path.suffix == '.pdf':
        if not HAS_FITZ:
            return None
        doc = fitz.open(str(path))
        pages = [page.get_text() for page in doc]
        doc.close()
        # One newline after every page, matching page-wise concatenation.
        return "".join(p + "\n" for p in pages)

    if path.suffix in ('.txt', '.html'):
        return path.read_text(encoding='utf-8', errors='replace')

    return None
|
|
|
|
|
|
def build_eu_article_index(text, max_article=None):
    """Build a heading index for EU regulations.

    Returns a position-sorted list of (position, label, type) tuples where
    type is 'preamble' (Erwägungsgrund), 'article' (Artikel N) or 'annex'
    (Anhang). Recitals are only scanned before the first "Artikel 1" heading.
    When *max_article* is given, article numbers above it are dropped
    (filters references to other regulations). Duplicate labels keep their
    first occurrence.
    """
    entries = []

    # Recitals "(1)", "(2)", ... appear only before Artikel 1 — find the cut.
    first_article = re.search(r'\nArtikel\s+1\s*\n', text)
    boundary = first_article.start() if first_article else len(text)

    for hit in re.finditer(r'(?:^|\n)\s*\((\d+)\)', text[:boundary]):
        entries.append((hit.start(), f"Erwägungsgrund ({hit.group(1)})", "preamble"))

    # "Artikel N" (optionally suffixed, e.g. "3a") on a line of its own.
    for hit in re.finditer(r'(?:^|\n)\s*Artikel\s+(\d+[a-z]?)\s*\n', text, re.MULTILINE):
        raw = hit.group(1)
        number = int(re.match(r'(\d+)', raw).group(1))
        if max_article and number > max_article:
            continue
        entries.append((hit.start(), f"Artikel {raw}", "article"))

    # Annexes: "ANHANG I/II/..." and a bare "ANHANG" for single-annex docs.
    for hit in re.finditer(r'(?:^|\n)\s*ANHANG\s+([IVXLC]+[a-z]?)\b', text, re.MULTILINE):
        entries.append((hit.start(), f"Anhang {hit.group(1)}", "annex"))
    for hit in re.finditer(r'(?:^|\n)\s*ANHANG\s*\n', text, re.MULTILINE):
        entries.append((hit.start(), "Anhang", "annex"))

    entries.sort(key=lambda e: e[0])

    # Keep only the first occurrence of each label.
    seen = set()
    unique = []
    for pos, label, typ in entries:
        if label in seen:
            continue
        seen.add(label)
        unique.append((pos, label, typ))
    return unique
|
|
|
|
|
|
def build_de_law_index(text):
    """Index German-law section headings ("§ N") as (pos, label, type) tuples.

    Results are sorted by position; duplicate labels keep their first
    occurrence only.
    """
    found = [
        (m.start(), f"§ {m.group(1)}", "section")
        for m in re.finditer(r'(?:^|\n)\s*§\s+(\d+[a-z]?)\b', text, re.MULTILINE)
    ]
    found.sort(key=lambda entry: entry[0])

    unique, seen = [], set()
    for pos, label, typ in found:
        if label in seen:
            continue
        seen.add(label)
        unique.append((pos, label, typ))
    return unique
|
|
|
|
|
|
def build_nist_index(text):
    """Index NIST headings as (pos, label, type) tuples.

    Recognizes numbered sections ("2.1 Title" → "Section 2.1") and control
    family IDs ("AC-1"). Sorted by position; first occurrence per label wins.
    """
    found = []
    # Numbered section headings, up to three levels deep.
    for m in re.finditer(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]', text, re.MULTILINE):
        found.append((m.start(), f"Section {m.group(1)}", "section"))
    # Control family identifiers like AC-1, SI-12.
    for m in re.finditer(r'(?:^|\n)\s*([A-Z]{2}-\d+)\b', text, re.MULTILINE):
        found.append((m.start(), m.group(1), "control"))

    found.sort(key=lambda entry: entry[0])

    unique, seen = [], set()
    for pos, label, typ in found:
        if label in seen:
            continue
        seen.add(label)
        unique.append((pos, label, typ))
    return unique
|
|
|
|
|
|
def build_owasp_index(text, source_name):
|
|
"""Build index for OWASP documents."""
|
|
items = []
|
|
|
|
if "Top 10" in source_name and "API" not in source_name:
|
|
# OWASP Top 10: A01:2021, A02:2021, etc.
|
|
for m in re.finditer(r'(A\d{2}:\d{4})', text):
|
|
items.append((m.start(), m.group(1), "category"))
|
|
elif "API" in source_name:
|
|
# OWASP API Top 10: API1:2023, API2:2023, etc.
|
|
for m in re.finditer(r'(API\d+:\d{4})', text):
|
|
items.append((m.start(), m.group(1), "category"))
|
|
elif "ASVS" in source_name:
|
|
# OWASP ASVS: V1.1, V2.1.1, etc.
|
|
for m in re.finditer(r'(?:^|\n)\s*(V\d+\.\d+(?:\.\d+)?)\b', text, re.MULTILINE):
|
|
items.append((m.start(), m.group(1), "requirement"))
|
|
elif "SAMM" in source_name:
|
|
# OWASP SAMM: practice names like "Strategy & Metrics", "Education & Guidance"
|
|
# Use section numbers
|
|
for m in re.finditer(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]', text, re.MULTILINE):
|
|
items.append((m.start(), f"Section {m.group(1)}", "section"))
|
|
elif "MASVS" in source_name:
|
|
# OWASP MASVS: MASVS-STORAGE-1, MASVS-CRYPTO-1, etc.
|
|
for m in re.finditer(r'(MASVS-[A-Z]+-\d+)', text):
|
|
items.append((m.start(), m.group(1), "requirement"))
|
|
|
|
# Fallback: also find generic section numbers
|
|
if not items:
|
|
for m in re.finditer(r'(?:^|\n)\s*(\d+\.\d+(?:\.\d+)?)\s+[A-Z]', text, re.MULTILINE):
|
|
items.append((m.start(), f"Section {m.group(1)}", "section"))
|
|
|
|
items.sort(key=lambda x: x[0])
|
|
seen = set()
|
|
unique = []
|
|
for pos, label, typ in items:
|
|
if label not in seen:
|
|
seen.add(label)
|
|
unique.append((pos, label, typ))
|
|
return unique
|
|
|
|
|
|
def build_generic_index(text):
    """Index generic numbered headings ("1.", "1.1.", "1.1.1.") as sections.

    Returns position-sorted (pos, label, type) tuples with duplicate labels
    reduced to their first occurrence.
    """
    found = [
        (m.start(), f"Section {m.group(1)}", "section")
        for m in re.finditer(r'(?:^|\n)\s*(\d+(?:\.\d+)*)\.\s+[A-Z]', text, re.MULTILINE)
    ]
    found.sort(key=lambda entry: entry[0])

    unique, seen = [], set()
    for pos, label, typ in found:
        if label in seen:
            continue
        seen.add(label)
        unique.append((pos, label, typ))
    return unique
|
|
|
|
|
|
# Known max article numbers for EU regulations
# Passed to build_eu_article_index() to discard spurious "Artikel N" matches
# (typically cross-references to articles of *other* regulations) whose
# number exceeds the regulation's actual article count.
MAX_ARTICLES = {
    "Batterieverordnung (EU) 2023/1542": 96,
    "KI-Verordnung (EU) 2024/1689": 113,
    "Maschinenverordnung (EU) 2023/1230": 54,
    "Cyber Resilience Act (CRA)": 71,
    "NIS2-Richtlinie (EU) 2022/2555": 46,
    "DSGVO (EU) 2016/679": 99,
    "Markets in Crypto-Assets (MiCA)": 149,
    "AML-Verordnung": 95,
    "Data Governance Act (DGA)": 38,
    "Data Act": 50,
    "GPSR (EU) 2023/988": 52,
}
|
|
|
|
|
|
def find_text_in_doc(orig_text, full_norm, index, index_norm_positions):
    """Locate a control's text inside the normalized document.

    Takes snippets from several offsets and lengths of the normalized control
    text and searches for each in *full_norm*; on the first hit, returns the
    (label, type) of the nearest heading at or before the match position
    from *index_norm_positions* — or ("Unknown", "unknown") when the hit
    precedes every heading. Returns None when the control text is too short
    (< 30 chars after normalization) or no snippet matches.

    Note: *index* (raw-position index) is unused here but kept for caller
    compatibility; matching works purely on normalized positions.
    """
    needle = normalize(orig_text)
    if len(needle) < 30:
        return None

    # Sample from the middle first — extraction noise (headers, footers,
    # ellipses) tends to pollute the start and end of stored control text.
    for frac in (0.25, 0.1, 0.5, 0.0, 0.75):
        base = max(0, int(len(needle) * frac))
        for span in (80, 60, 40, 30, 20):
            snippet = needle[base:base + span]
            if len(snippet) < 15:
                continue
            hit = full_norm.find(snippet)
            if hit < 0:
                continue
            # Walk headings from the end; the last one at or before the hit
            # is the section this text belongs to.
            for h_pos, h_label, h_type in reversed(index_norm_positions):
                if h_pos <= hit:
                    return (h_label, h_type)
            return ("Unknown", "unknown")
    return None
|
|
|
|
|
|
# ── Main ─────────────────────────────────────────────────────────────
|
|
def main():
    """Match every control's source text against its source document and report.

    Reads controls from compliance.canonical_controls (via DATABASE_URL),
    groups them by source document, builds a per-document heading index,
    locates each control's text in the document, compares the found label
    against the stored citation article, prints a per-source report, and
    saves all matches to /tmp/pdf_qa_results.json. The database is only
    read, never written; updates are applied later from the JSON file.
    """
    # Connect via a parsed DATABASE_URL; search_path pins the compliance schema.
    db_url = os.environ['DATABASE_URL']
    parsed = urllib.parse.urlparse(db_url)
    conn = psycopg2.connect(
        host=parsed.hostname, port=parsed.port or 5432,
        user=parsed.username, password=parsed.password,
        dbname=parsed.path.lstrip('/'),
        options="-c search_path=compliance,public"
    )
    cur = conn.cursor()

    # Get all controls with source_original_text
    # (very short texts are excluded — they cannot be matched reliably).
    cur.execute("""
        SELECT id, control_id, title, source_original_text,
               source_citation->>'source' as source_name,
               source_citation->>'article' as existing_article,
               source_citation as citation_json,
               release_state
        FROM compliance.canonical_controls
        WHERE source_original_text IS NOT NULL
          AND length(source_original_text) > 50
        ORDER BY source_citation->>'source', control_id
    """)
    controls = cur.fetchall()
    print(f"Total controls with source text: {len(controls)}")

    # Group by source
    by_source = {}
    for ctrl in controls:
        src = ctrl[4] or "(null)"  # row index 4 = source_name from the SELECT
        by_source.setdefault(src, []).append(ctrl)

    # Process each source
    total_found = 0
    total_not_found = 0
    total_updated = 0  # NOTE(review): never incremented below — dead counter?
    total_new_article = 0
    total_changed = 0
    total_skipped_no_file = 0
    updates = []  # (ctrl_id, new_article_label, article_type)

    # Largest sources first so the bulk of the work is reported early.
    for source_name in sorted(by_source.keys(), key=lambda s: -len(by_source[s])):
        ctrls = by_source[source_name]
        filename = SOURCE_FILE_MAP.get(source_name)
        doc_type = classify_doc(source_name)

        # No local file mapped for this source → skip all its controls.
        if filename is None:
            total_skipped_no_file += len(ctrls)
            # row index 7 = release_state; duplicates are counted but flagged.
            active = sum(1 for c in ctrls if c[7] not in ('duplicate', 'too_close'))
            print(f"\n{'='*60}")
            print(f"SKIP: {source_name} ({len(ctrls)} controls, {active} active) — no PDF")
            continue

        # Read file
        text = read_file(filename)
        if text is None:
            total_skipped_no_file += len(ctrls)
            print(f"\n{'='*60}")
            print(f"SKIP: {source_name} — file not readable: (unknown)")
            continue

        text_norm = normalize(text)

        # Build index based on doc type
        max_art = MAX_ARTICLES.get(source_name)
        if doc_type == "eu_regulation":
            index = build_eu_article_index(text, max_article=max_art)
        elif doc_type == "de_law":
            index = build_de_law_index(text)
        elif doc_type == "nist":
            index = build_nist_index(text)
        elif doc_type == "owasp":
            index = build_owasp_index(text, source_name)
        else:
            index = build_generic_index(text)

        # Precompute normalized positions
        # (heading positions must live in the same coordinate space as
        # text_norm, since find_text_in_doc searches the normalized text).
        index_norm = []
        for pos, label, typ in index:
            norm_pos = len(normalize(text[:pos]))
            index_norm.append((norm_pos, label, typ))

        active = sum(1 for c in ctrls if c[7] not in ('duplicate', 'too_close'))
        print(f"\n{'='*60}")
        print(f"{source_name} ({len(ctrls)} controls, {active} active)")
        print(f" File: (unknown) ({len(text):,} chars)")
        print(f" Index: {len(index)} sections ({doc_type})")

        src_found = 0
        src_not_found = 0

        for ctrl in ctrls:
            ctrl_id, control_id, title, orig_text, _, existing_art, citation_json, state = ctrl

            result = find_text_in_doc(orig_text, text_norm, index, index_norm)

            if result:
                new_label, art_type = result
                src_found += 1
                total_found += 1

                # Compare with existing
                existing_clean = (existing_art or "").strip()
                if not existing_clean:
                    status = "NEW"
                    total_new_article += 1
                elif existing_clean == new_label:
                    status = "OK"
                else:
                    status = f"CHANGED({existing_clean}→{new_label})"
                    total_changed += 1

                updates.append((ctrl_id, new_label, art_type, control_id, source_name))

                # Only report discrepancies; matching entries stay quiet.
                if status != "OK":
                    is_active = "" if state not in ('duplicate', 'too_close') else " [DUP]"
                    print(f" {control_id:10s}: {new_label:25s} [{art_type:8s}] {status}{is_active}")
            else:
                src_not_found += 1
                total_not_found += 1
                print(f" {control_id:10s}: NOT FOUND {title[:50]}")

        pct = src_found / len(ctrls) * 100 if ctrls else 0
        print(f" → {src_found}/{len(ctrls)} matched ({pct:.0f}%)")

    # ── Summary ──────────────────────────────────────────────────────
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")
    print(f" Total controls with text: {len(controls)}")
    print(f" Matched to PDF: {total_found}")
    print(f" Not found in PDF: {total_not_found}")
    print(f" Skipped (no PDF file): {total_skipped_no_file}")
    print(f" New articles assigned: {total_new_article}")
    print(f" Articles changed: {total_changed}")

    # Save results for later application
    results = []
    for ctrl_id, label, art_type, control_id, source in updates:
        results.append({
            "ctrl_id": str(ctrl_id),
            "control_id": control_id,
            "source": source,
            "article_label": label,
            "article_type": art_type,
        })

    out_path = "/tmp/pdf_qa_results.json"
    with open(out_path, 'w') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\n Results saved to {out_path} ({len(results)} entries)")

    # Type distribution
    type_counts = {}
    for r in results:
        t = r["article_type"]
        type_counts[t] = type_counts.get(t, 0) + 1
    print(f"\n Article type distribution:")
    for t, c in sorted(type_counts.items(), key=lambda x: -x[1]):
        print(f" {t:12s}: {c:5d}")

    conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|