semantic update

This commit is contained in:
2026-02-24 21:08:12 +03:00
parent 7a12ed0931
commit 95ae9c6af1
32 changed files with 60376 additions and 59911 deletions

View File

@@ -33,7 +33,8 @@ class EncryptionManager:
# @PRE: data must be a non-empty string.
# @POST: Returns encrypted string.
def encrypt(self, data: str) -> str:
return self.fernet.encrypt(data.encode()).decode()
with belief_scope("encrypt"):
return self.fernet.encrypt(data.encode()).decode()
# [/DEF:EncryptionManager.encrypt:Function]
# [DEF:EncryptionManager.decrypt:Function]
@@ -41,7 +42,8 @@ class EncryptionManager:
# @PRE: encrypted_data must be a valid Fernet-encrypted string.
# @POST: Returns original plaintext string.
def decrypt(self, encrypted_data: str) -> str:
return self.fernet.decrypt(encrypted_data.encode()).decode()
with belief_scope("decrypt"):
return self.fernet.decrypt(encrypted_data.encode()).decode()
# [/DEF:EncryptionManager.decrypt:Function]
# [/DEF:EncryptionManager:Class]

View File

@@ -12,6 +12,7 @@
from datetime import datetime
from typing import Any, Dict, Optional
from ...core.logger import belief_scope
from ...core.task_manager.models import Task, TaskStatus
from ...models.report import ErrorContext, ReportStatus, TaskReport
from .type_profiles import get_type_profile, resolve_task_type
@@ -25,14 +26,15 @@ from .type_profiles import get_type_profile, resolve_task_type
# @PARAM: status (Any) - Internal task status value.
# @RETURN: ReportStatus - Canonical report status.
def status_to_report_status(status: Any) -> ReportStatus:
raw = str(status.value if isinstance(status, TaskStatus) else status).upper()
if raw == TaskStatus.SUCCESS.value:
return ReportStatus.SUCCESS
if raw == TaskStatus.FAILED.value:
return ReportStatus.FAILED
if raw in {TaskStatus.PENDING.value, TaskStatus.RUNNING.value, TaskStatus.AWAITING_INPUT.value, TaskStatus.AWAITING_MAPPING.value}:
return ReportStatus.IN_PROGRESS
return ReportStatus.PARTIAL
with belief_scope("status_to_report_status"):
raw = str(status.value if isinstance(status, TaskStatus) else status).upper()
if raw == TaskStatus.SUCCESS.value:
return ReportStatus.SUCCESS
if raw == TaskStatus.FAILED.value:
return ReportStatus.FAILED
if raw in {TaskStatus.PENDING.value, TaskStatus.RUNNING.value, TaskStatus.AWAITING_INPUT.value, TaskStatus.AWAITING_MAPPING.value}:
return ReportStatus.IN_PROGRESS
return ReportStatus.PARTIAL
# [/DEF:status_to_report_status:Function]
@@ -44,19 +46,20 @@ def status_to_report_status(status: Any) -> ReportStatus:
# @PARAM: report_status (ReportStatus) - Canonical status.
# @RETURN: str - Normalized summary.
def build_summary(task: Task, report_status: ReportStatus) -> str:
result = task.result
if isinstance(result, dict):
for key in ("summary", "message", "status_message", "description"):
value = result.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
if report_status == ReportStatus.SUCCESS:
return "Task completed successfully"
if report_status == ReportStatus.FAILED:
return "Task failed"
if report_status == ReportStatus.IN_PROGRESS:
return "Task is in progress"
return "Task completed with partial data"
with belief_scope("build_summary"):
result = task.result
if isinstance(result, dict):
for key in ("summary", "message", "status_message", "description"):
value = result.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
if report_status == ReportStatus.SUCCESS:
return "Task completed successfully"
if report_status == ReportStatus.FAILED:
return "Task failed"
if report_status == ReportStatus.IN_PROGRESS:
return "Task is in progress"
return "Task completed with partial data"
# [/DEF:build_summary:Function]
@@ -68,38 +71,39 @@ def build_summary(task: Task, report_status: ReportStatus) -> str:
# @PARAM: report_status (ReportStatus) - Canonical status.
# @RETURN: Optional[ErrorContext] - Error context block.
def extract_error_context(task: Task, report_status: ReportStatus) -> Optional[ErrorContext]:
if report_status not in {ReportStatus.FAILED, ReportStatus.PARTIAL}:
return None
with belief_scope("extract_error_context"):
if report_status not in {ReportStatus.FAILED, ReportStatus.PARTIAL}:
return None
result = task.result if isinstance(task.result, dict) else {}
message = None
code = None
next_actions = []
result = task.result if isinstance(task.result, dict) else {}
message = None
code = None
next_actions = []
if isinstance(result.get("error"), dict):
error_obj = result.get("error", {})
message = error_obj.get("message") or message
code = error_obj.get("code") or code
actions = error_obj.get("next_actions")
if isinstance(actions, list):
next_actions = [str(action) for action in actions if str(action).strip()]
if isinstance(result.get("error"), dict):
error_obj = result.get("error", {})
message = error_obj.get("message") or message
code = error_obj.get("code") or code
actions = error_obj.get("next_actions")
if isinstance(actions, list):
next_actions = [str(action) for action in actions if str(action).strip()]
if not message:
message = result.get("error_message") if isinstance(result.get("error_message"), str) else None
if not message:
message = result.get("error_message") if isinstance(result.get("error_message"), str) else None
if not message:
for log in reversed(task.logs):
if str(log.level).upper() == "ERROR" and log.message:
message = log.message
break
if not message:
for log in reversed(task.logs):
if str(log.level).upper() == "ERROR" and log.message:
message = log.message
break
if not message:
message = "Not provided"
if not message:
message = "Not provided"
if not next_actions:
next_actions = ["Review task diagnostics", "Retry the operation"]
if not next_actions:
next_actions = ["Review task diagnostics", "Retry the operation"]
return ErrorContext(code=code, message=message, next_actions=next_actions)
return ErrorContext(code=code, message=message, next_actions=next_actions)
# [/DEF:extract_error_context:Function]
@@ -110,43 +114,44 @@ def extract_error_context(task: Task, report_status: ReportStatus) -> Optional[E
# @PARAM: task (Task) - Source task.
# @RETURN: TaskReport - Canonical normalized report.
def normalize_task_report(task: Task) -> TaskReport:
task_type = resolve_task_type(task.plugin_id)
report_status = status_to_report_status(task.status)
profile = get_type_profile(task_type)
with belief_scope("normalize_task_report"):
task_type = resolve_task_type(task.plugin_id)
report_status = status_to_report_status(task.status)
profile = get_type_profile(task_type)
started_at = task.started_at if isinstance(task.started_at, datetime) else None
updated_at = task.finished_at if isinstance(task.finished_at, datetime) else None
if not updated_at:
updated_at = started_at or datetime.utcnow()
started_at = task.started_at if isinstance(task.started_at, datetime) else None
updated_at = task.finished_at if isinstance(task.finished_at, datetime) else None
if not updated_at:
updated_at = started_at or datetime.utcnow()
details: Dict[str, Any] = {
"profile": {
"display_label": profile.get("display_label"),
"visual_variant": profile.get("visual_variant"),
"icon_token": profile.get("icon_token"),
"emphasis_rules": profile.get("emphasis_rules", []),
},
"result": task.result if task.result is not None else {"note": "Not provided"},
}
details: Dict[str, Any] = {
"profile": {
"display_label": profile.get("display_label"),
"visual_variant": profile.get("visual_variant"),
"icon_token": profile.get("icon_token"),
"emphasis_rules": profile.get("emphasis_rules", []),
},
"result": task.result if task.result is not None else {"note": "Not provided"},
}
source_ref: Dict[str, Any] = {}
if isinstance(task.params, dict):
for key in ("environment_id", "source_env_id", "target_env_id", "dashboard_id", "dataset_id", "resource_id"):
if key in task.params:
source_ref[key] = task.params.get(key)
source_ref: Dict[str, Any] = {}
if isinstance(task.params, dict):
for key in ("environment_id", "source_env_id", "target_env_id", "dashboard_id", "dataset_id", "resource_id"):
if key in task.params:
source_ref[key] = task.params.get(key)
return TaskReport(
report_id=task.id,
task_id=task.id,
task_type=task_type,
status=report_status,
started_at=started_at,
updated_at=updated_at,
summary=build_summary(task, report_status),
details=details,
error_context=extract_error_context(task, report_status),
source_ref=source_ref or None,
)
return TaskReport(
report_id=task.id,
task_id=task.id,
task_type=task_type,
status=report_status,
started_at=started_at,
updated_at=updated_at,
summary=build_summary(task, report_status),
details=details,
error_context=extract_error_context(task, report_status),
source_ref=source_ref or None,
)
# [/DEF:normalize_task_report:Function]
# [/DEF:backend.src.services.reports.normalizer:Module]

View File

@@ -12,6 +12,8 @@
from datetime import datetime, timezone
from typing import List, Optional
from ...core.logger import belief_scope
from ...core.task_manager import TaskManager
from ...models.report import ReportCollection, ReportDetailView, ReportQuery, ReportStatus, TaskReport, TaskType
from .normalizer import normalize_task_report
@@ -33,7 +35,8 @@ class ReportsService:
# @INVARIANT: Constructor performs no task mutations.
# @PARAM: task_manager (TaskManager) - Task manager providing source task history.
def __init__(self, task_manager: TaskManager):
self.task_manager = task_manager
with belief_scope("__init__"):
self.task_manager = task_manager
# [/DEF:__init__:Function]
# [DEF:_load_normalized_reports:Function]
@@ -43,9 +46,10 @@ class ReportsService:
# @INVARIANT: Every returned item is a TaskReport.
# @RETURN: List[TaskReport] - Reports sorted later by list logic.
def _load_normalized_reports(self) -> List[TaskReport]:
tasks = self.task_manager.get_all_tasks()
reports = [normalize_task_report(task) for task in tasks]
return reports
with belief_scope("_load_normalized_reports"):
tasks = self.task_manager.get_all_tasks()
reports = [normalize_task_report(task) for task in tasks]
return reports
# [/DEF:_load_normalized_reports:Function]
# [DEF:_to_utc_datetime:Function]
@@ -56,11 +60,12 @@ class ReportsService:
# @PARAM: value (Optional[datetime]) - Source datetime value.
# @RETURN: Optional[datetime] - UTC-aware datetime or None.
def _to_utc_datetime(self, value: Optional[datetime]) -> Optional[datetime]:
if value is None:
return None
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value.astimezone(timezone.utc)
with belief_scope("_to_utc_datetime"):
if value is None:
return None
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value.astimezone(timezone.utc)
# [/DEF:_to_utc_datetime:Function]
# [DEF:_datetime_sort_key:Function]
@@ -71,10 +76,11 @@ class ReportsService:
# @PARAM: report (TaskReport) - Report item.
# @RETURN: float - UTC timestamp key.
def _datetime_sort_key(self, report: TaskReport) -> float:
updated = self._to_utc_datetime(report.updated_at)
if updated is None:
return 0.0
return updated.timestamp()
with belief_scope("_datetime_sort_key"):
updated = self._to_utc_datetime(report.updated_at)
if updated is None:
return 0.0
return updated.timestamp()
# [/DEF:_datetime_sort_key:Function]
# [DEF:_matches_query:Function]
@@ -86,24 +92,25 @@ class ReportsService:
# @PARAM: query (ReportQuery) - Applied query.
# @RETURN: bool - True if report matches all filters.
def _matches_query(self, report: TaskReport, query: ReportQuery) -> bool:
if query.task_types and report.task_type not in query.task_types:
return False
if query.statuses and report.status not in query.statuses:
return False
report_updated_at = self._to_utc_datetime(report.updated_at)
query_time_from = self._to_utc_datetime(query.time_from)
query_time_to = self._to_utc_datetime(query.time_to)
if query_time_from and report_updated_at and report_updated_at < query_time_from:
return False
if query_time_to and report_updated_at and report_updated_at > query_time_to:
return False
if query.search:
needle = query.search.lower()
haystack = f"{report.summary} {report.task_type.value} {report.status.value}".lower()
if needle not in haystack:
with belief_scope("_matches_query"):
if query.task_types and report.task_type not in query.task_types:
return False
return True
if query.statuses and report.status not in query.statuses:
return False
report_updated_at = self._to_utc_datetime(report.updated_at)
query_time_from = self._to_utc_datetime(query.time_from)
query_time_to = self._to_utc_datetime(query.time_to)
if query_time_from and report_updated_at and report_updated_at < query_time_from:
return False
if query_time_to and report_updated_at and report_updated_at > query_time_to:
return False
if query.search:
needle = query.search.lower()
haystack = f"{report.summary} {report.task_type.value} {report.status.value}".lower()
if needle not in haystack:
return False
return True
# [/DEF:_matches_query:Function]
# [DEF:_sort_reports:Function]
@@ -115,16 +122,17 @@ class ReportsService:
# @PARAM: query (ReportQuery) - Sort config.
# @RETURN: List[TaskReport] - Sorted reports.
def _sort_reports(self, reports: List[TaskReport], query: ReportQuery) -> List[TaskReport]:
reverse = query.sort_order == "desc"
with belief_scope("_sort_reports"):
reverse = query.sort_order == "desc"
if query.sort_by == "status":
reports.sort(key=lambda item: item.status.value, reverse=reverse)
elif query.sort_by == "task_type":
reports.sort(key=lambda item: item.task_type.value, reverse=reverse)
else:
reports.sort(key=self._datetime_sort_key, reverse=reverse)
if query.sort_by == "status":
reports.sort(key=lambda item: item.status.value, reverse=reverse)
elif query.sort_by == "task_type":
reports.sort(key=lambda item: item.task_type.value, reverse=reverse)
else:
reports.sort(key=self._datetime_sort_key, reverse=reverse)
return reports
return reports
# [/DEF:_sort_reports:Function]
# [DEF:list_reports:Function]
@@ -134,24 +142,25 @@ class ReportsService:
# @PARAM: query (ReportQuery) - List filters and pagination.
# @RETURN: ReportCollection - Paginated unified reports payload.
def list_reports(self, query: ReportQuery) -> ReportCollection:
reports = self._load_normalized_reports()
filtered = [report for report in reports if self._matches_query(report, query)]
sorted_reports = self._sort_reports(filtered, query)
with belief_scope("list_reports"):
reports = self._load_normalized_reports()
filtered = [report for report in reports if self._matches_query(report, query)]
sorted_reports = self._sort_reports(filtered, query)
total = len(sorted_reports)
start = (query.page - 1) * query.page_size
end = start + query.page_size
items = sorted_reports[start:end]
has_next = end < total
total = len(sorted_reports)
start = (query.page - 1) * query.page_size
end = start + query.page_size
items = sorted_reports[start:end]
has_next = end < total
return ReportCollection(
items=items,
total=total,
page=query.page,
page_size=query.page_size,
has_next=has_next,
applied_filters=query,
)
return ReportCollection(
items=items,
total=total,
page=query.page,
page_size=query.page_size,
has_next=has_next,
applied_filters=query,
)
# [/DEF:list_reports:Function]
# [DEF:get_report_detail:Function]
@@ -161,34 +170,35 @@ class ReportsService:
# @PARAM: report_id (str) - Stable report identifier.
# @RETURN: Optional[ReportDetailView] - Detailed report or None if not found.
def get_report_detail(self, report_id: str) -> Optional[ReportDetailView]:
reports = self._load_normalized_reports()
target = next((report for report in reports if report.report_id == report_id), None)
if not target:
return None
with belief_scope("get_report_detail"):
reports = self._load_normalized_reports()
target = next((report for report in reports if report.report_id == report_id), None)
if not target:
return None
timeline = []
if target.started_at:
timeline.append({"event": "started", "at": target.started_at.isoformat()})
timeline.append({"event": "updated", "at": target.updated_at.isoformat()})
timeline = []
if target.started_at:
timeline.append({"event": "started", "at": target.started_at.isoformat()})
timeline.append({"event": "updated", "at": target.updated_at.isoformat()})
diagnostics = target.details or {}
if not diagnostics:
diagnostics = {"note": "Not provided"}
if target.error_context:
diagnostics["error_context"] = target.error_context.model_dump()
diagnostics = target.details or {}
if not diagnostics:
diagnostics = {"note": "Not provided"}
if target.error_context:
diagnostics["error_context"] = target.error_context.model_dump()
next_actions = []
if target.error_context and target.error_context.next_actions:
next_actions = target.error_context.next_actions
elif target.status in {ReportStatus.FAILED, ReportStatus.PARTIAL}:
next_actions = ["Review diagnostics", "Retry task if applicable"]
next_actions = []
if target.error_context and target.error_context.next_actions:
next_actions = target.error_context.next_actions
elif target.status in {ReportStatus.FAILED, ReportStatus.PARTIAL}:
next_actions = ["Review diagnostics", "Retry task if applicable"]
return ReportDetailView(
report=target,
timeline=timeline,
diagnostics=diagnostics,
next_actions=next_actions,
)
return ReportDetailView(
report=target,
timeline=timeline,
diagnostics=diagnostics,
next_actions=next_actions,
)
# [/DEF:get_report_detail:Function]
# [/DEF:ReportsService:Class]

View File

@@ -9,6 +9,7 @@
# [SECTION: IMPORTS]
from typing import Any, Dict, Optional
from ...core.logger import belief_scope
from ...models.report import TaskType
# [/SECTION]
@@ -71,10 +72,11 @@ TASK_TYPE_PROFILES: Dict[TaskType, Dict[str, Any]] = {
# @PARAM: plugin_id (Optional[str]) - Source plugin/task identifier from task record.
# @RETURN: TaskType - Resolved canonical type or UNKNOWN fallback.
def resolve_task_type(plugin_id: Optional[str]) -> TaskType:
normalized = (plugin_id or "").strip()
if not normalized:
return TaskType.UNKNOWN
return PLUGIN_TO_TASK_TYPE.get(normalized, TaskType.UNKNOWN)
with belief_scope("resolve_task_type"):
normalized = (plugin_id or "").strip()
if not normalized:
return TaskType.UNKNOWN
return PLUGIN_TO_TASK_TYPE.get(normalized, TaskType.UNKNOWN)
# [/DEF:resolve_task_type:Function]
@@ -85,7 +87,8 @@ def resolve_task_type(plugin_id: Optional[str]) -> TaskType:
# @PARAM: task_type (TaskType) - Canonical task type.
# @RETURN: Dict[str, Any] - Profile metadata used by normalization and UI contracts.
def get_type_profile(task_type: TaskType) -> Dict[str, Any]:
return TASK_TYPE_PROFILES.get(task_type, TASK_TYPE_PROFILES[TaskType.UNKNOWN])
with belief_scope("get_type_profile"):
return TASK_TYPE_PROFILES.get(task_type, TASK_TYPE_PROFILES[TaskType.UNKNOWN])
# [/DEF:get_type_profile:Function]
# [/DEF:backend.src.services.reports.type_profiles:Module]