fix: replace Python 3.10+ union type syntax with typing.Optional for Pydantic v2 compat
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 37s
CI/CD / test-python-backend-compliance (push) Successful in 35s
CI/CD / test-python-document-crawler (push) Successful in 24s
CI/CD / test-python-dsms-gateway (push) Successful in 19s
CI/CD / validate-canonical-controls (push) Successful in 12s
CI/CD / deploy-hetzner (push) Has been cancelled

`from __future__ import annotations` breaks Pydantic BaseModel runtime type
evaluation, so the import is removed from all three files. Replaced
`str | None` → `Optional[str]`, `list[str]` → `List[str]`, etc. in
control_generator.py, anchor_finder.py, and control_generator_routes.py.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-13 09:36:14 +01:00
parent cdafc4d9f4
commit c530898963
3 changed files with 29 additions and 34 deletions

View File

@@ -12,11 +12,9 @@ Endpoints:
POST /v1/canonical/blocked-sources/cleanup — Start cleanup workflow POST /v1/canonical/blocked-sources/cleanup — Start cleanup workflow
""" """
from __future__ import annotations
import json import json
import logging import logging
from typing import Optional from typing import Optional, List
from fastapi import APIRouter, HTTPException, Query from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel from pydantic import BaseModel
@@ -39,8 +37,8 @@ router = APIRouter(prefix="/v1/canonical", tags=["control-generator"])
# ============================================================================= # =============================================================================
class GenerateRequest(BaseModel): class GenerateRequest(BaseModel):
domain: str | None = None domain: Optional[str] = None
collections: list[str] | None = None collections: Optional[List[str]] = None
max_controls: int = 50 max_controls: int = 50
batch_size: int = 5 batch_size: int = 5
skip_web_search: bool = False skip_web_search: bool = False
@@ -63,8 +61,8 @@ class GenerateResponse(BaseModel):
class ReviewRequest(BaseModel): class ReviewRequest(BaseModel):
action: str # "approve", "reject", "needs_rework" action: str # "approve", "reject", "needs_rework"
release_state: str | None = None # Override release_state release_state: Optional[str] = None # Override release_state
notes: str | None = None notes: Optional[str] = None
class ProcessedStats(BaseModel): class ProcessedStats(BaseModel):
@@ -83,7 +81,7 @@ class BlockedSourceResponse(BaseModel):
document_title: str document_title: str
reason: str reason: str
deletion_status: str deletion_status: str
qdrant_collection: str | None = None qdrant_collection: Optional[str] = None
marked_at: str marked_at: str
@@ -367,8 +365,8 @@ async def start_cleanup():
@router.get("/controls-customer") @router.get("/controls-customer")
async def get_controls_customer_view( async def get_controls_customer_view(
severity: str | None = Query(None), severity: Optional[str] = Query(None),
domain: str | None = Query(None), domain: Optional[str] = Query(None),
): ):
"""Get controls filtered for customer visibility. """Get controls filtered for customer visibility.

View File

@@ -8,10 +8,9 @@ Two-stage search:
Only open-source references (Rule 1+2) are accepted as anchors. Only open-source references (Rule 1+2) are accepted as anchors.
""" """
from __future__ import annotations
import logging import logging
from dataclasses import dataclass from dataclasses import dataclass
from typing import List, Optional
import httpx import httpx
@@ -40,7 +39,7 @@ class OpenAnchor:
class AnchorFinder: class AnchorFinder:
"""Finds open-source references to anchor generated controls.""" """Finds open-source references to anchor generated controls."""
def __init__(self, rag_client: ComplianceRAGClient | None = None): def __init__(self, rag_client: Optional[ComplianceRAGClient] = None):
self.rag = rag_client or get_rag_client() self.rag = rag_client or get_rag_client()
async def find_anchors( async def find_anchors(
@@ -48,7 +47,7 @@ class AnchorFinder:
control: GeneratedControl, control: GeneratedControl,
skip_web: bool = False, skip_web: bool = False,
min_anchors: int = 2, min_anchors: int = 2,
) -> list[OpenAnchor]: ) -> List[OpenAnchor]:
"""Find open-source anchors for a control.""" """Find open-source anchors for a control."""
# Stage A: RAG-internal search # Stage A: RAG-internal search
anchors = await self._search_rag_for_open_anchors(control) anchors = await self._search_rag_for_open_anchors(control)
@@ -64,7 +63,7 @@ class AnchorFinder:
return anchors return anchors
async def _search_rag_for_open_anchors(self, control: GeneratedControl) -> list[OpenAnchor]: async def _search_rag_for_open_anchors(self, control: GeneratedControl) -> List[OpenAnchor]:
"""Search RAG for chunks from open sources matching the control topic.""" """Search RAG for chunks from open sources matching the control topic."""
# Build search query from control title + first 3 tags # Build search query from control title + first 3 tags
tags_str = " ".join(control.tags[:3]) if control.tags else "" tags_str = " ".join(control.tags[:3]) if control.tags else ""
@@ -76,7 +75,7 @@ class AnchorFinder:
top_k=15, top_k=15,
) )
anchors: list[OpenAnchor] = [] anchors: List[OpenAnchor] = []
seen: set[str] = set() seen: set[str] = set()
for r in results: for r in results:
@@ -109,7 +108,7 @@ class AnchorFinder:
return anchors return anchors
async def _search_web(self, control: GeneratedControl) -> list[OpenAnchor]: async def _search_web(self, control: GeneratedControl) -> List[OpenAnchor]:
"""Search DuckDuckGo Instant Answer API for open references.""" """Search DuckDuckGo Instant Answer API for open references."""
keywords = f"{control.title} security control OWASP NIST" keywords = f"{control.title} security control OWASP NIST"
try: try:
@@ -127,7 +126,7 @@ class AnchorFinder:
return [] return []
data = resp.json() data = resp.json()
anchors: list[OpenAnchor] = [] anchors: List[OpenAnchor] = []
# Parse RelatedTopics # Parse RelatedTopics
for topic in data.get("RelatedTopics", [])[:10]: for topic in data.get("RelatedTopics", [])[:10]:
@@ -156,7 +155,7 @@ class AnchorFinder:
return [] return []
@staticmethod @staticmethod
def _identify_framework_from_url(url: str) -> str | None: def _identify_framework_from_url(url: str) -> Optional[str]:
"""Identify if a URL belongs to a known open-source framework.""" """Identify if a URL belongs to a known open-source framework."""
url_lower = url.lower() url_lower = url.lower()
if "owasp.org" in url_lower: if "owasp.org" in url_lower:

View File

@@ -17,8 +17,6 @@ Three License Rules:
Rule 3 (restricted): BSI, ISO — full reformulation, no source names Rule 3 (restricted): BSI, ISO — full reformulation, no source names
""" """
from __future__ import annotations
import hashlib import hashlib
import json import json
import logging import logging
@@ -27,7 +25,7 @@ import re
import uuid import uuid
from dataclasses import dataclass, field, asdict from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional from typing import Dict, List, Optional, Set
import httpx import httpx
from pydantic import BaseModel from pydantic import BaseModel
@@ -168,8 +166,8 @@ def _detect_domain(text: str) -> str:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class GeneratorConfig(BaseModel): class GeneratorConfig(BaseModel):
collections: list[str] | None = None collections: Optional[List[str]] = None
domain: str | None = None domain: Optional[str] = None
batch_size: int = 5 batch_size: int = 5
max_controls: int = 50 max_controls: int = 50
skip_processed: bool = True skip_processed: bool = True
@@ -194,9 +192,9 @@ class GeneratedControl:
release_state: str = "draft" release_state: str = "draft"
tags: list = field(default_factory=list) tags: list = field(default_factory=list)
# 3-rule fields # 3-rule fields
license_rule: int | None = None license_rule: Optional[int] = None
source_original_text: str | None = None source_original_text: Optional[str] = None
source_citation: dict | None = None source_citation: Optional[dict] = None
customer_visible: bool = True customer_visible: bool = True
generation_metadata: dict = field(default_factory=dict) generation_metadata: dict = field(default_factory=dict)
@@ -219,7 +217,7 @@ class GeneratorResult:
# LLM Client (via Go SDK) # LLM Client (via Go SDK)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def _llm_chat(prompt: str, system_prompt: str | None = None) -> str: async def _llm_chat(prompt: str, system_prompt: Optional[str] = None) -> str:
"""Call the Go SDK LLM chat endpoint.""" """Call the Go SDK LLM chat endpoint."""
messages = [] messages = []
if system_prompt: if system_prompt:
@@ -322,11 +320,11 @@ Antworte NUR mit validem JSON."""
class ControlGeneratorPipeline: class ControlGeneratorPipeline:
"""Orchestrates the 7-stage control generation pipeline.""" """Orchestrates the 7-stage control generation pipeline."""
def __init__(self, db: Session, rag_client: ComplianceRAGClient | None = None): def __init__(self, db: Session, rag_client: Optional[ComplianceRAGClient] = None):
self.db = db self.db = db
self.rag = rag_client or get_rag_client() self.rag = rag_client or get_rag_client()
self._existing_controls: list[dict] | None = None self._existing_controls: Optional[List[dict]] = None
self._existing_embeddings: dict[str, list[float]] = {} self._existing_embeddings: Dict[str, List[float]] = {}
# ── Stage 1: RAG Scan ────────────────────────────────────────────── # ── Stage 1: RAG Scan ──────────────────────────────────────────────
@@ -537,7 +535,7 @@ Gib JSON zurück mit diesen Feldern:
# ── Stage 4: Harmonization ───────────────────────────────────────── # ── Stage 4: Harmonization ─────────────────────────────────────────
async def _check_harmonization(self, new_control: GeneratedControl) -> list | None: async def _check_harmonization(self, new_control: GeneratedControl) -> Optional[list]:
"""Check if a new control duplicates existing ones via embedding similarity.""" """Check if a new control duplicates existing ones via embedding similarity."""
existing = self._load_existing_controls() existing = self._load_existing_controls()
if not existing: if not existing:
@@ -698,7 +696,7 @@ Gib JSON zurück mit diesen Feldern:
except Exception as e: except Exception as e:
logger.error("Failed to update job: %s", e) logger.error("Failed to update job: %s", e)
def _store_control(self, control: GeneratedControl, job_id: str) -> str | None: def _store_control(self, control: GeneratedControl, job_id: str) -> Optional[str]:
"""Persist a generated control to DB. Returns the control UUID or None.""" """Persist a generated control to DB. Returns the control UUID or None."""
try: try:
# Get framework UUID # Get framework UUID
@@ -889,7 +887,7 @@ Gib JSON zurück mit diesen Feldern:
chunk: RAGSearchResult, chunk: RAGSearchResult,
config: GeneratorConfig, config: GeneratorConfig,
job_id: str, job_id: str,
) -> GeneratedControl | None: ) -> Optional[GeneratedControl]:
"""Process a single chunk through stages 2-5.""" """Process a single chunk through stages 2-5."""
# Stage 2: License classification # Stage 2: License classification
license_info = self._classify_license(chunk) license_info = self._classify_license(chunk)