"""Match unmatched OWASP ASVS/SAMM/MASVS controls against GitHub Markdown sources.

Workflow when run as a script:

1. Read the checked-out OWASP repositories below ``GITHUB_DIR`` and build one
   normalized text blob plus a position index of identifiers per source.
2. Select controls from ``compliance.canonical_controls`` whose
   ``source_citation`` has no ``article_type`` yet.
3. Locate each control's original text in the blobs (own source first, then
   the other sources) and take the nearest preceding identifier as article.
4. Write ``article`` / ``article_type`` back into ``source_citation``.

The database driver is imported inside ``main()`` so the pure matching
helpers in this module can be imported without ``psycopg2`` or a database.
"""
import os
import re
import sys
import unicodedata
import urllib.parse
from collections import Counter
from pathlib import Path

GITHUB_DIR = Path(os.path.expanduser("~/rag-ingestion/owasp-github"))

# Single-pass replacement table applied by normalize() before NFC
# normalization: soft hyphen / zero-width space are dropped, NBSP becomes a
# plain space, ligatures are expanded, typographic quotes / dashes / bullets
# are mapped to ASCII, and control characters other than \t, \n, \r are removed.
_CHAR_MAP = str.maketrans({
    '\u00ad': '',
    '\u200b': '',
    '\u00a0': ' ',
    '\ufb01': 'fi',
    '\ufb02': 'fl',
    '\ufb00': 'ff',
    '\ufb03': 'ffi',
    '\ufb04': 'ffl',
    '\u2019': "'",
    '\u2018': "'",
    '\u201c': '"',
    '\u201d': '"',
    '\u2013': '-',
    '\u2014': '-',
    '\u2022': '-',
    '\u00b7': '-',
    **{chr(c): None
       for c in (*range(0x00, 0x09), 0x0b, 0x0c, *range(0x0e, 0x20))},
})
_WHITESPACE_RE = re.compile(r'\s+')


def normalize(s):
    """Return *s* in the canonical form used for substring matching.

    Applies the character table above, NFC-normalizes the result, collapses
    every whitespace run to a single space and strips both ends.
    """
    s = s.translate(_CHAR_MAP)
    s = unicodedata.normalize('NFC', s)
    return _WHITESPACE_RE.sub(' ', s).strip()


# ── Load Markdown sources ──

def _read_text(path):
    """Return the UTF-8 text of *path*, or None if it cannot be read.

    Undecodable bytes are replaced rather than raising. Unreadable files are
    skipped (best effort), but reported on stderr instead of being silently
    ignored.
    """
    try:
        return path.read_text(encoding='utf-8', errors='replace')
    except OSError as exc:
        print(f"WARNING: skipping unreadable file {path}: {exc}",
              file=sys.stderr)
        return None


def load_markdown_dir(path, pattern="*.md"):
    """Load the files directly inside *path* that match *pattern*.

    Returns a dict mapping file name to file text, in sorted path order.
    Files that cannot be read are left out.
    """
    texts = {}
    for f in sorted(path.glob(pattern)):
        content = _read_text(f)
        if content is not None:
            texts[f.name] = content
    return texts


def _load_tree(root, patterns=("*.md",)):
    """Recursively load files below *root*, one pattern after the other.

    Returns a dict mapping the path relative to *root* to the file text.
    NOTE(review): rglob order is not sorted, so the concatenation order of
    these sources (and therefore index positions) depends on the filesystem.
    """
    texts = {}
    for pattern in patterns:
        for f in root.rglob(pattern):
            content = _read_text(f)
            if content is not None:
                texts[str(f.relative_to(root))] = content
    return texts


# Source name → normalized full text. Filled by load_sources().
SOURCE_GITHUB = {}


# ── Index builders ──

_ASVS_RE = re.compile(r'(V\d+\.\d+(?:\.\d+)?)\b')
_SAMM_SECTION_RE = re.compile(r'(?:^|\s)(\d+\.\d+(?:\.\d+)?)\s+[A-Z]')
_SAMM_PRACTICE_RE = re.compile(
    r'((?:Strategy|Education|Policy|Threat|Security Requirements|Secure Architecture|'
    r'Secure Build|Secure Deployment|Defect Management|Environment Management|'
    r'Incident Management|Requirements Testing|Security Testing|'
    r'Design Review|Implementation Review|Operations Management)'
    r'[^.\n]{0,30})')
_MASVS_RE = re.compile(r'(MASVS-[A-Z]+-\d+)')
_API_RE = re.compile(r'(API\d+:\d{4})')


def _first_occurrences(items):
    """Sort (pos, label, type) tuples by position; keep the first per label."""
    seen = set()
    unique = []
    for entry in sorted(items, key=lambda e: e[0]):
        label = entry[1]
        if label not in seen:
            seen.add(label)
            unique.append(entry)
    return unique


def build_asvs_index(text):
    """Index ASVS identifiers such as ``V1.2`` / ``V1.2.3`` as requirements."""
    return _first_occurrences(
        (m.start(), m.group(1), "requirement") for m in _ASVS_RE.finditer(text))


def build_samm_index(text):
    """Index SAMM numbered sections and practice-name mentions as sections."""
    # Numbered headings: "<n.n[.n]> <Capitalized word>" → "Section n.n".
    items = [(m.start(), f"Section {m.group(1)}", "section")
             for m in _SAMM_SECTION_RE.finditer(text)]
    # Practice names plus up to 30 following characters, capped at 50 chars.
    items.extend((m.start(), m.group(1)[:50], "section")
                 for m in _SAMM_PRACTICE_RE.finditer(text))
    return _first_occurrences(items)


def build_masvs_index(text):
    """Index MASVS identifiers such as ``MASVS-STORAGE-1`` as requirements."""
    return _first_occurrences(
        (m.start(), m.group(1), "requirement") for m in _MASVS_RE.finditer(text))


def build_api_index(text):
    """Index API Top 10 identifiers such as ``API1:2023`` as categories."""
    return _first_occurrences(
        (m.start(), m.group(1), "category") for m in _API_RE.finditer(text))


SOURCE_INDEX_BUILDERS = {
    "OWASP ASVS 4.0": build_asvs_index,
    "OWASP SAMM 2.0": build_samm_index,
    "OWASP MASVS 2.0": build_masvs_index,
    "OWASP API Security Top 10 (2023)": build_api_index,
}

# Source name → list of (position, label, type) built on the normalized text.
# Filled by load_sources().
source_indexes = {}


def load_sources():
    """Read all GitHub sources and (re)fill SOURCE_GITHUB and source_indexes."""
    SOURCE_GITHUB.clear()
    source_indexes.clear()

    loaders = (
        # ASVS 4.0 — V-files contain requirements
        ("OWASP ASVS 4.0", "ASVS 4.0 Markdown",
         lambda: load_markdown_dir(GITHUB_DIR / "ASVS" / "4.0" / "en")),
        # SAMM core — YAML + Markdown
        ("OWASP SAMM 2.0", "SAMM 2.0 source",
         lambda: _load_tree(GITHUB_DIR / "samm-core", ("*.yml", "*.md"))),
        # MASVS — control markdown files
        ("OWASP MASVS 2.0", "MASVS 2.0 source",
         lambda: _load_tree(GITHUB_DIR / "masvs")),
        # API Security
        ("OWASP API Security Top 10 (2023)", "API Security source",
         lambda: _load_tree(GITHUB_DIR / "api-security")),
    )
    for name, display, loader in loaders:
        files = loader()
        full = "\n".join(files.values())
        SOURCE_GITHUB[name] = normalize(full)
        print(f"{display}: {len(files)} files, {len(full):,} chars")

    # Build all indexes on normalized text
    for name, norm_text in SOURCE_GITHUB.items():
        idx = SOURCE_INDEX_BUILDERS[name](norm_text)
        source_indexes[name] = idx
        print(f" {name}: {len(idx)} index entries")


# ── Matching ──

def find_text(orig_text, source_name):
    """Find control text in one GitHub source.

    Probes the normalized source text with windows cut from the normalized
    control text, starting at 25%, 10%, 50%, 0% and 75% of its length and
    trying window lengths 80, 60, 40, 30 and 20. Windows shorter than 15
    characters are skipped; the first hit wins.

    Returns ``(label, type)`` of the last index entry at or before the hit,
    ``("Unknown", "unknown")`` if no entry precedes it, or ``None`` when the
    source is unknown/empty, the control text is shorter than 20 characters
    after normalization, or no window is found.
    """
    norm_text = SOURCE_GITHUB.get(source_name)
    if not norm_text:
        return None
    idx = source_indexes.get(source_name, [])
    orig_norm = normalize(orig_text)
    if len(orig_norm) < 20:
        return None

    for start_frac in (0.25, 0.1, 0.5, 0.0, 0.75):
        start = max(0, int(len(orig_norm) * start_frac))
        for length in (80, 60, 40, 30, 20):
            snippet = orig_norm[start:start + length]
            if len(snippet) < 15:
                continue
            pos = norm_text.find(snippet)
            if pos < 0:
                continue
            # The index is sorted by position: walk backwards to the nearest
            # entry that starts at or before the hit.
            for h_pos, h_label, h_type in reversed(idx):
                if h_pos <= pos:
                    return (h_label, h_type)
            return ("Unknown", "unknown")
    return None


def find_in_any_github(orig_text, exclude_source=None):
    """Try all GitHub sources except *exclude_source*.

    Returns ``(source_name, label, type)`` for the first source with a hit,
    otherwise ``None``.
    """
    for name in SOURCE_GITHUB:
        if name == exclude_source:
            continue
        result = find_text(orig_text, name)
        if result:
            return (name, result[0], result[1])
    return None


# ── DB ──

_SELECT_UNMATCHED_SQL = """
    SELECT id, control_id, title, source_original_text, release_state
    FROM compliance.canonical_controls
    WHERE source_citation->>'source' = %s
      AND source_citation->>'article_type' IS NULL
      AND source_original_text IS NOT NULL
      AND release_state NOT IN ('duplicate', 'too_close')
    ORDER BY control_id
"""

_UPDATE_ARTICLE_SQL = """
    UPDATE compliance.canonical_controls
    SET source_citation = source_citation
        || jsonb_build_object('article', %s, 'article_type', %s)
    WHERE id = %s
      AND (source_citation->>'article' IS DISTINCT FROM %s
           OR source_citation->>'article_type' IS DISTINCT FROM %s)
"""


def parse_db_url(db_url):
    """Translate a database URL into keyword arguments for the DB connect call.

    The port defaults to 5432 and the search path is set to
    ``compliance,public``.
    NOTE(review): user name and password are passed on exactly as they appear
    in the URL (no percent-decoding) — confirm that is intended.
    """
    parsed = urllib.parse.urlparse(db_url)
    return {
        "host": parsed.hostname,
        "port": parsed.port or 5432,
        "user": parsed.username,
        "password": parsed.password,
        "dbname": parsed.path.lstrip('/'),
        "options": "-c search_path=compliance,public",
    }


def _fetch_unmatched(cur, source):
    """Return the active controls of *source* that have no article_type yet."""
    cur.execute(_SELECT_UNMATCHED_SQL, (source,))
    return cur.fetchall()


def main():
    """Load sources, match unmatched controls and write the results back.

    Raises KeyError if the DATABASE_URL environment variable is not set.
    """
    # Imported here so importing this module does not require the DB driver.
    import psycopg2

    load_sources()

    conn = psycopg2.connect(**parse_db_url(os.environ['DATABASE_URL']))
    try:
        cur = conn.cursor()

        # ── Process each OWASP source ──
        total_matched = 0
        total_cross = 0
        total_not_found = 0
        all_updates = []

        for source in SOURCE_INDEX_BUILDERS:
            controls = _fetch_unmatched(cur, source)
            if not controls:
                continue

            print(f"\n{'='*60}")
            print(f"{source} — {len(controls)} unmatched active")
            print(f"{'='*60}")

            matched = 0
            cross_matched = 0
            not_found = 0

            for uid, cid, _title, text, _state in controls:
                # Try own GitHub source
                result = find_text(text, source)
                if result:
                    matched += 1
                    total_matched += 1
                    all_updates.append((uid, cid, source, result[0], result[1]))
                    print(f" {cid:10s} → {result[0]:30s} [{result[1]}]")
                    continue

                # Try other GitHub sources
                cross = find_in_any_github(text, exclude_source=source)
                if cross:
                    cross_matched += 1
                    total_cross += 1
                    all_updates.append((uid, cid, cross[0], cross[1], cross[2]))
                    print(f" {cid:10s} → [{cross[0]}] {cross[1]:20s} [{cross[2]}] (CROSS)")
                    continue

                not_found += 1
                total_not_found += 1

            print(f"\n Own source matched: {matched}")
            print(f" Cross-source: {cross_matched}")
            print(f" Not found: {not_found}")

        # ── Also try OWASP Top 10 remaining unmatched
        #    (34 active left after dup marking) ──
        top10_remaining = _fetch_unmatched(cur, 'OWASP Top 10 (2021)')
        if top10_remaining:
            print(f"\n{'='*60}")
            print(f"OWASP Top 10 (2021) — {len(top10_remaining)} remaining unmatched active")
            print(f"{'='*60}")
            for uid, cid, _title, text, _state in top10_remaining:
                cross = find_in_any_github(text)
                if cross:
                    total_cross += 1
                    all_updates.append((uid, cid, cross[0], cross[1], cross[2]))
                    print(f" {cid:10s} → [{cross[0]}] {cross[1]:20s} [{cross[2]}]")
                else:
                    total_not_found += 1

        # ── Summary ──
        print(f"\n{'='*60}")
        print("ZUSAMMENFASSUNG")
        print(f"{'='*60}")
        print(f" Matched in eigener GitHub-Quelle: {total_matched}")
        print(f" Cross-source matched: {total_cross}")
        print(f" Nicht gefunden: {total_not_found}")
        print(f" Total Updates: {len(all_updates)}")

        # ── Apply updates ──
        if all_updates:
            print(f"\nApplying {len(all_updates)} updates to DB...")
            applied = 0
            for uid, _cid, _correct_source, label, typ in all_updates:
                # Update article + article_type.
                # NOTE(review): the matched source (_correct_source) is
                # collected but not written; for cross-matched controls
                # source_citation->>'source' stays as it was. Confirm whether
                # it should be corrected here as well.
                cur.execute(_UPDATE_ARTICLE_SQL, (label, typ, uid, label, typ))
                if cur.rowcount > 0:
                    applied += 1
            conn.commit()
            print(f" Applied: {applied} controls updated")

        # Type distribution, most frequent first (ties keep first-seen order)
        type_counts = Counter(update[4] for update in all_updates)
        print("\n Article type distribution:")
        for t, c in type_counts.most_common():
            print(f" {t:12s}: {c:5d}")
    finally:
        # Always release the connection, also when matching or an update fails.
        conn.close()


if __name__ == "__main__":
    main()