# [DEF:backend.src.services.clean_release.policy_engine:Module]
# @TIER: CRITICAL
# @SEMANTICS: clean-release, policy, classification, source-isolation
# @PURPOSE: Evaluate artifact/source policies for enterprise clean profile with deterministic outcomes.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.src.models.clean_release.CleanProfilePolicy
# @RELATION: DEPENDS_ON -> backend.src.models.clean_release.ResourceSourceRegistry
# @INVARIANT: Enterprise-clean policy always treats non-registry sources as violations.

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple

from ...core.logger import belief_scope, logger
from ...models.clean_release import CleanProfilePolicy, ResourceSourceRegistry


@dataclass
class PolicyValidationResult:
    """Outcome of a policy/registry consistency check."""

    # True when no blocking reason was recorded.
    ok: bool
    # Human-readable reasons that make the policy unusable; empty when ok.
    blocking_reasons: List[str]


@dataclass
class SourceValidationResult:
    """Outcome of checking one resource endpoint against the registry."""

    # True when the endpoint is accepted.
    ok: bool
    # Violation record describing the rejection, or None when accepted.
    violation: Dict | None


# [DEF:CleanPolicyEngine:Class]
# @PRE: Active policy exists and is internally consistent.
# @POST: Deterministic classification and source validation are available.
# @TEST_CONTRACT: CandidateEvaluationInput -> PolicyValidationResult|SourceValidationResult
# @TEST_SCENARIO: policy_valid -> Enterprise clean policy with matching registry returns ok=True
# @TEST_FIXTURE: policy_enterprise_clean -> file:backend/tests/fixtures/clean_release/fixtures_clean_release.json
# @TEST_EDGE: missing_registry_ref -> policy has empty internal_source_registry_ref
# @TEST_EDGE: conflicting_registry -> policy registry ref does not match registry id
# @TEST_EDGE: external_endpoint -> endpoint not present in enabled internal registry entries
# @TEST_INVARIANT: deterministic_classification -> VERIFIED_BY: [policy_valid]
class CleanPolicyEngine:
    """Evaluate artifacts and resource sources against a clean-profile policy.

    The engine holds a policy and a source registry and offers three checks:
    policy/registry consistency, per-artifact classification, and per-endpoint
    allowlist validation. ``evaluate_candidate`` combines the last two.
    """

    def __init__(self, policy: CleanProfilePolicy, registry: ResourceSourceRegistry):
        """Store the policy and the registry the checks are run against."""
        self.policy = policy
        self.registry = registry

    @staticmethod
    def _normalize_host(value: str) -> str:
        """Return *value* stripped and lower-cased for allowlist comparison."""
        return value.strip().lower()

    @staticmethod
    def _external_source_violation(location: str) -> Dict:
        """Build the blocking violation record for a non-allowlisted source."""
        return {
            "category": "external-source",
            "location": location,
            "remediation": "Replace with approved internal server",
            "blocked_release": True,
        }

    def validate_policy(self) -> PolicyValidationResult:
        """Check that the policy and registry are mutually consistent.

        Returns:
            A result whose ``ok`` is True only when no blocking reason was
            collected; ``blocking_reasons`` lists every failed check in the
            order the checks are performed.
        """
        with belief_scope("clean_policy_engine.validate_policy"):
            logger.reason("Validating enterprise-clean policy and internal registry consistency")
            reasons: List[str] = []

            if not self.policy.active:
                reasons.append("Policy must be active")

            # A None reference is treated like an empty one instead of
            # raising AttributeError on .strip().
            registry_ref = self.policy.internal_source_registry_ref
            if not (registry_ref or "").strip():
                reasons.append("Policy missing internal_source_registry_ref")

            is_enterprise = self.policy.profile.value == "enterprise-clean"
            if is_enterprise and not self.policy.prohibited_artifact_categories:
                reasons.append("Enterprise policy requires prohibited artifact categories")
            if is_enterprise and not self.policy.external_source_forbidden:
                reasons.append("Enterprise policy requires external_source_forbidden=true")

            if self.registry.registry_id != registry_ref:
                reasons.append("Policy registry ref does not match provided registry")
            if not self.registry.entries:
                reasons.append("Registry must contain entries")

            logger.reflect(f"Policy validation completed. blocking_reasons={len(reasons)}")
            return PolicyValidationResult(ok=len(reasons) == 0, blocking_reasons=reasons)

    def classify_artifact(self, artifact: Dict) -> str:
        """Classify one artifact by its ``category`` key.

        Args:
            artifact: Mapping describing the artifact; only ``category`` is read.

        Returns:
            ``"required-system"``, ``"excluded-prohibited"`` or ``"allowed"``.
            The required-system check runs first, so it wins when a category
            is listed in both policy collections.
        """
        # A missing or None category is treated as the empty string.
        category = (artifact.get("category") or "").strip()
        if category in self.policy.required_system_categories:
            logger.reason(f"Artifact category '{category}' classified as required-system")
            return "required-system"
        if category in self.policy.prohibited_artifact_categories:
            logger.reason(f"Artifact category '{category}' classified as excluded-prohibited")
            return "excluded-prohibited"
        logger.reflect(f"Artifact category '{category}' classified as allowed")
        return "allowed"

    def validate_resource_source(self, endpoint: str) -> SourceValidationResult:
        """Check whether *endpoint* is an enabled host of the registry.

        Both the endpoint and the registry hosts are stripped and lower-cased
        before comparison, so case or padding differences in the registry do
        not turn an approved host into a violation.

        Args:
            endpoint: Host string referenced by the release candidate.

        Returns:
            ``ok=True`` with no violation when the endpoint is allowlisted;
            otherwise ``ok=False`` with an ``external-source`` violation that
            blocks the release.
        """
        with belief_scope("clean_policy_engine.validate_resource_source"):
            if not endpoint:
                logger.explore("Empty endpoint detected; treating as blocking external-source violation")
                return SourceValidationResult(
                    ok=False,
                    violation=self._external_source_violation(""),
                )

            allowed_hosts = {
                self._normalize_host(entry.host)
                for entry in self.registry.entries
                if entry.enabled and entry.host
            }
            # A blank host must never act as a wildcard for a blank endpoint.
            allowed_hosts.discard("")

            normalized = self._normalize_host(endpoint)
            if normalized in allowed_hosts:
                logger.reason(f"Endpoint '{normalized}' is present in internal allowlist")
                return SourceValidationResult(ok=True, violation=None)

            logger.explore(f"Endpoint '{endpoint}' is outside internal allowlist")
            return SourceValidationResult(
                ok=False,
                violation=self._external_source_violation(endpoint),
            )

    def evaluate_candidate(
        self, artifacts: Iterable[Dict], sources: Iterable[str]
    ) -> Tuple[List[Dict], List[Dict]]:
        """Classify every artifact and validate every source of a candidate.

        Args:
            artifacts: Artifact mappings; each is copied, not mutated.
            sources: Endpoint strings to check against the registry.

        Returns:
            ``(classified, violations)``: the artifact copies with an added
            ``classification`` key, and the violation records collected from
            prohibited artifacts followed by rejected sources.
        """
        with belief_scope("clean_policy_engine.evaluate_candidate"):
            logger.reason("Evaluating candidate artifacts and resource sources against enterprise policy")
            classified: List[Dict] = []
            violations: List[Dict] = []

            for artifact in artifacts:
                classification = self.classify_artifact(artifact)
                # Copy so the caller's mapping is left untouched.
                enriched = dict(artifact)
                enriched["classification"] = classification
                if classification == "excluded-prohibited":
                    violations.append(
                        {
                            "category": "data-purity",
                            "location": artifact.get("path", ""),
                            "remediation": "Remove prohibited content",
                            "blocked_release": True,
                        }
                    )
                classified.append(enriched)

            for source in sources:
                source_result = self.validate_resource_source(source)
                if not source_result.ok and source_result.violation:
                    violations.append(source_result.violation)

            logger.reflect(
                f"Candidate evaluation finished. artifacts={len(classified)} violations={len(violations)}"
            )
            return classified, violations
# [/DEF:CleanPolicyEngine:Class]
# [/DEF:backend.src.services.clean_release.policy_engine:Module]