Files
breakpilot-compliance/backend-compliance/compliance/api/source_policy_router.py
Sharang Parnerkar 7107a31496 refactor(backend/api): extract SourcePolicyService (Step 4 — file 7 of 18)
compliance/api/source_policy_router.py (580 LOC) -> 253 LOC thin routes
+ 453-line SourcePolicyService + 83-line schemas file. Manages allowed
data sources, operations matrix, PII rules, blocked-content log,
audit trail, and dashboard stats/report.

Single-service split. ORM-based (uses compliance.db.source_policy_models).

Date-string parsing extracted to a module-level _parse_iso_optional
helper so the audit + blocked-content list endpoints share it instead
of duplicating try/except blocks.

Legacy test compat: SourceCreate, SourceUpdate, SourceResponse,
PIIRuleCreate, PIIRuleUpdate, OperationUpdate, _log_audit re-exported
from compliance.api.source_policy_router via __all__.

Verified:
  - 208/208 pytest pass (173 core + 35 source policy)
  - OpenAPI 360/484 unchanged
  - mypy compliance/ -> Success on 132 source files
  - source_policy_router.py 580 -> 253 LOC
  - Hard-cap violations: 12 -> 11

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 19:58:02 +02:00

254 lines
8.4 KiB
Python

"""
Source Policy Router — Manages allowed compliance data sources.
Controls which legal sources the RAG corpus may use, the operations matrix,
PII rules, blocked-content log, audit trail, and dashboard stats/report.
Endpoints:
GET /v1/admin/sources - List all sources
POST /v1/admin/sources - Add new source
GET /v1/admin/sources/{id} - Get source by ID
PUT /v1/admin/sources/{id} - Update source
DELETE /v1/admin/sources/{id} - Remove source
GET /v1/admin/operations-matrix - Operations matrix
PUT /v1/admin/operations/{id} - Update operation
GET /v1/admin/pii-rules - List PII rules
POST /v1/admin/pii-rules - Create PII rule
PUT /v1/admin/pii-rules/{id} - Update PII rule
DELETE /v1/admin/pii-rules/{id} - Delete PII rule
GET /v1/admin/blocked-content - Blocked content log
GET /v1/admin/policy-audit - Audit trail
GET /v1/admin/policy-stats - Dashboard statistics
GET /v1/admin/compliance-report - Compliance report
Phase 1 Step 4 refactor: handlers delegate to SourcePolicyService.
"""
from typing import Any, Optional
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from database import get_db
from compliance.api._http_errors import translate_domain_errors
from compliance.schemas.source_policy import (
OperationUpdate,
PIIRuleCreate,
PIIRuleUpdate,
SourceCreate,
SourceResponse,
SourceUpdate,
)
from compliance.services.source_policy_service import (
SourcePolicyService,
_log_audit, # re-exported for legacy test imports
)
# Router for the source-policy admin API. Every route declared in this module
# is mounted under the "/v1/admin" prefix and tagged "source-policy".
router = APIRouter(prefix="/v1/admin", tags=["source-policy"])
def get_source_policy_service(
    db: Session = Depends(get_db),
) -> SourcePolicyService:
    """FastAPI dependency that builds the service used by the route handlers.

    Args:
        db: SQLAlchemy session injected through the ``get_db`` dependency.

    Returns:
        A new ``SourcePolicyService`` constructed with ``db``.
    """
    policy_service = SourcePolicyService(db)
    return policy_service
# =============================================================================
# Sources CRUD
# =============================================================================
@router.get("/sources")
async def list_sources(
    active_only: bool = Query(False),
    source_type: Optional[str] = Query(None),
    license: Optional[str] = Query(None),
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """List all allowed sources with optional filters."""
    # Thin route: the three query filters are forwarded positionally and
    # unchanged to SourcePolicyService.list_sources.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        sources = service.list_sources(active_only, source_type, license)
        return sources
@router.post("/sources")
async def create_source(
    data: SourceCreate,
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Add a new allowed source."""
    # The request body (SourceCreate) is handed to the service as-is; this
    # handler performs no processing of its own.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        created = service.create_source(data)
        return created
@router.get("/sources/{source_id}")
async def get_source(
    source_id: str,
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Get a specific source."""
    # source_id comes from the URL path and is passed through as a plain string.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        source = service.get_source(source_id)
        return source
@router.put("/sources/{source_id}")
async def update_source(
    source_id: str,
    data: SourceUpdate,
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Update an existing source."""
    # Path id plus the SourceUpdate body are forwarded to the service, which
    # owns the actual update logic.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        updated = service.update_source(source_id, data)
        return updated
@router.delete("/sources/{source_id}")
async def delete_source(
    source_id: str,
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Remove an allowed source."""
    # Whatever dict the service returns for the deletion is sent back unchanged.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        outcome = service.delete_source(source_id)
        return outcome
# =============================================================================
# Operations Matrix
# =============================================================================
@router.get("/operations-matrix")
async def get_operations_matrix(
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Get the full operations matrix."""
    # No request parameters: the service decides what the matrix contains.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        matrix = service.get_operations_matrix()
        return matrix
@router.put("/operations/{operation_id}")
async def update_operation(
    operation_id: str,
    data: OperationUpdate,
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Update an operation in the matrix."""
    # Path id plus the OperationUpdate body are forwarded to the service.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        updated = service.update_operation(operation_id, data)
        return updated
# =============================================================================
# PII Rules
# =============================================================================
@router.get("/pii-rules")
async def list_pii_rules(
    category: Optional[str] = Query(None),
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """List all PII rules with optional category filter."""
    # category is optional; when omitted, None is forwarded to the service.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        rules = service.list_pii_rules(category)
        return rules
@router.post("/pii-rules")
async def create_pii_rule(
    data: PIIRuleCreate,
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Create a new PII rule."""
    # The PIIRuleCreate body is handed to the service untouched.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        created = service.create_pii_rule(data)
        return created
@router.put("/pii-rules/{rule_id}")
async def update_pii_rule(
    rule_id: str,
    data: PIIRuleUpdate,
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Update a PII rule."""
    # Path id plus the PIIRuleUpdate body are forwarded to the service.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        updated = service.update_pii_rule(rule_id, data)
        return updated
@router.delete("/pii-rules/{rule_id}")
async def delete_pii_rule(
    rule_id: str,
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Delete a PII rule."""
    # Whatever dict the service returns for the deletion is sent back unchanged.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        outcome = service.delete_pii_rule(rule_id)
        return outcome
# =============================================================================
# Blocked Content
# =============================================================================
@router.get("/blocked-content")
async def list_blocked_content(
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    domain: Optional[str] = None,
    date_from: Optional[str] = Query(None, alias="from"),
    date_to: Optional[str] = Query(None, alias="to"),
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """List blocked content entries."""
    # Paging: limit defaults to 50 and is constrained to 1..500; offset must
    # be >= 0.
    # The date bounds arrive as the query parameters "from" and "to" (exposed
    # via alias) and are forwarded as raw strings; no parsing happens here.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        page = service.list_blocked_content(
            limit, offset, domain, date_from, date_to
        )
        return page
# =============================================================================
# Audit Trail
# =============================================================================
@router.get("/policy-audit")
async def get_policy_audit(
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    entity_type: Optional[str] = None,
    date_from: Optional[str] = Query(None, alias="from"),
    date_to: Optional[str] = Query(None, alias="to"),
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Get the audit trail for source policy changes."""
    # Paging: limit defaults to 50 and is constrained to 1..500; offset must
    # be >= 0.
    # The date bounds arrive as the query parameters "from" and "to" (exposed
    # via alias) and are forwarded as raw strings; no parsing happens here.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        audit_page = service.get_audit(
            limit, offset, entity_type, date_from, date_to
        )
        return audit_page
# =============================================================================
# Dashboard Statistics + Report
# =============================================================================
@router.get("/policy-stats")
async def get_policy_stats(
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Get dashboard statistics for source policy."""
    # No request parameters: the payload is whatever SourcePolicyService.stats()
    # returns.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        stats = service.stats()
        return stats
@router.get("/compliance-report")
async def get_compliance_report(
    service: SourcePolicyService = Depends(get_source_policy_service),
) -> dict[str, Any]:
    """Generate a compliance report for source policies."""
    # No request parameters: the payload is whatever
    # SourcePolicyService.compliance_report() returns.
    # translate_domain_errors() wraps the call; presumably it maps service-level
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        report = service.compliance_report()
        return report
# ----------------------------------------------------------------------------
# Legacy re-exports for tests that import schemas/helpers directly.
# ----------------------------------------------------------------------------
# Public names of this module. Besides the router itself, the schema classes
# and the _log_audit helper are listed so that existing imports of the form
# `from compliance.api.source_policy_router import ...` keep resolving.
__all__ = [
    # The APIRouter defined in this module.
    "router",
    # Schemas imported from compliance.schemas.source_policy.
    "SourceCreate",
    "SourceUpdate",
    "SourceResponse",
    "OperationUpdate",
    "PIIRuleCreate",
    "PIIRuleUpdate",
    # Helper imported from compliance.services.source_policy_service.
    "_log_audit",
]