This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/klausur-service/backend/legal_corpus_ingestion.py
BreakPilot Dev f927c0c205 feat(rag): Add DACH legal corpus ingestion (DE/AT/CH laws)
Add 29 new regulations (7 DE + 7 AT + 4 CH + 11 P2/P3) with country
metadata, legal corpus text excerpts, and updated RAG admin UI with
AT/CH type colors and labels. Fix module path in deploy script.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 09:24:33 +01:00

1281 lines
53 KiB
Python

"""
Legal Corpus Ingestion for UCCA RAG Integration.
Indexes all regulations from the Compliance Hub into Qdrant for
semantic search during UCCA assessments and explanations.
Includes EU regulations, DACH national laws, and EDPB guidelines.
Collections:
- bp_legal_corpus: All regulation texts (GDPR, AI Act, CRA, BSI, etc.)
Usage:
python legal_corpus_ingestion.py --ingest-all
python legal_corpus_ingestion.py --ingest GDPR AIACT
python legal_corpus_ingestion.py --status
"""
import asyncio
import hashlib
import json
import logging
import os
import re
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse
import httpx
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance,
FieldCondition,
Filter,
MatchValue,
PointStruct,
VectorParams,
)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Qdrant connection: a full QDRANT_URL takes precedence over the separate
# QDRANT_HOST / QDRANT_PORT variables.
_qdrant_url = os.getenv("QDRANT_URL", "")
if not _qdrant_url:
    QDRANT_HOST = os.getenv("QDRANT_HOST", "localhost")
    QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333"))
else:
    # Example: http://qdrant:6333 -> host "qdrant", port 6333.
    # Missing host or port components fall back to localhost / 6333.
    _parsed = urlparse(_qdrant_url)
    QDRANT_HOST = _parsed.hostname or "localhost"
    QDRANT_PORT = _parsed.port or 6333

EMBEDDING_SERVICE_URL = os.getenv("EMBEDDING_SERVICE_URL", "http://localhost:8087")
LEGAL_CORPUS_COLLECTION = "bp_legal_corpus"
VECTOR_SIZE = 1024  # BGE-M3 dimension

# Chunking configuration - matched to NIBIS settings for semantic chunking.
# Both values are character counts and can be overridden via the environment.
CHUNK_SIZE = int(os.getenv("LEGAL_CHUNK_SIZE", "1000"))
CHUNK_OVERLAP = int(os.getenv("LEGAL_CHUNK_OVERLAP", "200"))

# Base path for local PDF/HTML files.
#   Local dev: <parent of this script's directory>/docs/legal_corpus,
#              unless LEGAL_DOCS_PATH is set.
#   Docker:    /app/docs/legal_corpus (mounted volume).
_default_docs_path = Path(__file__).parent.parent / "docs" / "legal_corpus"
LEGAL_DOCS_PATH = Path(os.getenv("LEGAL_DOCS_PATH", str(_default_docs_path)))

# Docker-specific override: when the mounted directory exists it wins,
# even over an explicitly set LEGAL_DOCS_PATH.
if Path("/app/docs/legal_corpus").exists():
    LEGAL_DOCS_PATH = Path("/app/docs/legal_corpus")
@dataclass
class Regulation:
    """Metadata record for a single entry of the legal corpus.

    Attributes:
        code: Short unique identifier; also used as the base name when
            looking for a local ``<code>.txt`` / ``<code>.pdf`` file.
        name: Short display name.
        full_name: Official title of the regulation.
        regulation_type: Category tag such as ``"eu_regulation"``,
            ``"eu_directive"``, ``"de_law"`` or ``"bsi_standard"``.
        source_url: URL of the original publication.
        description: One-line summary of the content.
        celex: CELEX number for EUR-Lex direct access, if any.
        local_path: Optional file name of a local text excerpt
            (presumably relative to the legal docs directory; verify
            against the consumer of this field).
        language: Language code of the source text, ``"de"`` by default.
        requirement_count: Requirement count recorded for the regulation.
    """

    # Required identification and provenance fields
    code: str
    name: str
    full_name: str
    regulation_type: str
    source_url: str
    description: str

    # Optional fields
    celex: Optional[str] = None  # CELEX number for EUR-Lex direct access
    local_path: Optional[str] = None
    language: str = "de"
    requirement_count: int = 0
# All regulations from Compliance Hub (EU + DACH national laws + guidelines)
REGULATIONS: List[Regulation] = [
Regulation(
code="GDPR",
name="DSGVO",
full_name="Verordnung (EU) 2016/679 - Datenschutz-Grundverordnung",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2016/679/oj/deu",
description="Grundverordnung zum Schutz natuerlicher Personen bei der Verarbeitung personenbezogener Daten.",
celex="32016R0679",
requirement_count=99,
),
Regulation(
code="EPRIVACY",
name="ePrivacy-Richtlinie",
full_name="Richtlinie 2002/58/EG",
regulation_type="eu_directive",
source_url="https://eur-lex.europa.eu/eli/dir/2002/58/oj/deu",
description="Datenschutz in der elektronischen Kommunikation, Cookies und Tracking.",
celex="32002L0058",
requirement_count=25,
),
Regulation(
code="TDDDG",
name="TDDDG",
full_name="Telekommunikation-Digitale-Dienste-Datenschutz-Gesetz",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/ttdsg/TDDDG.pdf",
description="Deutsche Umsetzung der ePrivacy-Richtlinie (30 Paragraphen).",
requirement_count=30,
),
Regulation(
code="SCC",
name="Standardvertragsklauseln",
full_name="Durchfuehrungsbeschluss (EU) 2021/914",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/dec_impl/2021/914/oj/deu",
description="Standardvertragsklauseln fuer Drittlandtransfers.",
celex="32021D0914",
requirement_count=18,
),
Regulation(
code="DPF",
name="EU-US Data Privacy Framework",
full_name="Durchfuehrungsbeschluss (EU) 2023/1795",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/dec_impl/2023/1795/oj",
description="Angemessenheitsbeschluss fuer USA-Transfers.",
celex="32023D1795",
requirement_count=12,
),
Regulation(
code="AIACT",
name="EU AI Act",
full_name="Verordnung (EU) 2024/1689 - KI-Verordnung",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2024/1689/oj/deu",
description="EU-Verordnung zur Regulierung von KI-Systemen nach Risikostufen.",
celex="32024R1689",
requirement_count=85,
),
Regulation(
code="CRA",
name="Cyber Resilience Act",
full_name="Verordnung (EU) 2024/2847",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2024/2847/oj/deu",
description="Cybersicherheitsanforderungen, SBOM-Pflicht.",
celex="32024R2847",
requirement_count=45,
),
Regulation(
code="NIS2",
name="NIS2-Richtlinie",
full_name="Richtlinie (EU) 2022/2555",
regulation_type="eu_directive",
source_url="https://eur-lex.europa.eu/eli/dir/2022/2555/oj/deu",
description="Cybersicherheit fuer wesentliche Einrichtungen.",
celex="32022L2555",
requirement_count=46,
),
Regulation(
code="EUCSA",
name="EU Cybersecurity Act",
full_name="Verordnung (EU) 2019/881",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2019/881/oj/deu",
description="ENISA und Cybersicherheitszertifizierung.",
celex="32019R0881",
requirement_count=35,
),
Regulation(
code="DATAACT",
name="Data Act",
full_name="Verordnung (EU) 2023/2854",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2023/2854/oj/deu",
description="Fairer Datenzugang, IoT-Daten, Cloud-Wechsel.",
celex="32023R2854",
requirement_count=42,
),
Regulation(
code="DGA",
name="Data Governance Act",
full_name="Verordnung (EU) 2022/868",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2022/868/oj/deu",
description="Weiterverwendung oeffentlicher Daten.",
celex="32022R0868",
requirement_count=35,
),
Regulation(
code="DSA",
name="Digital Services Act",
full_name="Verordnung (EU) 2022/2065",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2022/2065/oj/deu",
description="Digitale Dienste, Transparenzpflichten.",
celex="32022R2065",
requirement_count=93,
),
Regulation(
code="EAA",
name="European Accessibility Act",
full_name="Richtlinie (EU) 2019/882",
regulation_type="eu_directive",
source_url="https://eur-lex.europa.eu/eli/dir/2019/882/oj/deu",
description="Barrierefreiheit digitaler Produkte.",
celex="32019L0882",
requirement_count=25,
),
Regulation(
code="DSM",
name="DSM-Urheberrechtsrichtlinie",
full_name="Richtlinie (EU) 2019/790",
regulation_type="eu_directive",
source_url="https://eur-lex.europa.eu/eli/dir/2019/790/oj/deu",
description="Urheberrecht, Text- und Data-Mining.",
celex="32019L0790",
requirement_count=22,
),
Regulation(
code="PLD",
name="Produkthaftungsrichtlinie",
full_name="Richtlinie (EU) 2024/2853",
regulation_type="eu_directive",
source_url="https://eur-lex.europa.eu/eli/dir/2024/2853/oj/deu",
description="Produkthaftung inkl. Software und KI.",
celex="32024L2853",
requirement_count=18,
),
Regulation(
code="GPSR",
name="General Product Safety",
full_name="Verordnung (EU) 2023/988",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2023/988/oj/deu",
description="Allgemeine Produktsicherheit.",
celex="32023R0988",
requirement_count=30,
),
Regulation(
code="BSI-TR-03161-1",
name="BSI-TR-03161 Teil 1",
full_name="BSI Technische Richtlinie - Allgemeine Anforderungen",
regulation_type="bsi_standard",
source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-1.pdf?__blob=publicationFile&v=6",
description="Allgemeine Sicherheitsanforderungen (45 Pruefaspekte).",
requirement_count=45,
),
Regulation(
code="BSI-TR-03161-2",
name="BSI-TR-03161 Teil 2",
full_name="BSI Technische Richtlinie - Web-Anwendungen",
regulation_type="bsi_standard",
source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-2.pdf?__blob=publicationFile&v=5",
description="Web-Sicherheit (40 Pruefaspekte).",
requirement_count=40,
),
Regulation(
code="BSI-TR-03161-3",
name="BSI-TR-03161 Teil 3",
full_name="BSI Technische Richtlinie - Hintergrundsysteme",
regulation_type="bsi_standard",
source_url="https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Publikationen/TechnischeRichtlinien/TR03161/BSI-TR-03161-3.pdf?__blob=publicationFile&v=5",
description="Backend-Sicherheit (35 Pruefaspekte).",
requirement_count=35,
),
# Additional regulations for financial sector and health
Regulation(
code="DORA",
name="DORA",
full_name="Verordnung (EU) 2022/2554 - Digital Operational Resilience Act",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2022/2554/oj/deu",
description="Digitale operationale Resilienz fuer den Finanzsektor. IKT-Risikomanagement, Vorfallmeldung, Resilienz-Tests.",
celex="32022R2554",
requirement_count=64,
),
Regulation(
code="PSD2",
name="PSD2",
full_name="Richtlinie (EU) 2015/2366 - Zahlungsdiensterichtlinie",
regulation_type="eu_directive",
source_url="https://eur-lex.europa.eu/eli/dir/2015/2366/oj/deu",
description="Zahlungsdienste im Binnenmarkt. Starke Kundenauthentifizierung, Open Banking APIs.",
celex="32015L2366",
requirement_count=117,
),
Regulation(
code="AMLR",
name="AML-Verordnung",
full_name="Verordnung (EU) 2024/1624 - Geldwaeschebekaempfung",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2024/1624/oj/deu",
description="Verhinderung der Nutzung des Finanzsystems zur Geldwaesche und Terrorismusfinanzierung.",
celex="32024R1624",
requirement_count=89,
),
Regulation(
code="EHDS",
name="EHDS",
full_name="Verordnung (EU) 2025/327 - Europaeischer Gesundheitsdatenraum",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2025/327/oj/deu",
description="Europaeischer Raum fuer Gesundheitsdaten. Primaer- und Sekundaernutzung von Gesundheitsdaten.",
celex="32025R0327",
requirement_count=95,
),
Regulation(
code="MiCA",
name="MiCA",
full_name="Verordnung (EU) 2023/1114 - Markets in Crypto-Assets",
regulation_type="eu_regulation",
source_url="https://eur-lex.europa.eu/eli/reg/2023/1114/oj/deu",
description="Regulierung von Kryptowerten, Stablecoins und Crypto-Asset-Dienstleistern.",
celex="32023R1114",
requirement_count=149,
),
# =====================================================================
# DACH National Laws — Deutschland (P1)
# =====================================================================
Regulation(
code="DE_DDG",
name="Digitale-Dienste-Gesetz",
full_name="Digitale-Dienste-Gesetz (DDG)",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/ddg/",
description="Deutsches Umsetzungsgesetz zum DSA. Regelt Impressumspflicht (§5), Informationspflichten fuer digitale Dienste und Cookies.",
requirement_count=30,
),
Regulation(
code="DE_BGB_AGB",
name="BGB AGB-Recht",
full_name="BGB §§305-310, 312-312k — AGB und Fernabsatz",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/bgb/",
description="Deutsches AGB-Recht (§§305-310 BGB) und Fernabsatzrecht (§§312-312k BGB). Klauselverbote, Inhaltskontrolle, Widerrufsrecht, Button-Loesung.",
local_path="DE_BGB_AGB.txt",
requirement_count=40,
),
Regulation(
code="DE_EGBGB",
name="EGBGB Art. 246-248",
full_name="Einfuehrungsgesetz zum BGB — Informationspflichten",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/bgbeg/",
description="Informationspflichten bei Verbrauchervertraegen (Art. 246), Fernabsatz (Art. 246a), E-Commerce (Art. 246c).",
local_path="DE_EGBGB.txt",
requirement_count=20,
),
Regulation(
code="DE_UWG",
name="UWG Deutschland",
full_name="Gesetz gegen den unlauteren Wettbewerb (UWG)",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/uwg_2004/",
description="Unlauterer Wettbewerb: irrefuehrende Werbung, Spam-Verbot, Preisangaben, Online-Marketing-Regeln.",
requirement_count=25,
),
Regulation(
code="DE_HGB_RET",
name="HGB Aufbewahrung",
full_name="HGB §§238-261, 257 — Handelsbuecher und Aufbewahrungsfristen",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/hgb/",
description="Buchfuehrungspflicht, Aufbewahrungsfristen 6/10 Jahre, Anforderungen an elektronische Aufbewahrung.",
local_path="DE_HGB_RET.txt",
requirement_count=15,
),
Regulation(
code="DE_AO_RET",
name="AO Aufbewahrung",
full_name="Abgabenordnung §§140-148 — Steuerliche Aufbewahrungspflichten",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/ao_1977/",
description="Steuerliche Buchfuehrungs- und Aufbewahrungspflichten. 6/10 Jahre Fristen, Datenzugriff durch Finanzbehoerden.",
local_path="DE_AO_RET.txt",
requirement_count=12,
),
Regulation(
code="DE_TKG",
name="TKG 2021",
full_name="Telekommunikationsgesetz 2021",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/tkg_2021/",
description="Telekommunikationsregulierung: Kundenschutz, Datenschutz, Vertragslaufzeiten, Netzinfrastruktur.",
requirement_count=45,
),
# =====================================================================
# DACH National Laws — Oesterreich (P1)
# =====================================================================
Regulation(
code="AT_ECG",
name="E-Commerce-Gesetz AT",
full_name="E-Commerce-Gesetz (ECG) Oesterreich",
regulation_type="at_law",
source_url="https://www.ris.bka.gv.at/GeltendeFassung.wxe?Abfrage=Bundesnormen&Gesetzesnummer=20001703",
description="Oesterreichisches E-Commerce-Gesetz: Impressum/Offenlegungspflicht (§5), Informationspflichten, Haftung von Diensteanbietern.",
language="de",
requirement_count=30,
),
Regulation(
code="AT_TKG",
name="TKG 2021 AT",
full_name="Telekommunikationsgesetz 2021 Oesterreich",
regulation_type="at_law",
source_url="https://www.ris.bka.gv.at/GeltendeFassung.wxe?Abfrage=Bundesnormen&Gesetzesnummer=20011678",
description="Oesterreichisches TKG: Cookie-Bestimmungen (§165), Kommunikationsgeheimnis, Endgeraetezugriff.",
language="de",
requirement_count=40,
),
Regulation(
code="AT_KSCHG",
name="KSchG Oesterreich",
full_name="Konsumentenschutzgesetz (KSchG) Oesterreich",
regulation_type="at_law",
source_url="https://www.ris.bka.gv.at/GeltendeFassung.wxe?Abfrage=Bundesnormen&Gesetzesnummer=10002462",
description="Konsumentenschutz: AGB-Kontrolle (§6 Klauselverbote, §9 Verbandsklage), Ruecktrittsrecht, Informationspflichten.",
language="de",
requirement_count=35,
),
Regulation(
code="AT_FAGG",
name="FAGG Oesterreich",
full_name="Fern- und Auswaertsgeschaefte-Gesetz (FAGG) Oesterreich",
regulation_type="at_law",
source_url="https://www.ris.bka.gv.at/GeltendeFassung.wxe?Abfrage=Bundesnormen&Gesetzesnummer=20008847",
description="Fernabsatzrecht: Informationspflichten, Widerrufsrecht 14 Tage, Button-Loesung, Ausnahmen.",
language="de",
requirement_count=20,
),
Regulation(
code="AT_UGB_RET",
name="UGB Aufbewahrung AT",
full_name="UGB §§189-216, 212 — Rechnungslegung und Aufbewahrung Oesterreich",
regulation_type="at_law",
source_url="https://www.ris.bka.gv.at/GeltendeFassung.wxe?Abfrage=Bundesnormen&Gesetzesnummer=10001702",
description="Oesterreichische Rechnungslegungspflicht und Aufbewahrungsfristen (7 Jahre). Buchfuehrung, Jahresabschluss.",
local_path="AT_UGB_RET.txt",
language="de",
requirement_count=15,
),
Regulation(
code="AT_BAO_RET",
name="BAO §132 AT",
full_name="Bundesabgabenordnung §132 — Aufbewahrung Oesterreich",
regulation_type="at_law",
source_url="https://www.ris.bka.gv.at/GeltendeFassung.wxe?Abfrage=Bundesnormen&Gesetzesnummer=10003940",
description="Steuerliche Aufbewahrungspflicht 7 Jahre fuer Buecher, Aufzeichnungen und Belege. Grundstuecke 22 Jahre.",
language="de",
requirement_count=5,
),
Regulation(
code="AT_MEDIENG",
name="MedienG §§24-25 AT",
full_name="Mediengesetz §§24-25 Oesterreich — Impressum und Offenlegung",
regulation_type="at_law",
source_url="https://www.ris.bka.gv.at/GeltendeFassung.wxe?Abfrage=Bundesnormen&Gesetzesnummer=10000719",
description="Impressum/Offenlegungspflicht fuer periodische Medien und Websites in Oesterreich.",
language="de",
requirement_count=10,
),
# =====================================================================
# DACH National Laws — Schweiz (P1)
# =====================================================================
Regulation(
code="CH_DSV",
name="DSV Schweiz",
full_name="Datenschutzverordnung (DSV) Schweiz — SR 235.11",
regulation_type="ch_law",
source_url="https://www.fedlex.admin.ch/eli/cc/2022/568/de",
description="Ausfuehrungsverordnung zum revDSG: Meldepflichten, DSFA-Verfahren, Auslandtransfers, technische Massnahmen.",
language="de",
requirement_count=30,
),
Regulation(
code="CH_OR_AGB",
name="OR AGB/Aufbewahrung CH",
full_name="Obligationenrecht — AGB-Kontrolle und Aufbewahrung Schweiz (SR 220)",
regulation_type="ch_law",
source_url="https://www.fedlex.admin.ch/eli/cc/27/317_321_377/de",
description="Art. 8 OR (AGB-Inhaltskontrolle), Art. 19/20 (Vertragsfreiheit), Art. 957-958f (Buchfuehrung, 10 Jahre Aufbewahrung).",
local_path="CH_OR_AGB.txt",
language="de",
requirement_count=20,
),
Regulation(
code="CH_UWG",
name="UWG Schweiz",
full_name="Bundesgesetz gegen den unlauteren Wettbewerb Schweiz (SR 241)",
regulation_type="ch_law",
source_url="https://www.fedlex.admin.ch/eli/cc/1988/223_223_223/de",
description="Lauterkeitsrecht: Impressumspflicht, irrefuehrende Werbung, aggressive Verkaufsmethoden, AGB-Transparenz.",
language="de",
requirement_count=20,
),
Regulation(
code="CH_FMG",
name="FMG Schweiz",
full_name="Fernmeldegesetz Schweiz (SR 784.10)",
regulation_type="ch_law",
source_url="https://www.fedlex.admin.ch/eli/cc/1997/2187_2187_2187/de",
description="Telekommunikationsregulierung: Fernmeldegeheimnis, Cookies/Tracking (Art. 45c), Spam-Verbot, Datenschutz.",
language="de",
requirement_count=25,
),
# =====================================================================
# Deutschland P2
# =====================================================================
Regulation(
code="DE_PANGV",
name="PAngV",
full_name="Preisangabenverordnung (PAngV 2022)",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/pangv_2022/",
description="Preisangaben: Gesamtpreis, Grundpreis, Streichpreise (§11), Online-Preisauszeichnung.",
requirement_count=15,
),
Regulation(
code="DE_DLINFOV",
name="DL-InfoV",
full_name="Dienstleistungs-Informationspflichten-Verordnung",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/dlinfov/",
description="Informationspflichten fuer Dienstleister: Identitaet, Kontakt, Berufshaftpflicht, AGB-Zugang.",
requirement_count=10,
),
Regulation(
code="DE_BETRVG",
name="BetrVG §87",
full_name="Betriebsverfassungsgesetz §87 Abs.1 Nr.6",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/betrvg/",
description="Mitbestimmung bei technischer Ueberwachung: Betriebsrat-Beteiligung bei IT-Systemen, die Arbeitnehmerverhalten ueberwachen koennen.",
requirement_count=5,
),
# =====================================================================
# Oesterreich P2
# =====================================================================
Regulation(
code="AT_ABGB_AGB",
name="ABGB AGB-Recht AT",
full_name="ABGB §§861-879, 864a — AGB-Kontrolle Oesterreich",
regulation_type="at_law",
source_url="https://www.ris.bka.gv.at/GeltendeFassung.wxe?Abfrage=Bundesnormen&Gesetzesnummer=10001622",
description="Geltungskontrolle (§864a), Sittenwidrigkeitskontrolle (§879 Abs.3), allgemeine Vertragsregeln.",
local_path="AT_ABGB_AGB.txt",
language="de",
requirement_count=10,
),
Regulation(
code="AT_UWG",
name="UWG Oesterreich",
full_name="Bundesgesetz gegen den unlauteren Wettbewerb Oesterreich",
regulation_type="at_law",
source_url="https://www.ris.bka.gv.at/GeltendeFassung.wxe?Abfrage=Bundesnormen&Gesetzesnummer=10002665",
description="Lauterkeitsrecht AT: irrefuehrende Geschaeftspraktiken, aggressive Praktiken, Preisauszeichnung.",
language="de",
requirement_count=15,
),
# =====================================================================
# Schweiz P2
# =====================================================================
Regulation(
code="CH_GEBUV",
name="GeBuV Schweiz",
full_name="Geschaeftsbuecher-Verordnung Schweiz (SR 221.431)",
regulation_type="ch_law",
source_url="https://www.fedlex.admin.ch/eli/cc/2002/468_468_468/de",
description="Ausfuehrungsvorschriften zur Buchfuehrung: elektronische Aufbewahrung, Integritaet, Datentraeger.",
language="de",
requirement_count=10,
),
Regulation(
code="CH_ZERTES",
name="ZertES Schweiz",
full_name="Bundesgesetz ueber die elektronische Signatur (SR 943.03)",
regulation_type="ch_law",
source_url="https://www.fedlex.admin.ch/eli/cc/2016/752/de",
description="Elektronische Signatur und Zertifizierung: Qualifizierte Signaturen, Zertifizierungsdiensteanbieter.",
language="de",
requirement_count=10,
),
# =====================================================================
# Deutschland P3
# =====================================================================
Regulation(
code="DE_GESCHGEHG",
name="GeschGehG",
full_name="Gesetz zum Schutz von Geschaeftsgeheimnissen",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/geschgehg/",
description="Schutz von Geschaeftsgeheimnissen: Definition, angemessene Geheimhaltungsmassnahmen, Reverse Engineering.",
requirement_count=10,
),
Regulation(
code="DE_BSIG",
name="BSI-Gesetz",
full_name="Gesetz ueber das Bundesamt fuer Sicherheit in der Informationstechnik (BSIG)",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/bsig_2009/",
description="BSI-Aufgaben, KRITIS-Meldepflichten, IT-Sicherheitsstandards, Zertifizierung.",
requirement_count=20,
),
Regulation(
code="DE_USTG_RET",
name="UStG §14b",
full_name="Umsatzsteuergesetz §14b — Aufbewahrung von Rechnungen",
regulation_type="de_law",
source_url="https://www.gesetze-im-internet.de/ustg_1980/",
description="Aufbewahrungspflicht fuer Rechnungen: 10 Jahre, Grundstuecke 20 Jahre, elektronische Aufbewahrung.",
local_path="DE_USTG_RET.txt",
requirement_count=5,
),
# =====================================================================
# Schweiz P3
# =====================================================================
Regulation(
code="CH_ZGB_PERS",
name="ZGB Persoenlichkeitsschutz CH",
full_name="Zivilgesetzbuch Art. 28-28l — Persoenlichkeitsschutz Schweiz (SR 210)",
regulation_type="ch_law",
source_url="https://www.fedlex.admin.ch/eli/cc/24/233_245_233/de",
description="Persoenlichkeitsschutz: Recht am eigenen Bild, Schutz der Privatsphaere, Gegendarstellungsrecht.",
language="de",
requirement_count=8,
),
# =====================================================================
# Retry of 3 previously failed sources using alternative URLs
# =====================================================================
Regulation(
code="LU_DPA_LAW",
name="Datenschutzgesetz Luxemburg",
full_name="Loi du 1er aout 2018 — Datenschutzgesetz Luxemburg",
regulation_type="national_law",
source_url="https://legilux.public.lu/eli/etat/leg/loi/2018/08/01/a686/jo",
description="Luxemburgisches Datenschutzgesetz: Organisation der CNPD, nationale DSGVO-Ergaenzung.",
language="fr",
requirement_count=40,
),
Regulation(
code="DK_DATABESKYTTELSESLOVEN",
name="Databeskyttelsesloven DK",
full_name="Databeskyttelsesloven — Datenschutzgesetz Daenemark",
regulation_type="national_law",
source_url="https://www.retsinformation.dk/eli/lta/2018/502",
description="Daenisches Datenschutzgesetz als ergaenzende Bestimmungen zur DSGVO. Reguliert durch Datatilsynet.",
language="da",
requirement_count=30,
),
Regulation(
code="EDPB_GUIDELINES_1_2022",
name="EDPB GL Bussgelder",
full_name="EDPB Leitlinien 04/2022 zur Berechnung von Bussgeldern nach der DSGVO",
regulation_type="eu_guideline",
source_url="https://www.edpb.europa.eu/system/files/2023-05/edpb_guidelines_042022_calculationofadministrativefines_en.pdf",
description="EDPB-Leitlinien zur Berechnung von Verwaltungsbussgeldern unter der DSGVO.",
language="en",
requirement_count=15,
),
]
class LegalCorpusIngestion:
"""Handles ingestion of legal documents into Qdrant."""
def __init__(self):
    """Open the Qdrant and HTTP clients and make sure the collection exists.

    The Qdrant client connects to QDRANT_HOST / QDRANT_PORT; the shared
    async HTTP client uses a 60 second default timeout.
    """
    self.qdrant = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
    self.http_client = httpx.AsyncClient(timeout=60.0)
    # Must run after self.qdrant is assigned, since it queries Qdrant.
    self._ensure_collection()
def _ensure_collection(self):
    """Create the legal corpus collection unless Qdrant already has it.

    A new collection is created with VECTOR_SIZE-dimensional vectors and
    cosine distance.
    """
    existing_names = {c.name for c in self.qdrant.get_collections().collections}
    if LEGAL_CORPUS_COLLECTION in existing_names:
        logger.info(f"Collection {LEGAL_CORPUS_COLLECTION} already exists")
        return

    logger.info(f"Creating collection: {LEGAL_CORPUS_COLLECTION}")
    vector_settings = VectorParams(size=VECTOR_SIZE, distance=Distance.COSINE)
    self.qdrant.create_collection(
        collection_name=LEGAL_CORPUS_COLLECTION,
        vectors_config=vector_settings,
    )
    logger.info(f"Collection {LEGAL_CORPUS_COLLECTION} created")
async def _generate_embeddings(self, texts: List[str]) -> List[List[float]]:
    """Request embeddings for ``texts`` from the embedding service.

    Posts ``{"texts": texts}`` to ``<EMBEDDING_SERVICE_URL>/embed`` with a
    120 second timeout and returns the ``"embeddings"`` entry of the JSON
    response.

    Raises:
        Exception: Any failure (transport error, non-2xx status, malformed
            response) is logged and re-raised unchanged.
    """
    endpoint = f"{EMBEDDING_SERVICE_URL}/embed"
    payload = {"texts": texts}
    try:
        response = await self.http_client.post(endpoint, json=payload, timeout=120.0)
        response.raise_for_status()
        return response.json()["embeddings"]
    except Exception as e:
        logger.error(f"Embedding generation failed: {e}")
        raise
# Abbreviations (lowercase, without the trailing dot) whose final period must
# not be treated as the end of a sentence.
GERMAN_ABBREVIATIONS = {
    # Everyday abbreviations
    'bzw', 'ca', 'chr', 'd.h', 'dr', 'etc', 'evtl', 'ggf', 'inkl',
    'max', 'min', 'mio', 'mrd', 'nr', 'prof', 's', 'sog', 'u.a', 'u.ä',
    'usw', 'v.a', 'vgl', 'vs', 'z.b', 'z.t', 'zzgl',
    # Citation and reference abbreviations
    'abs', 'art', 'aufl', 'bd', 'betr', 'bzgl', 'dgl', 'ebd', 'hrsg',
    'jg', 'kap', 'lt', 'rdnr', 'rn', 'std', 'str', 'tel',
    # Spellings without inner dots
    'ua', 'uvm', 'va', 'zb',
    # Standards-related prefixes
    'bsi', 'tr', 'owasp', 'iso', 'iec', 'din', 'en',
}
def _split_into_sentences(self, text: str) -> List[str]:
"""Split text into sentences with German language support."""
if not text:
return []
text = re.sub(r'\s+', ' ', text).strip()
# Protect abbreviations
protected_text = text
for abbrev in self.GERMAN_ABBREVIATIONS:
pattern = re.compile(r'\b' + re.escape(abbrev) + r'\.', re.IGNORECASE)
protected_text = pattern.sub(abbrev.replace('.', '<DOT>') + '<ABBR>', protected_text)
# Protect decimal/ordinal numbers and requirement IDs (e.g., "O.Data_1")
protected_text = re.sub(r'(\d)\.(\d)', r'\1<DECIMAL>\2', protected_text)
protected_text = re.sub(r'(\d+)\.(\s)', r'\1<ORD>\2', protected_text)
protected_text = re.sub(r'([A-Z])\.([A-Z])', r'\1<REQ>\2', protected_text) # O.Data_1
# Split on sentence endings
sentence_pattern = r'(?<=[.!?])\s+(?=[A-ZÄÖÜ0-9])|(?<=[.!?])$'
raw_sentences = re.split(sentence_pattern, protected_text)
# Restore protected characters
sentences = []
for s in raw_sentences:
s = s.replace('<DOT>', '.').replace('<ABBR>', '.').replace('<DECIMAL>', '.').replace('<ORD>', '.').replace('<REQ>', '.')
s = s.strip()
if s:
sentences.append(s)
return sentences
def _split_into_paragraphs(self, text: str) -> List[str]:
"""Split text into paragraphs."""
if not text:
return []
raw_paragraphs = re.split(r'\n\s*\n', text)
return [para.strip() for para in raw_paragraphs if para.strip()]
def _chunk_text_semantic(self, text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[Tuple[str, int]]:
    """
    Semantic chunking that respects paragraph and sentence boundaries.
    Matches NIBIS chunking strategy for consistency.

    Paragraphs that fit into ``chunk_size`` are kept whole and packed
    together; larger paragraphs are split into sentences. When a chunk is
    closed, its last sentences are carried over as overlap for the next
    chunk. A single sentence longer than ``chunk_size`` is emitted exactly
    once as a chunk of its own and is not carried over as overlap.

    Args:
        text: Document text; may be empty or ``None``.
        chunk_size: Target maximum chunk length in characters. Chunks can
            exceed it when overlap is prepended or a sentence is over-long.
        overlap: Desired overlap in characters. It is converted to a number
            of sentences (one per 100 characters, at least one).

    Returns:
        List of ``(chunk_text, start_position)`` tuples with whitespace in
        ``chunk_text`` collapsed. ``start_position`` is derived from the
        lengths of the stripped parts plus a fixed separator width, so it
        is an approximate offset into ``text``, not an exact index.
    """
    if not text:
        return []
    if len(text) <= chunk_size:
        # Stay consistent with the main path, which never returns empty chunks.
        stripped_text = text.strip()
        return [(stripped_text, 0)] if stripped_text else []

    paragraphs = self._split_into_paragraphs(text)
    overlap_sentences = max(1, overlap // 100)  # Convert char overlap to sentence overlap

    chunks = []
    current_chunk_parts = []
    current_chunk_length = 0
    chunk_start = 0
    position = 0

    for para in paragraphs:
        if len(para) > chunk_size:
            # Large paragraph: split into sentences
            sentences = self._split_into_sentences(para)
            for sentence in sentences:
                sentence_len = len(sentence)

                if sentence_len > chunk_size:
                    # Very long sentence: close the chunk in progress first.
                    if current_chunk_parts:
                        chunks.append((' '.join(current_chunk_parts), chunk_start))
                    # Emit the long sentence once, as its own chunk.
                    chunks.append((sentence, position))
                    position += sentence_len + 1
                    # Start the next chunk empty. Keeping the sentence in the
                    # buffer would emit it a second time on the next flush
                    # and drag it into following chunks as overlap.
                    current_chunk_parts = []
                    current_chunk_length = 0
                    chunk_start = position
                    continue

                if current_chunk_length + sentence_len + 1 > chunk_size and current_chunk_parts:
                    # Current chunk is full: save it and seed the next one
                    # with the trailing parts as overlap.
                    chunks.append((' '.join(current_chunk_parts), chunk_start))
                    current_chunk_parts = list(current_chunk_parts[-overlap_sentences:])
                    current_chunk_length = sum(len(s) + 1 for s in current_chunk_parts)
                    chunk_start = position - current_chunk_length

                current_chunk_parts.append(sentence)
                current_chunk_length += sentence_len + 1
                position += sentence_len + 1
        else:
            # Small paragraph: try to keep together
            para_len = len(para)
            if current_chunk_length + para_len + 2 > chunk_size and current_chunk_parts:
                chunks.append((' '.join(current_chunk_parts), chunk_start))
                # Overlap comes from the last sentences of the last part.
                last_para_sentences = self._split_into_sentences(current_chunk_parts[-1])
                current_chunk_parts = list(last_para_sentences[-overlap_sentences:])
                current_chunk_length = sum(len(s) + 1 for s in current_chunk_parts)
                chunk_start = position - current_chunk_length

            if current_chunk_parts:
                current_chunk_parts.append(para)
                current_chunk_length += para_len + 2
            else:
                current_chunk_parts = [para]
                current_chunk_length = para_len
                chunk_start = position
            position += para_len + 2

    # Don't forget the last chunk
    if current_chunk_parts:
        chunks.append((' '.join(current_chunk_parts), chunk_start))

    # Clean up whitespace and drop chunks that are empty after stripping
    return [(re.sub(r'\s+', ' ', c).strip(), pos) for c, pos in chunks if c.strip()]
def _extract_article_info(self, text: str) -> Optional[Dict]:
"""Extract article number and paragraph from text."""
# Pattern for "Artikel X" or "Art. X"
article_match = re.search(r'(?:Artikel|Art\.?)\s+(\d+)', text)
paragraph_match = re.search(r'(?:Absatz|Abs\.?)\s+(\d+)', text)
if article_match:
return {
"article": article_match.group(1),
"paragraph": paragraph_match.group(1) if paragraph_match else None,
}
return None
async def _fetch_document_text(self, regulation: Regulation) -> Optional[str]:
    """
    Fetch the text of a regulation from a local file or from the web.

    Sources are tried in this order; the first one that yields text wins:
    1. Local file in LEGAL_DOCS_PATH (<code>.txt, then <code>.pdf)
    2. EUR-Lex HTML via the CELEX number (only if regulation.celex is set)
    3. The regulation's original source URL (PDF or HTML)

    Args:
        regulation: Regulation whose code, celex and source_url are used
            to locate the document.

    Returns:
        The document text (may be an empty string if the HTML fallback
        contains no text), or None when the source-URL fetch fails or a
        downloaded PDF yields no text.
    """
    # Browser-like headers shared by both HTML requests below.
    html_headers = {
        "Accept": "text/html,application/xhtml+xml",
        "Accept-Language": "de-DE,de;q=0.9",
        "User-Agent": "Mozilla/5.0 (compatible; LegalCorpusIndexer/1.0)",
    }

    # Check for local file first
    local_file = LEGAL_DOCS_PATH / f"{regulation.code}.txt"
    if local_file.exists():
        logger.info(f"Loading {regulation.code} from local file: {local_file}")
        return local_file.read_text(encoding="utf-8")

    local_pdf = LEGAL_DOCS_PATH / f"{regulation.code}.pdf"
    if local_pdf.exists():
        logger.info(f"Extracting text from PDF: {local_pdf}")
        try:
            # Use embedding service for PDF extraction. The context
            # manager guarantees the file handle is closed even when the
            # request raises.
            with open(local_pdf, "rb") as pdf_file:
                response = await self.http_client.post(
                    f"{EMBEDDING_SERVICE_URL}/extract-pdf",
                    files={"file": pdf_file},
                    timeout=120.0,
                )
            response.raise_for_status()
            text = response.json().get("text", "")
            if text:
                return text
            # An empty result is treated like a failed extraction so the
            # remote sources below still get a chance.
            logger.warning(
                f"PDF extraction returned empty text for {regulation.code}, trying fallback"
            )
        except Exception as e:
            logger.error(f"PDF extraction failed for {regulation.code}: {e}")

    # Try EUR-Lex CELEX URL if available (bypasses JavaScript CAPTCHA)
    if regulation.celex:
        celex_url = f"https://eur-lex.europa.eu/legal-content/DE/TXT/HTML/?uri=CELEX:{regulation.celex}"
        logger.info(f"Fetching {regulation.code} from EUR-Lex CELEX: {celex_url}")
        try:
            response = await self.http_client.get(
                celex_url,
                follow_redirects=True,
                headers=html_headers,
                timeout=120.0,
            )
            response.raise_for_status()
            html_content = response.text
            # Check if we got actual content, not a CAPTCHA page
            if "verify that you're not a robot" not in html_content and len(html_content) > 10000:
                text = self._html_to_text(html_content)
                if text and len(text) > 1000:
                    logger.info(f"Successfully fetched {regulation.code} via CELEX ({len(text)} chars)")
                    return text
                logger.warning(f"CELEX response too short for {regulation.code}, trying fallback")
            else:
                logger.warning(f"CELEX returned CAPTCHA for {regulation.code}, trying fallback")
        except Exception as e:
            logger.warning(f"CELEX fetch failed for {regulation.code}: {e}, trying fallback")

    # Fallback to original source URL
    logger.info(f"Fetching {regulation.code} from: {regulation.source_url}")
    try:
        # Check if source URL is a PDF (handle URLs with query parameters)
        parsed_url = urlparse(regulation.source_url)
        is_pdf_url = parsed_url.path.lower().endswith('.pdf')

        if is_pdf_url:
            logger.info(f"Downloading PDF from URL for {regulation.code}")
            response = await self.http_client.get(
                regulation.source_url,
                follow_redirects=True,
                headers={
                    "Accept": "application/pdf",
                    "User-Agent": "Mozilla/5.0 (compatible; LegalCorpusIndexer/1.0)",
                },
                timeout=180.0,
            )
            response.raise_for_status()

            # Extract text from PDF via embedding service
            pdf_content = response.content
            extract_response = await self.http_client.post(
                f"{EMBEDDING_SERVICE_URL}/extract-pdf",
                files={"file": ("document.pdf", pdf_content, "application/pdf")},
                timeout=180.0,
            )
            extract_response.raise_for_status()
            text = extract_response.json().get("text", "")
            if text:
                logger.info(f"Successfully extracted PDF text for {regulation.code} ({len(text)} chars)")
                return text
            logger.warning(f"PDF extraction returned empty text for {regulation.code}")
            return None

        # Regular HTML fetch
        response = await self.http_client.get(
            regulation.source_url,
            follow_redirects=True,
            headers=html_headers,
            timeout=120.0,
        )
        response.raise_for_status()
        return self._html_to_text(response.text)
    except Exception as e:
        logger.error(f"Failed to fetch {regulation.code}: {e}")
        return None
def _html_to_text(self, html_content: str) -> str:
    """
    Convert an HTML document to plain text suitable for chunking.

    Steps, in order:
    1. Drop <script>/<style> elements and comments (case-insensitive).
    2. Turn <br>, </p>, </div> and </h1>-</h6> into newlines so paragraph
       structure survives.
    3. Replace every remaining tag with a space.
    4. Decode character references exactly once, after tag removal, so
       escaped markup such as "&lt;b&gt;" is kept as literal text and
       "&amp;lt;" is not decoded twice.
    5. Normalise whitespace while keeping blank-line paragraph breaks.

    Args:
        html_content: Raw HTML markup.

    Returns:
        The stripped plain text.
    """
    # Local import keeps this block self-contained; the parameter is
    # named html_content, so the module name is not shadowed.
    import html

    ignore_case_dotall = re.DOTALL | re.IGNORECASE

    # Remove script and style elements, then comments
    markup = re.sub(r'<script[^>]*>.*?</script>', '', html_content, flags=ignore_case_dotall)
    markup = re.sub(r'<style[^>]*>.*?</style>', '', markup, flags=ignore_case_dotall)
    markup = re.sub(r'<!--.*?-->', '', markup, flags=re.DOTALL)

    # Convert breaks and block-level closers to newlines for better chunking
    markup = re.sub(r'<br\s*/?>', '\n', markup, flags=re.IGNORECASE)
    markup = re.sub(r'</p>', '\n\n', markup, flags=re.IGNORECASE)
    markup = re.sub(r'</div>', '\n', markup, flags=re.IGNORECASE)
    markup = re.sub(r'</h[1-6]>', '\n\n', markup, flags=re.IGNORECASE)

    # Remove remaining HTML tags
    text = re.sub(r'<[^>]+>', ' ', markup)

    # Decode all named and numeric character references in one pass;
    # non-breaking spaces become ordinary spaces so they collapse below.
    text = html.unescape(text).replace('\xa0', ' ')

    # Clean up whitespace (but preserve paragraph breaks)
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n[ \t]+', '\n', text)
    text = re.sub(r'[ \t]+\n', '\n', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()
async def ingest_regulation(self, regulation: Regulation) -> int:
    """
    Fetch, chunk, embed and store one regulation in Qdrant.

    The text is embedded in batches of four chunks. Each batch gets up to
    three embedding attempts with a growing pause in between; a batch that
    still fails is skipped. Point IDs are the MD5 hex digest of
    "<regulation code>-<chunk index>", so they are deterministic per
    regulation and chunk position.

    Args:
        regulation: The regulation to ingest.

    Returns:
        Number of points upserted (0 if no usable text or chunks exist).
    """
    logger.info(f"Ingesting {regulation.code}: {regulation.name}")

    # Step 1: obtain the raw text; very short results count as missing.
    text = await self._fetch_document_text(regulation)
    if not text or len(text) < 100:
        logger.warning(f"No text found for {regulation.code}, skipping")
        return 0

    # Step 2: split into (chunk_text, position) pairs.
    chunks = self._chunk_text_semantic(text)
    logger.info(f"Created {len(chunks)} chunks for {regulation.code}")
    if not chunks:
        return 0

    # Step 3: embed in very small batches (CPU stability).
    batch_size = 4
    max_retries = 3
    all_points = []

    for i in range(0, len(chunks), batch_size):
        window = chunks[i:i + batch_size]
        window_texts = [pair[0] for pair in window]

        # Retry logic for embedding service stability
        embeddings = None
        for retry in range(max_retries):
            try:
                embeddings = await self._generate_embeddings(window_texts)
                break
            except Exception as e:
                logger.warning(f"Embedding attempt {retry+1}/{max_retries} failed for batch {i//batch_size}: {e}")
                is_last_attempt = retry >= max_retries - 1
                if is_last_attempt:
                    logger.error(f"Embedding failed permanently for batch {i//batch_size}")
                else:
                    await asyncio.sleep(3 * (retry + 1))  # Longer backoff: 3s, 6s, 9s

        if embeddings is None:
            # Give up on this batch but keep going with the rest.
            continue

        # Longer delay between batches for CPU stability
        await asyncio.sleep(1.5)

        # chunk_idx is the chunk's index within the whole document.
        for chunk_idx, ((chunk_text, position), embedding) in enumerate(
            zip(window, embeddings), start=i
        ):
            id_seed = f"{regulation.code}-{chunk_idx}"
            # Article/paragraph metadata is optional; fall back to an
            # empty mapping so both lookups yield None.
            article_info = self._extract_article_info(chunk_text) or {}
            payload = {
                "text": chunk_text,
                "regulation_code": regulation.code,
                "regulation_name": regulation.name,
                "regulation_full_name": regulation.full_name,
                "regulation_type": regulation.regulation_type,
                "source_url": regulation.source_url,
                "chunk_index": chunk_idx,
                "chunk_position": position,
                "article": article_info.get("article"),
                "paragraph": article_info.get("paragraph"),
                "language": regulation.language,
                "indexed_at": datetime.utcnow().isoformat(),
                "training_allowed": False,  # Legal texts - no training
            }
            all_points.append(
                PointStruct(
                    id=hashlib.md5(id_seed.encode()).hexdigest(),
                    vector=embedding,
                    payload=payload,
                )
            )

    # Step 4: write everything to Qdrant in a single upsert.
    if not all_points:
        return 0

    self.qdrant.upsert(
        collection_name=LEGAL_CORPUS_COLLECTION,
        points=all_points,
    )
    logger.info(f"Indexed {len(all_points)} chunks for {regulation.code}")
    return len(all_points)
async def ingest_all(self) -> Dict[str, int]:
    """
    Ingest every entry of REGULATIONS, one after another.

    A failure in one regulation is logged and recorded as 0 chunks; the
    remaining regulations are still processed.

    Returns:
        Mapping of regulation code to the number of chunks indexed.
    """
    results = {}
    total = 0

    for regulation in REGULATIONS:
        count = 0  # stays 0 if ingestion raises
        try:
            count = await self.ingest_regulation(regulation)
        except Exception as e:
            logger.error(f"Failed to ingest {regulation.code}: {e}")
        results[regulation.code] = count
        total += count

    logger.info(f"Ingestion complete: {total} total chunks indexed")
    return results
async def ingest_selected(self, codes: List[str]) -> Dict[str, int]:
    """
    Ingest only the regulations whose codes are listed.

    Unknown codes and regulations whose ingestion raises are logged and
    reported with a count of 0.

    Args:
        codes: Regulation codes to look up in REGULATIONS.

    Returns:
        Mapping of each requested code to the number of chunks indexed.
    """
    results = {}

    for code in codes:
        count = 0  # default for unknown codes and failed ingestions
        regulation = next((r for r in REGULATIONS if r.code == code), None)

        if regulation:
            try:
                count = await self.ingest_regulation(regulation)
            except Exception as e:
                logger.error(f"Failed to ingest {code}: {e}")
        else:
            logger.warning(f"Unknown regulation code: {code}")

        results[code] = count

    return results
def get_status(self) -> Dict:
    """
    Report the state of the legal corpus collection.

    Returns:
        On success, a dict with the collection name, total point count,
        vector size, per-regulation point counts and a status of "ready"
        or "empty". If any Qdrant call raises, a dict with the collection
        name, the error message and status "error".
    """
    try:
        collection_info = self.qdrant.get_collection(LEGAL_CORPUS_COLLECTION)

        # Count points per regulation (one count query per known code)
        regulation_counts = {
            reg.code: self.qdrant.count(
                collection_name=LEGAL_CORPUS_COLLECTION,
                count_filter=Filter(
                    must=[
                        FieldCondition(
                            key="regulation_code",
                            match=MatchValue(value=reg.code),
                        )
                    ]
                ),
            ).count
            for reg in REGULATIONS
        }

        total_points = collection_info.points_count
        state = "ready" if total_points > 0 else "empty"
        return {
            "collection": LEGAL_CORPUS_COLLECTION,
            "total_points": total_points,
            "vector_size": VECTOR_SIZE,
            "regulations": regulation_counts,
            "status": state,
        }
    except Exception as e:
        return {
            "collection": LEGAL_CORPUS_COLLECTION,
            "error": str(e),
            "status": "error",
        }
async def search(
    self,
    query: str,
    regulation_codes: Optional[List[str]] = None,
    top_k: int = 5,
) -> List[Dict]:
    """
    Run a vector search over the legal corpus collection.

    Args:
        query: Free-text query; it is embedded before searching.
        regulation_codes: Optional regulation codes to filter by. One
            condition per code is placed in the filter's `should` clause.
        top_k: Maximum number of hits to return.

    Returns:
        One dict per hit with the keys text, regulation_code,
        regulation_name, article, paragraph, source_url and score.
    """
    # Embed the query; the embedding call takes a list of texts.
    query_vector = (await self._generate_embeddings([query]))[0]

    # Optional filter on regulation code
    search_filter = None
    if regulation_codes:
        conditions = []
        for code in regulation_codes:
            conditions.append(
                FieldCondition(
                    key="regulation_code",
                    match=MatchValue(value=code),
                )
            )
        search_filter = Filter(should=conditions)

    hits = self.qdrant.search(
        collection_name=LEGAL_CORPUS_COLLECTION,
        query_vector=query_vector,
        query_filter=search_filter,
        limit=top_k,
    )

    # Payload fields copied into each result, in output order.
    payload_fields = (
        "text",
        "regulation_code",
        "regulation_name",
        "article",
        "paragraph",
        "source_url",
    )
    formatted = []
    for hit in hits:
        entry = {name: hit.payload.get(name) for name in payload_fields}
        entry["score"] = hit.score
        formatted.append(entry)
    return formatted
async def close(self):
    """Shut down the HTTP client held by this instance."""
    client = self.http_client
    await client.aclose()
async def main():
    """
    Command-line entry point.

    Parses the flags and runs the first requested action in this order:
    --status, --ingest-all, --ingest, --search. Without any of them the
    help text is printed. The ingestion object's HTTP client is closed
    in all cases.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Legal Corpus Ingestion for UCCA")
    parser.add_argument("--ingest-all", action="store_true", help="Ingest all regulations")
    parser.add_argument("--ingest", nargs="+", metavar="CODE", help="Ingest specific regulations by code")
    parser.add_argument("--status", action="store_true", help="Show collection status")
    parser.add_argument("--search", type=str, help="Test search query")
    args = parser.parse_args()

    def print_counts(results):
        # Shared output for both ingestion modes.
        print("\nResults:")
        for code, count in results.items():
            print(f" {code}: {count} chunks")

    ingestion = LegalCorpusIngestion()
    try:
        if args.status:
            print(json.dumps(ingestion.get_status(), indent=2))
        elif args.ingest_all:
            print(f"Ingesting all {len(REGULATIONS)} regulations...")
            results = await ingestion.ingest_all()
            print_counts(results)
            print(f"\nTotal: {sum(results.values())} chunks")
        elif args.ingest:
            print(f"Ingesting: {', '.join(args.ingest)}")
            print_counts(await ingestion.ingest_selected(args.ingest))
        elif args.search:
            print(f"Searching: {args.search}")
            results = await ingestion.search(args.search)
            print(f"\nFound {len(results)} results:")
            for i, result in enumerate(results, 1):
                print(f"\n{i}. [{result['regulation_code']}] Score: {result['score']:.3f}")
                if result.get('article'):
                    # Append the paragraph only when one was extracted.
                    citation = f" Art. {result['article']}"
                    if result.get('paragraph'):
                        citation += f" Abs. {result['paragraph']}"
                    print(citation)
                print(f" {result['text'][:200]}...")
        else:
            parser.print_help()
    finally:
        await ingestion.close()
# Run the async CLI entry point only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())