Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Failing after 30s
CI / test-python-backend-compliance (push) Successful in 30s
CI / test-python-document-crawler (push) Successful in 21s
CI / test-python-dsms-gateway (push) Successful in 17s
- Ruff: 144 auto-fixes (unused imports, == None → is None), F821/F811/F841 manuell - CVEs: python-multipart>=0.0.22, weasyprint>=68.0, pillow>=12.1.1, npm audit fix (0 vulns) - TS: 5 tote Drafting-Engine-Dateien entfernt, allowed-facts/sanitizer/StepHeader/context fixes - Tests: +104 (ISMS 58, Evidence 18, VVT 14, Generation 14) → 1449 passed - Refactoring: collect_ci_evidence (F→A), row_to_response (E→A), extract_requirements (E→A) - Dead Code: pca-platform, 7 Go-Handler, dsr_api.py, duplicate Schemas entfernt Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
208 lines
6.5 KiB
Python
208 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Validate service modules configuration.
|
|
|
|
Checks:
|
|
- All services have required fields
|
|
- Port conflicts
|
|
- Technology stack completeness
|
|
- Regulation mappings validity
|
|
"""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
from typing import Dict, List
|
|
|
|
# Put the directory three levels above this file at the front of sys.path
# before the `compliance.*` imports below run.
# NOTE(review): presumably that directory is the one containing the
# `compliance` package, so the script works when run directly -- confirm.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
from compliance.data.service_modules import (
|
|
BREAKPILOT_SERVICES,
|
|
get_service_count,
|
|
get_services_by_type,
|
|
get_services_processing_pii,
|
|
get_services_with_ai,
|
|
get_critical_services
|
|
)
|
|
from compliance.data.regulations import REGULATIONS_SEED
|
|
|
|
# ANSI escape sequences used to colorize the report printed by this script.
# GREEN marks headings and successes, YELLOW warnings, RED errors; RESET
# switches the terminal back to its default attributes.
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
RESET = "\033[0m"
|
|
|
|
|
|
def validate_required_fields():
    """Check that every service defines all mandatory fields.

    A field counts as missing when the key is absent from the service entry
    or when its value is falsy (empty string, empty list, ``None``, ...).

    Returns:
        True when no service is missing a mandatory field, False otherwise
        (each problem is printed as an error line).
    """
    print(f"\n{GREEN}=== Validating Required Fields ==={RESET}")

    mandatory = ("name", "display_name", "service_type", "technology_stack")

    # One message per (service, missing field) pair, in registry order.
    problems = [
        f" {RED}✗{RESET} Service '{entry.get('name', 'UNKNOWN')}' missing required field: {key}"
        for entry in BREAKPILOT_SERVICES
        for key in mandatory
        if key not in entry or not entry[key]
    ]

    if not problems:
        print(f"{GREEN}✓ All services have required fields{RESET}")
        return True

    print(f"\n{RED}Found {len(problems)} errors:{RESET}")
    for message in problems:
        print(message)
    return False
|
|
|
|
|
|
def validate_port_conflicts():
    """Report ports that are claimed by more than one service.

    Services whose ``port`` entry is missing or falsy are ignored. A service
    without a ``name`` key is listed as ``UNKNOWN`` instead of aborting the
    whole run with a ``KeyError``; the missing name itself is reported by
    ``validate_required_fields()``.

    Returns:
        True when every port is used by at most one service, False otherwise
        (each conflict is printed as a warning line).
    """
    print(f"\n{GREEN}=== Checking Port Conflicts ==={RESET}")
    port_map: Dict[int, List[str]] = defaultdict(list)

    for service in BREAKPILOT_SERVICES:
        port = service.get("port")
        if port:
            # Use the same 'UNKNOWN' placeholder as the required-field check
            # so an unnamed service cannot crash this check; str() keeps the
            # later ', '.join() safe for non-string names.
            port_map[port].append(str(service.get("name", "UNKNOWN")))

    warnings = [
        f" {YELLOW}⚠{RESET} Port {port} used by multiple services: {', '.join(services)}"
        for port, services in port_map.items()
        if len(services) > 1
    ]

    if warnings:
        print(f"\n{YELLOW}Found {len(warnings)} port conflicts:{RESET}")
        for warning in warnings:
            print(warning)
        return False

    print(f"{GREEN}✓ No port conflicts detected{RESET}")
    return True
|
|
|
|
|
|
def validate_regulation_mappings():
    """Validate the regulation mappings attached to each service.

    Every entry in a service's ``regulations`` list must reference a ``code``
    that exists in ``REGULATIONS_SEED`` and must contain a ``relevance`` key.
    A service with no ``regulations`` key (or an explicit ``None``) is
    treated as having no mappings. A service without a ``name`` key is
    reported as ``UNKNOWN`` rather than raising ``KeyError`` while the error
    message is being built.

    Returns:
        True when all mappings are valid, False otherwise (each problem is
        printed as an error line).
    """
    print(f"\n{GREEN}=== Validating Regulation Mappings ==={RESET}")
    valid_regulation_codes = {reg["code"] for reg in REGULATIONS_SEED}
    errors = []

    for service in BREAKPILOT_SERVICES:
        # Same placeholder as validate_required_fields(), which is the check
        # responsible for reporting the missing name itself.
        service_name = service.get("name", "UNKNOWN")

        # `or []` also covers an explicit `"regulations": None`.
        for reg_mapping in service.get("regulations") or []:
            reg_code = reg_mapping.get("code")
            if reg_code not in valid_regulation_codes:
                errors.append(
                    f" {RED}✗{RESET} Service '{service_name}' references unknown regulation: {reg_code}"
                )

            # Check for relevance field
            if "relevance" not in reg_mapping:
                errors.append(
                    f" {RED}✗{RESET} Service '{service_name}' regulation mapping for {reg_code} missing 'relevance'"
                )

    if errors:
        print(f"\n{RED}Found {len(errors)} regulation mapping errors:{RESET}")
        for error in errors:
            print(error)
        return False

    print(f"{GREEN}✓ All regulation mappings are valid{RESET}")
    return True
|
|
|
|
|
|
def print_statistics():
    """Print summary statistics about the service registry.

    Sections, in order: total count, count per service type (types with no
    services are omitted), PII / AI counts, critical services, services with
    a port, services with a Docker image, and the ten most frequently used
    technologies.
    """
    print(f"\n{GREEN}=== Service Module Statistics ==={RESET}")

    service_total = get_service_count()
    print(f"\nTotal Services: {service_total}")

    # Per-type breakdown
    print(f"\n{GREEN}By Type:{RESET}")
    known_types = (
        "backend",
        "database",
        "ai",
        "communication",
        "storage",
        "infrastructure",
        "monitoring",
        "security",
    )
    for kind in known_types:
        matches = len(get_services_by_type(kind))
        if matches:
            print(f" - {kind.capitalize()}: {matches}")

    # Data processing (PII and AI)
    pii_total = len(get_services_processing_pii())
    print(f"\n{GREEN}Data Processing:{RESET}")
    print(f" - Processing PII: {pii_total}")
    print(f" - AI Components: {len(get_services_with_ai())}")

    # Criticality
    critical_total = len(get_critical_services())
    print(f"\n{GREEN}Criticality:{RESET}")
    print(f" - Critical Services: {critical_total}")

    # Network: services that declare a (truthy) port
    with_port = sum(1 for entry in BREAKPILOT_SERVICES if entry.get("port"))
    print(f"\n{GREEN}Network:{RESET}")
    print(f" - Services with exposed ports: {with_port}")

    # Docker: services that declare a (truthy) image
    with_image = sum(1 for entry in BREAKPILOT_SERVICES if entry.get("docker_image"))
    print(f"\n{GREEN}Docker:{RESET}")
    print(f" - Services with Docker images: {with_image}")

    # Technology usage across all services
    usage: Dict[str, int] = defaultdict(int)
    for entry in BREAKPILOT_SERVICES:
        for technology in entry.get("technology_stack", []):
            usage[technology] += 1

    print(f"\n{GREEN}Top Technologies:{RESET}")
    # Stable sort: technologies with equal counts keep first-seen order.
    ranked = sorted(usage.items(), key=lambda pair: pair[1], reverse=True)
    for technology, uses in ranked[:10]:
        print(f" - {technology}: {uses}")
|
|
|
|
|
|
def validate_data_categories():
    """Check that PII-processing services declare their data categories.

    A warning is produced for every service whose ``processes_pii`` value is
    truthy while ``data_categories`` is missing or empty. A service without
    a ``name`` key is reported as ``UNKNOWN`` rather than raising
    ``KeyError`` while the warning is being built.

    Returns:
        True when no warning was produced, False otherwise (each warning is
        printed).
    """
    print(f"\n{GREEN}=== Validating Data Categories ==={RESET}")

    # Same 'UNKNOWN' placeholder as validate_required_fields(), which is the
    # check responsible for reporting the missing name itself.
    warnings = [
        f" {YELLOW}⚠{RESET} Service '{service.get('name', 'UNKNOWN')}' processes PII but has no data_categories defined"
        for service in BREAKPILOT_SERVICES
        if service.get("processes_pii") and not service.get("data_categories")
    ]

    if warnings:
        print(f"\n{YELLOW}Found {len(warnings)} warnings:{RESET}")
        for warning in warnings:
            print(warning)
        return False

    print(f"{GREEN}✓ All PII-processing services have data categories{RESET}")
    return True
|
|
|
|
|
|
def main():
    """Run every validation, print the statistics and return an exit code.

    Returns:
        0 when all validations passed, 1 when at least one of them reported
        errors or warnings.
    """
    print(f"{GREEN}{'='*60}")
    print(" Breakpilot Service Module Validation")
    print(f"{'='*60}{RESET}")

    # Collect all results eagerly: every check must run and print its own
    # report even when an earlier one has already failed.
    outcomes = [
        check()
        for check in (
            validate_required_fields,
            validate_port_conflicts,
            validate_regulation_mappings,
            validate_data_categories,
        )
    ]

    print_statistics()

    print(f"\n{GREEN}{'='*60}{RESET}")
    if not all(outcomes):
        print(f"{YELLOW}⚠ Some validations failed or have warnings{RESET}")
        return 1

    print(f"{GREEN}✓ All validations passed!{RESET}")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value (0 or 1) as the process exit status.
    raise SystemExit(main())
|