Files
ss-tools/backend/src/services/clean_release/policy_engine.py
busya bb921ce5dd [
{
        "file": "frontend/src/components/__tests__/task_log_viewer.test.js",
        "verdict": "APPROVED",
        "rejection_reason": "NONE",
        "audit_details": {
            "target_invoked": true,
            "pre_conditions_tested": true,
            "post_conditions_tested": true,
            "test_fixture_used": true,
            "edges_covered": true,
            "invariants_verified": true,
            "ux_states_tested": true,
            "semantic_anchors_present": true
        },
        "coverage_summary": {
            "total_edges": 2,
            "edges_tested": 2,
            "total_invariants": 1,
            "invariants_tested": 1,
            "total_ux_states": 3,
            "ux_states_tested": 3
        },
        "tier_compliance": {
            "source_tier": "CRITICAL",
            "meets_tier_requirements": true
        },
        "feedback": "Remediation successful: test tier matches CRITICAL, missing missing @TEST_EDGE no_task_id coverage added, test for @UX_FEEDBACK (autoScroll) added properly, missing inline=false (show=true) tested properly. Semantic RELATION tag fixed to VERIFIES."
    },
    {
        "file": "frontend/src/lib/components/reports/__tests__/report_card.ux.test.js",
        "verdict": "APPROVED",
        "rejection_reason": "NONE",
        "audit_details": {
            "target_invoked": true,
            "pre_conditions_tested": true,
            "post_conditions_tested": true,
            "test_fixture_used": true,
            "edges_covered": true,
            "invariants_verified": true,
            "ux_states_tested": true,
            "semantic_anchors_present": true
        },
        "coverage_summary": {
            "total_edges": 2,
            "edges_tested": 2,
            "total_invariants": 1,
            "invariants_tested": 1,
            "total_ux_states": 2,
            "ux_states_tested": 2
        },
        "tier_compliance": {
            "source_tier": "CRITICAL",
            "meets_tier_requirements": true
        },
        "feedback": "Remediation successful: @TEST_EDGE random_status and @TEST_EDGE empty_report_object tests explicitly assert on outcomes, @TEST_FIXTURE tested completely, Test tier switched to CRITICAL."
    },
    {
        "file": "backend/tests/test_logger.py",
        "verdict": "APPROVED",
        "rejection_reason": "NONE",
        "audit_details": {
            "target_invoked": true,
            "pre_conditions_tested": true,
            "post_conditions_tested": true,
            "test_fixture_used": true,
            "edges_covered": true,
            "invariants_verified": true,
            "ux_states_tested": false,
            "semantic_anchors_present": true
        },
        "coverage_summary": {
            "total_edges": 0,
            "edges_tested": 0,
            "total_invariants": 0,
            "invariants_tested": 0,
            "total_ux_states": 0,
            "ux_states_tested": 0
        },
        "tier_compliance": {
            "source_tier": "STANDARD",
            "meets_tier_requirements": true
        },
        "feedback": "Remediation successful: Test module semantic anchors added [DEF] and [/DEF] explicitly. Added missing @TIER tag and @RELATION: VERIFIES -> src/core/logger.py at the top of the file."
    }
]
2026-03-03 21:05:29 +03:00

141 lines
6.8 KiB
Python

# [DEF:backend.src.services.clean_release.policy_engine:Module]
# @TIER: CRITICAL
# @SEMANTICS: clean-release, policy, classification, source-isolation
# @PURPOSE: Evaluate artifact/source policies for enterprise clean profile with deterministic outcomes.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.src.models.clean_release.CleanProfilePolicy
# @RELATION: DEPENDS_ON -> backend.src.models.clean_release.ResourceSourceRegistry
# @INVARIANT: Enterprise-clean policy always treats non-registry sources as violations.
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple
from ...core.logger import belief_scope, logger
from ...models.clean_release import CleanProfilePolicy, ResourceSourceRegistry
@dataclass
class PolicyValidationResult:
    """Outcome of a policy/registry consistency check.

    Produced by ``CleanPolicyEngine.validate_policy``.
    """

    # True only when no blocking reason was collected.
    ok: bool
    # Human-readable description of every failed check, in the order found.
    blocking_reasons: List[str]
@dataclass
class SourceValidationResult:
    """Outcome of checking a single resource endpoint against the registry.

    Produced by ``CleanPolicyEngine.validate_resource_source``.
    """

    # True when the endpoint is an enabled internal registry host.
    ok: bool
    # Violation record (category / location / remediation / blocked_release)
    # when the endpoint is rejected; None when it is accepted.
    violation: Dict | None
# [DEF:CleanPolicyEngine:Class]
# @PRE: Active policy exists and is internally consistent.
# @POST: Deterministic classification and source validation are available.
# @TEST_CONTRACT: CandidateEvaluationInput -> PolicyValidationResult|SourceValidationResult
# @TEST_SCENARIO: policy_valid -> Enterprise clean policy with matching registry returns ok=True
# @TEST_FIXTURE: policy_enterprise_clean -> file:backend/tests/fixtures/clean_release/fixtures_clean_release.json
# @TEST_EDGE: missing_registry_ref -> policy has empty internal_source_registry_ref
# @TEST_EDGE: conflicting_registry -> policy registry ref does not match registry id
# @TEST_EDGE: external_endpoint -> endpoint not present in enabled internal registry entries
# @TEST_INVARIANT: deterministic_classification -> VERIFIED_BY: [policy_valid]
class CleanPolicyEngine:
    """Evaluate artifacts and resource sources against a clean-profile policy.

    The engine holds one policy and one source registry. It can:

    * check that the policy and registry are mutually consistent
      (``validate_policy``),
    * classify a single artifact by its category (``classify_artifact``),
    * check a single endpoint against the enabled registry hosts
      (``validate_resource_source``),
    * run both checks over a whole candidate (``evaluate_candidate``).
    """

    def __init__(self, policy: CleanProfilePolicy, registry: ResourceSourceRegistry):
        """Store the policy and the registry the engine evaluates against."""
        self.policy = policy
        self.registry = registry

    @staticmethod
    def _normalize_host(value: str | None) -> str:
        """Return *value* trimmed and lower-cased; ``None`` becomes ``""``.

        Used for BOTH the endpoint and the registry hosts so the allowlist
        comparison is symmetric (host names are case-insensitive).
        """
        return (value or "").strip().lower()

    @staticmethod
    def _external_source_violation(location: str) -> Dict:
        """Build the blocking violation record for a rejected endpoint."""
        return {
            "category": "external-source",
            "location": location,
            "remediation": "Replace with approved internal server",
            "blocked_release": True,
        }

    def _allowed_hosts(self) -> set[str]:
        """Return the normalized hosts of all enabled registry entries.

        Blank hosts are dropped so a malformed registry entry can never
        whitelist an empty endpoint.
        """
        normalized_hosts = (
            self._normalize_host(entry.host)
            for entry in self.registry.entries
            if entry.enabled
        )
        return {host for host in normalized_hosts if host}

    def validate_policy(self) -> PolicyValidationResult:
        """Check that the policy is usable and matches the provided registry.

        Returns:
            A ``PolicyValidationResult`` whose ``ok`` is True only when no
            blocking reason was collected; ``blocking_reasons`` lists every
            failed check in evaluation order.
        """
        with belief_scope("clean_policy_engine.validate_policy"):
            logger.reason("Validating enterprise-clean policy and internal registry consistency")
            reasons: List[str] = []
            is_enterprise_clean = self.policy.profile.value == "enterprise-clean"
            registry_ref = self.policy.internal_source_registry_ref

            if not self.policy.active:
                reasons.append("Policy must be active")
            # Guard against None: a missing ref must be reported as a
            # blocking reason, not crash the validation.
            if not (registry_ref or "").strip():
                reasons.append("Policy missing internal_source_registry_ref")
            if is_enterprise_clean and not self.policy.prohibited_artifact_categories:
                reasons.append("Enterprise policy requires prohibited artifact categories")
            if is_enterprise_clean and not self.policy.external_source_forbidden:
                reasons.append("Enterprise policy requires external_source_forbidden=true")
            if self.registry.registry_id != registry_ref:
                reasons.append("Policy registry ref does not match provided registry")
            if not self.registry.entries:
                reasons.append("Registry must contain entries")

            logger.reflect(f"Policy validation completed. blocking_reasons={len(reasons)}")
            return PolicyValidationResult(ok=not reasons, blocking_reasons=reasons)

    def classify_artifact(self, artifact: Dict) -> str:
        """Classify an artifact by its ``category`` key.

        Args:
            artifact: Mapping describing the artifact; a missing or falsy
                ``category`` is treated as an empty string.

        Returns:
            ``"required-system"``, ``"excluded-prohibited"`` or ``"allowed"``.
            The required-system check runs first, so it wins when a category
            appears in both policy lists.
        """
        category = (artifact.get("category") or "").strip()
        if category in self.policy.required_system_categories:
            logger.reason(f"Artifact category '{category}' classified as required-system")
            return "required-system"
        if category in self.policy.prohibited_artifact_categories:
            logger.reason(f"Artifact category '{category}' classified as excluded-prohibited")
            return "excluded-prohibited"
        logger.reflect(f"Artifact category '{category}' classified as allowed")
        return "allowed"

    def validate_resource_source(self, endpoint: str) -> SourceValidationResult:
        """Check whether *endpoint* is an enabled internal registry host.

        Both the endpoint and the registry hosts are trimmed and lower-cased
        before comparison. An empty, ``None`` or whitespace-only endpoint is
        always a violation.

        Args:
            endpoint: Host string referenced by the candidate.

        Returns:
            ``SourceValidationResult(ok=True, violation=None)`` when the
            endpoint is allowed; otherwise ``ok=False`` with a blocking
            ``external-source`` violation record.
        """
        with belief_scope("clean_policy_engine.validate_resource_source"):
            normalized = self._normalize_host(endpoint)
            if not normalized:
                logger.explore("Empty endpoint detected; treating as blocking external-source violation")
                return SourceValidationResult(
                    ok=False,
                    violation=self._external_source_violation("<empty-endpoint>"),
                )

            if normalized in self._allowed_hosts():
                logger.reason(f"Endpoint '{normalized}' is present in internal allowlist")
                return SourceValidationResult(ok=True, violation=None)

            logger.explore(f"Endpoint '{endpoint}' is outside internal allowlist")
            return SourceValidationResult(
                ok=False,
                # Report the endpoint exactly as supplied so it can be located.
                violation=self._external_source_violation(endpoint),
            )

    def evaluate_candidate(self, artifacts: Iterable[Dict], sources: Iterable[str]) -> Tuple[List[Dict], List[Dict]]:
        """Classify every artifact and validate every source of a candidate.

        Args:
            artifacts: Artifact mappings; each is copied and the copy gains a
                ``classification`` key (the inputs are not mutated).
            sources: Endpoint strings to check against the registry.

        Returns:
            ``(classified, violations)``: the enriched artifact copies, and
            the violation records for prohibited artifacts followed by those
            for rejected sources.
        """
        with belief_scope("clean_policy_engine.evaluate_candidate"):
            logger.reason("Evaluating candidate artifacts and resource sources against enterprise policy")
            classified: List[Dict] = []
            violations: List[Dict] = []

            for artifact in artifacts:
                classification = self.classify_artifact(artifact)
                if classification == "excluded-prohibited":
                    violations.append(
                        {
                            "category": "data-purity",
                            "location": artifact.get("path", "<unknown-path>"),
                            "remediation": "Remove prohibited content",
                            "blocked_release": True,
                        }
                    )
                classified.append({**artifact, "classification": classification})

            for source in sources:
                source_result = self.validate_resource_source(source)
                if not source_result.ok and source_result.violation:
                    violations.append(source_result.violation)

            logger.reflect(
                f"Candidate evaluation finished. artifacts={len(classified)} violations={len(violations)}"
            )
            return classified, violations
# [/DEF:CleanPolicyEngine:Class]
# [/DEF:backend.src.services.clean_release.policy_engine:Module]