Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
208 lines
6.5 KiB
Python
208 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Validate service modules configuration.
|
|
|
|
Checks:
|
|
- All services have required fields
|
|
- Port conflicts
|
|
- Technology stack completeness
|
|
- Regulation mappings validity
|
|
"""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
from typing import Dict, List, Set
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
from compliance.data.service_modules import (
|
|
BREAKPILOT_SERVICES,
|
|
get_service_count,
|
|
get_services_by_type,
|
|
get_services_processing_pii,
|
|
get_services_with_ai,
|
|
get_critical_services
|
|
)
|
|
from compliance.data.regulations import REGULATIONS_SEED
|
|
|
|
# Color codes for output
|
|
GREEN = '\033[92m'
|
|
YELLOW = '\033[93m'
|
|
RED = '\033[91m'
|
|
RESET = '\033[0m'
|
|
|
|
|
|
def validate_required_fields():
|
|
"""Validate that all services have required fields."""
|
|
print(f"\n{GREEN}=== Validating Required Fields ==={RESET}")
|
|
errors = []
|
|
|
|
required_fields = ["name", "display_name", "service_type", "technology_stack"]
|
|
|
|
for service in BREAKPILOT_SERVICES:
|
|
for field in required_fields:
|
|
if field not in service or not service[field]:
|
|
errors.append(f" {RED}✗{RESET} Service '{service.get('name', 'UNKNOWN')}' missing required field: {field}")
|
|
|
|
if errors:
|
|
print(f"\n{RED}Found {len(errors)} errors:{RESET}")
|
|
for error in errors:
|
|
print(error)
|
|
return False
|
|
else:
|
|
print(f"{GREEN}✓ All services have required fields{RESET}")
|
|
return True
|
|
|
|
|
|
def validate_port_conflicts():
|
|
"""Check for port conflicts."""
|
|
print(f"\n{GREEN}=== Checking Port Conflicts ==={RESET}")
|
|
port_map: Dict[int, List[str]] = defaultdict(list)
|
|
warnings = []
|
|
|
|
for service in BREAKPILOT_SERVICES:
|
|
port = service.get("port")
|
|
if port:
|
|
port_map[port].append(service["name"])
|
|
|
|
for port, services in port_map.items():
|
|
if len(services) > 1:
|
|
warnings.append(f" {YELLOW}⚠{RESET} Port {port} used by multiple services: {', '.join(services)}")
|
|
|
|
if warnings:
|
|
print(f"\n{YELLOW}Found {len(warnings)} port conflicts:{RESET}")
|
|
for warning in warnings:
|
|
print(warning)
|
|
return False
|
|
else:
|
|
print(f"{GREEN}✓ No port conflicts detected{RESET}")
|
|
return True
|
|
|
|
|
|
def validate_regulation_mappings():
|
|
"""Validate that all regulation codes exist."""
|
|
print(f"\n{GREEN}=== Validating Regulation Mappings ==={RESET}")
|
|
valid_regulation_codes = {reg["code"] for reg in REGULATIONS_SEED}
|
|
errors = []
|
|
|
|
for service in BREAKPILOT_SERVICES:
|
|
regulations = service.get("regulations", [])
|
|
for reg_mapping in regulations:
|
|
reg_code = reg_mapping.get("code")
|
|
if reg_code not in valid_regulation_codes:
|
|
errors.append(
|
|
f" {RED}✗{RESET} Service '{service['name']}' references unknown regulation: {reg_code}"
|
|
)
|
|
|
|
# Check for relevance field
|
|
if "relevance" not in reg_mapping:
|
|
errors.append(
|
|
f" {RED}✗{RESET} Service '{service['name']}' regulation mapping for {reg_code} missing 'relevance'"
|
|
)
|
|
|
|
if errors:
|
|
print(f"\n{RED}Found {len(errors)} regulation mapping errors:{RESET}")
|
|
for error in errors:
|
|
print(error)
|
|
return False
|
|
else:
|
|
print(f"{GREEN}✓ All regulation mappings are valid{RESET}")
|
|
return True
|
|
|
|
|
|
def print_statistics():
|
|
"""Print statistics about the service registry."""
|
|
print(f"\n{GREEN}=== Service Module Statistics ==={RESET}")
|
|
|
|
total = get_service_count()
|
|
print(f"\nTotal Services: {total}")
|
|
|
|
# By type
|
|
print(f"\n{GREEN}By Type:{RESET}")
|
|
service_types = ["backend", "database", "ai", "communication", "storage", "infrastructure", "monitoring", "security"]
|
|
for stype in service_types:
|
|
count = len(get_services_by_type(stype))
|
|
if count > 0:
|
|
print(f" - {stype.capitalize()}: {count}")
|
|
|
|
# PII processing
|
|
pii_count = len(get_services_processing_pii())
|
|
print(f"\n{GREEN}Data Processing:{RESET}")
|
|
print(f" - Processing PII: {pii_count}")
|
|
print(f" - AI Components: {len(get_services_with_ai())}")
|
|
|
|
# Criticality
|
|
critical_count = len(get_critical_services())
|
|
print(f"\n{GREEN}Criticality:{RESET}")
|
|
print(f" - Critical Services: {critical_count}")
|
|
|
|
# Ports
|
|
services_with_ports = [s for s in BREAKPILOT_SERVICES if s.get("port")]
|
|
print(f"\n{GREEN}Network:{RESET}")
|
|
print(f" - Services with exposed ports: {len(services_with_ports)}")
|
|
|
|
# Docker images
|
|
docker_services = [s for s in BREAKPILOT_SERVICES if s.get("docker_image")]
|
|
print(f"\n{GREEN}Docker:{RESET}")
|
|
print(f" - Services with Docker images: {len(docker_services)}")
|
|
|
|
# Most common technologies
|
|
tech_count: Dict[str, int] = defaultdict(int)
|
|
for service in BREAKPILOT_SERVICES:
|
|
for tech in service.get("technology_stack", []):
|
|
tech_count[tech] += 1
|
|
|
|
print(f"\n{GREEN}Top Technologies:{RESET}")
|
|
for tech, count in sorted(tech_count.items(), key=lambda x: x[1], reverse=True)[:10]:
|
|
print(f" - {tech}: {count}")
|
|
|
|
|
|
def validate_data_categories():
|
|
"""Check that PII-processing services have data_categories defined."""
|
|
print(f"\n{GREEN}=== Validating Data Categories ==={RESET}")
|
|
warnings = []
|
|
|
|
for service in BREAKPILOT_SERVICES:
|
|
if service.get("processes_pii") and not service.get("data_categories"):
|
|
warnings.append(
|
|
f" {YELLOW}⚠{RESET} Service '{service['name']}' processes PII but has no data_categories defined"
|
|
)
|
|
|
|
if warnings:
|
|
print(f"\n{YELLOW}Found {len(warnings)} warnings:{RESET}")
|
|
for warning in warnings:
|
|
print(warning)
|
|
return False
|
|
else:
|
|
print(f"{GREEN}✓ All PII-processing services have data categories{RESET}")
|
|
return True
|
|
|
|
|
|
def main():
|
|
"""Run all validations."""
|
|
print(f"{GREEN}{'='*60}")
|
|
print(f" Breakpilot Service Module Validation")
|
|
print(f"{'='*60}{RESET}")
|
|
|
|
all_passed = True
|
|
|
|
all_passed &= validate_required_fields()
|
|
all_passed &= validate_port_conflicts()
|
|
all_passed &= validate_regulation_mappings()
|
|
all_passed &= validate_data_categories()
|
|
|
|
print_statistics()
|
|
|
|
print(f"\n{GREEN}{'='*60}{RESET}")
|
|
if all_passed:
|
|
print(f"{GREEN}✓ All validations passed!{RESET}")
|
|
return 0
|
|
else:
|
|
print(f"{YELLOW}⚠ Some validations failed or have warnings{RESET}")
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|