# [DEF:backend.src.services.reports.normalizer:Module]
# @TIER: CRITICAL
# @SEMANTICS: reports, normalization, tasks, fallback
# @PURPOSE: Convert task manager task objects into canonical unified TaskReport entities with deterministic fallback behavior.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.src.core.task_manager.models.Task
# @RELATION: DEPENDS_ON -> backend.src.models.report
# @RELATION: DEPENDS_ON -> backend.src.services.reports.type_profiles
# @INVARIANT: Unknown task types and partial payloads remain visible via fallback mapping.

# [SECTION: IMPORTS]
from datetime import datetime
from typing import Any, Dict, Optional

from ...core.logger import belief_scope
from ...core.task_manager.models import Task, TaskStatus
from ...models.report import ErrorContext, ReportStatus, TaskReport
from .type_profiles import get_type_profile, resolve_task_type
# [/SECTION]


# [DEF:status_to_report_status:Function]
# @PURPOSE: Normalize internal task status to canonical report status.
# @PRE: status may be known or unknown string/enum value.
# @POST: Always returns one of canonical ReportStatus values.
# @PARAM: status (Any) - Internal task status value.
# @RETURN: ReportStatus - Canonical report status.
def status_to_report_status(status: Any) -> ReportStatus:
    """Map an internal task status onto a canonical ``ReportStatus``.

    A ``TaskStatus`` member is unwrapped to its ``.value``; anything else is
    used as given. The value is stringified and upper-cased, then compared
    against the ``TaskStatus`` member values. Anything that matches none of
    the known values falls back to ``ReportStatus.PARTIAL``.
    """
    with belief_scope("status_to_report_status"):
        source_value = status.value if isinstance(status, TaskStatus) else status
        normalized = str(source_value).upper()

        if normalized == TaskStatus.SUCCESS.value:
            outcome = ReportStatus.SUCCESS
        elif normalized == TaskStatus.FAILED.value:
            outcome = ReportStatus.FAILED
        elif normalized in {
            TaskStatus.PENDING.value,
            TaskStatus.RUNNING.value,
            TaskStatus.AWAITING_INPUT.value,
            TaskStatus.AWAITING_MAPPING.value,
        }:
            # Every "not finished yet" state collapses into one report status.
            outcome = ReportStatus.IN_PROGRESS
        else:
            # Unknown statuses stay visible instead of raising.
            outcome = ReportStatus.PARTIAL
        return outcome
# [/DEF:status_to_report_status:Function]


# [DEF:build_summary:Function]
# @PURPOSE: Build deterministic user-facing summary from task payload and status.
# @PRE: report_status is canonical; plugin_id may be unknown.
# @POST: Returns non-empty summary text.
# @PARAM: task (Task) - Source task object.
# @PARAM: report_status (ReportStatus) - Canonical status.
# @RETURN: str - Normalized summary.
def build_summary(task: Task, report_status: ReportStatus) -> str:
    """Return a non-empty, user-facing summary line for a task.

    When ``task.result`` is a dict, the first of its ``summary``,
    ``message``, ``status_message`` or ``description`` entries that is a
    non-blank string is returned with surrounding whitespace removed.
    Otherwise a fixed sentence chosen by ``report_status`` is returned.
    """
    with belief_scope("build_summary"):
        payload = task.result
        if isinstance(payload, dict):
            # Lazy pipeline: stops at the first key holding usable text.
            texts = (
                payload.get(field)
                for field in ("summary", "message", "status_message", "description")
            )
            trimmed = (text.strip() for text in texts if isinstance(text, str))
            chosen = next((text for text in trimmed if text), None)
            if chosen is not None:
                return chosen

        # Compared with ``==`` in priority order; the last sentence is the
        # catch-all for every remaining status.
        canned = (
            (ReportStatus.SUCCESS, "Task completed successfully"),
            (ReportStatus.FAILED, "Task failed"),
            (ReportStatus.IN_PROGRESS, "Task is in progress"),
        )
        for candidate, sentence in canned:
            if report_status == candidate:
                return sentence
        return "Task completed with partial data"
# [/DEF:build_summary:Function]


# [DEF:extract_error_context:Function]
# @PURPOSE: Extract normalized error context and next actions for failed/partial reports.
# @PRE: task is a valid Task object.
# @POST: Returns ErrorContext for failed/partial when context exists; otherwise None.
# @PARAM: task (Task) - Source task.
# @PARAM: report_status (ReportStatus) - Canonical status.
# @RETURN: Optional[ErrorContext] - Error context block.
def extract_error_context(task: Task, report_status: ReportStatus) -> Optional[ErrorContext]:
    """Build the error block for a failed or partial report.

    Returns ``None`` for any status other than ``FAILED`` or ``PARTIAL``.
    Otherwise the message is resolved from, in order: ``result["error"]``
    (when it is a dict) via its ``message`` entry, ``result["error_message"]``
    (when it is a string), the most recent log entry whose level is
    ``ERROR`` and whose message is truthy, and finally the literal
    ``"Not provided"``. ``code`` and ``next_actions`` are read only from the
    ``error`` dict; default next actions are supplied when none are found.
    """
    with belief_scope("extract_error_context"):
        reportable = {ReportStatus.FAILED, ReportStatus.PARTIAL}
        if report_status not in reportable:
            return None

        payload = task.result if isinstance(task.result, dict) else {}
        error_text = None
        error_code = None
        suggested = []

        error_block = payload.get("error")
        if isinstance(error_block, dict):
            # Falsy values (empty string, 0, ...) are treated as absent.
            error_text = error_block.get("message") or None
            error_code = error_block.get("code") or None
            raw_actions = error_block.get("next_actions")
            if isinstance(raw_actions, list):
                suggested = [str(item) for item in raw_actions if str(item).strip()]

        if not error_text:
            flat_message = payload.get("error_message")
            error_text = flat_message if isinstance(flat_message, str) else None

        if not error_text:
            # Walk the logs newest-first and take the first usable ERROR entry;
            # keep the current value when no entry qualifies.
            error_text = next(
                (
                    entry.message
                    for entry in reversed(task.logs)
                    if str(entry.level).upper() == "ERROR" and entry.message
                ),
                error_text,
            )

        return ErrorContext(
            code=error_code,
            message=error_text or "Not provided",
            next_actions=suggested or ["Review task diagnostics", "Retry the operation"],
        )
# [/DEF:extract_error_context:Function]


# [DEF:normalize_task_report:Function]
# @PURPOSE: Convert one Task to canonical TaskReport envelope.
# @PRE: task has valid id and plugin_id fields.
# @POST: Returns TaskReport with required fields and deterministic fallback behavior.
# @PARAM: task (Task) - Source task.
# @RETURN: TaskReport - Canonical normalized report.
def normalize_task_report(task: Task) -> TaskReport:
    """Convert one task into the canonical ``TaskReport`` envelope.

    The task type is resolved from ``task.plugin_id`` and the status is
    normalized with ``status_to_report_status``. ``started_at`` is kept only
    when it is a ``datetime``. ``updated_at`` is ``task.finished_at`` when
    that is a ``datetime``, else ``started_at``, else the current UTC time as
    a naive ``datetime``. ``details`` carries selected fields of the type
    profile plus the raw task result (or a ``{"note": "Not provided"}``
    placeholder when the result is ``None``). ``source_ref`` holds the known
    identifier keys present in ``task.params`` and is ``None`` when no such
    key is present.

    Args:
        task: Source task.

    Returns:
        The normalized report; ``report_id`` and ``task_id`` are both
        ``task.id``.
    """
    # Function-scope import: ``timezone`` is needed only for the clock fallback.
    from datetime import timezone

    with belief_scope("normalize_task_report"):
        task_type = resolve_task_type(task.plugin_id)
        report_status = status_to_report_status(task.status)
        profile = get_type_profile(task_type)

        started_at = task.started_at if isinstance(task.started_at, datetime) else None
        updated_at = task.finished_at if isinstance(task.finished_at, datetime) else None
        if not updated_at:
            # datetime.utcnow() is deprecated since Python 3.12. Take an aware
            # UTC "now" and drop tzinfo so the value is the same naive UTC
            # timestamp utcnow() produced.
            updated_at = started_at or datetime.now(timezone.utc).replace(tzinfo=None)

        details: Dict[str, Any] = {
            "profile": {
                "display_label": profile.get("display_label"),
                "visual_variant": profile.get("visual_variant"),
                "icon_token": profile.get("icon_token"),
                "emphasis_rules": profile.get("emphasis_rules", []),
            },
            "result": task.result if task.result is not None else {"note": "Not provided"},
        }

        source_ref: Dict[str, Any] = {}
        if isinstance(task.params, dict):
            # Copy only the identifier keys that are actually present.
            source_ref = {
                key: task.params[key]
                for key in (
                    "environment_id",
                    "source_env_id",
                    "target_env_id",
                    "dashboard_id",
                    "dataset_id",
                    "resource_id",
                )
                if key in task.params
            }

        return TaskReport(
            report_id=task.id,
            task_id=task.id,
            task_type=task_type,
            status=report_status,
            started_at=started_at,
            updated_at=updated_at,
            summary=build_summary(task, report_status),
            details=details,
            error_context=extract_error_context(task, report_status),
            source_ref=source_ref or None,
        )
# [/DEF:normalize_task_report:Function]
# [/DEF:backend.src.services.reports.normalizer:Module]