# [DEF:backend.src.services.resource_service:Module]
# @TIER: STANDARD
# @SEMANTICS: service, resources, dashboards, datasets, tasks, git
# @PURPOSE: Shared service for fetching resource data with Git status and task status
# @LAYER: Service
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
# @RELATION: DEPENDS_ON -> backend.src.core.task_manager
# @RELATION: DEPENDS_ON -> backend.src.services.git_service
# @INVARIANT: All resources include metadata about their current state

# [SECTION: IMPORTS]
from typing import List, Dict, Optional, Any
from datetime import datetime, timezone
from ..core.superset_client import SupersetClient
from ..core.task_manager.models import Task
from ..services.git_service import GitService
from ..core.logger import logger, belief_scope
# [/SECTION]

# Sort key used for tasks that carry no usable timestamp. It is timezone-aware
# so it can be compared with both aware and (coerced) naive task timestamps.
_MISSING_TIMESTAMP = datetime.min.replace(tzinfo=timezone.utc)


# [DEF:ResourceService:Class]
# @PURPOSE: Provides centralized access to resource data with enhanced metadata
class ResourceService:

    # Normalized status tokens that count as "active" in the activity summary.
    _ACTIVE_STATUSES = frozenset({"RUNNING", "WAITING_INPUT"})

    # [DEF:__init__:Function]
    # @PURPOSE: Initialize the resource service with dependencies
    # @PRE: None
    # @POST: ResourceService is ready to fetch resources
    def __init__(self):
        with belief_scope("ResourceService.__init__"):
            self.git_service = GitService()
            logger.info("[ResourceService][Action] Initialized ResourceService")
    # [/DEF:__init__:Function]

    # [DEF:get_dashboards_with_status:Function]
    # @PURPOSE: Fetch dashboards from environment with Git status and last task status
    # @PRE: env is a valid Environment object
    # @POST: Returns list of dashboards with enhanced metadata
    # @PARAM: env (Environment) - The environment to fetch from
    # @PARAM: tasks (List[Task]) - List of tasks to check for status
    # @PARAM: include_git_status (bool) - When False, git_status is set to None for every item
    # @RETURN: List[Dict] - Dashboards with git_status and last_task fields
    # @RELATION: CALLS -> SupersetClient.get_dashboards_summary
    # @RELATION: CALLS -> self._enrich_dashboard
    async def get_dashboards_with_status(
        self,
        env: Any,
        tasks: Optional[List[Task]] = None,
        include_git_status: bool = True,
    ) -> List[Dict[str, Any]]:
        with belief_scope("get_dashboards_with_status", f"env={env.id}"):
            client = SupersetClient(env)
            dashboards = client.get_dashboards_summary()

            # Each summary is already a dict; it is enriched in place.
            result = [
                self._enrich_dashboard(dashboard, env.id, tasks, include_git_status)
                for dashboard in dashboards
            ]

            logger.info(f"[ResourceService][Coherence:OK] Fetched {len(result)} dashboards with status")
            return result
    # [/DEF:get_dashboards_with_status:Function]

    # [DEF:get_dashboards_page_with_status:Function]
    # @PURPOSE: Fetch one dashboard page from environment and enrich only that page with status metadata.
    # @PRE: env is valid; page >= 1; page_size > 0.
    # @POST: Returns page items plus total counters without scanning all pages locally.
    # @PARAM: env (Environment) - Source environment.
    # @PARAM: tasks (Optional[List[Task]]) - Tasks for latest LLM status.
    # @PARAM: page (int) - 1-based page number.
    # @PARAM: page_size (int) - Page size.
    # @PARAM: search (Optional[str]) - Search text forwarded to the client unchanged.
    # @PARAM: include_git_status (bool) - When False, git_status is set to None for every item.
    # @RETURN: Dict[str, Any] - {"dashboards": List[Dict], "total": int, "total_pages": int}
    # @RELATION: CALLS -> SupersetClient.get_dashboards_summary_page
    # @RELATION: CALLS -> self._enrich_dashboard
    async def get_dashboards_page_with_status(
        self,
        env: Any,
        tasks: Optional[List[Task]] = None,
        page: int = 1,
        page_size: int = 10,
        search: Optional[str] = None,
        include_git_status: bool = True,
    ) -> Dict[str, Any]:
        with belief_scope(
            "get_dashboards_page_with_status",
            f"env={env.id}, page={page}, page_size={page_size}, search={search}",
        ):
            client = SupersetClient(env)
            total, dashboards_page = client.get_dashboards_summary_page(
                page=page,
                page_size=page_size,
                search=search,
            )

            result = [
                self._enrich_dashboard(dashboard, env.id, tasks, include_git_status)
                for dashboard in dashboards_page
            ]

            # Ceiling division; an empty result set is still reported as one page.
            total_pages = (total + page_size - 1) // page_size if total > 0 else 1

            logger.info(
                "[ResourceService][Coherence:OK] Fetched dashboards page %s/%s (%s items, total=%s)",
                page,
                total_pages,
                len(result),
                total,
            )
            return {
                "dashboards": result,
                "total": total,
                "total_pages": total_pages,
            }
    # [/DEF:get_dashboards_page_with_status:Function]

    # [DEF:_enrich_dashboard:Function]
    # @PURPOSE: Attach git_status and last_task to one dashboard summary dict (mutates and returns it)
    # @PRE: dashboard is a dict; its 'id' key is read with .get()
    # @POST: dashboard has 'git_status' (None when skipped) and 'last_task' keys
    # @PARAM: dashboard (Dict) - Dashboard summary as returned by the client
    # @PARAM: env_id (Any) - Environment ID used to match task params
    # @PARAM: tasks (Optional[List[Task]]) - Tasks to search for the latest LLM validation
    # @PARAM: include_git_status (bool) - Whether to resolve Git status now
    # @RETURN: Dict - The same dict instance, enriched
    # @RELATION: CALLS -> self._get_git_status_for_dashboard
    # @RELATION: CALLS -> self._get_last_llm_task_for_dashboard
    def _enrich_dashboard(
        self,
        dashboard: Dict[str, Any],
        env_id: Any,
        tasks: Optional[List[Task]],
        include_git_status: bool,
    ) -> Dict[str, Any]:
        dashboard_id = dashboard.get("id")

        # Git status can be skipped for list endpoints and loaded lazily on UI side.
        if include_git_status:
            dashboard["git_status"] = self._get_git_status_for_dashboard(dashboard_id)
        else:
            dashboard["git_status"] = None

        # Show status of the latest LLM validation for this dashboard.
        dashboard["last_task"] = self._get_last_llm_task_for_dashboard(
            dashboard_id,
            env_id,
            tasks,
        )
        return dashboard
    # [/DEF:_enrich_dashboard:Function]

    # [DEF:_coerce_sort_time:Function]
    # @PURPOSE: Turn an optional task timestamp into a value that is always safe to compare
    # @PRE: value is None or a datetime (naive or timezone-aware)
    # @POST: Returns a timezone-aware datetime; None maps to the smallest possible value
    # @PARAM: value (Optional[datetime]) - Raw timestamp taken from a task
    # @RETURN: datetime - Timezone-aware sort key
    @staticmethod
    def _coerce_sort_time(value: Optional[datetime]) -> datetime:
        if value is None:
            return _MISSING_TIMESTAMP
        if value.tzinfo is None:
            # Naive and aware datetimes cannot be ordered against each other;
            # naive values are treated as UTC purely for ordering purposes.
            return value.replace(tzinfo=timezone.utc)
        return value
    # [/DEF:_coerce_sort_time:Function]

    # [DEF:_get_last_llm_task_for_dashboard:Function]
    # @PURPOSE: Get most recent LLM validation task for a dashboard in an environment
    # @PRE: dashboard_id is a valid integer identifier
    # @POST: Returns the newest llm_dashboard_validation task summary or None
    # @PARAM: dashboard_id (int) - The dashboard ID
    # @PARAM: env_id (Optional[str]) - Environment ID to match task params
    # @PARAM: tasks (Optional[List[Task]]) - List of tasks to search
    # @RETURN: Optional[Dict] - Task summary with task_id, status and validation_status
    def _get_last_llm_task_for_dashboard(
        self,
        dashboard_id: int,
        env_id: Optional[str],
        tasks: Optional[List[Task]] = None,
    ) -> Optional[Dict[str, Any]]:
        # A missing dashboard id would stringify to "None" and wrongly match
        # tasks that have no dashboard_id param, so bail out early.
        if not tasks or dashboard_id is None:
            return None

        dashboard_id_str = str(dashboard_id)
        matched_tasks = []

        for task in tasks:
            if getattr(task, "plugin_id", None) != "llm_dashboard_validation":
                continue

            params = getattr(task, "params", {}) or {}
            # IDs are compared as strings so int and str params both match.
            if str(params.get("dashboard_id")) != dashboard_id_str:
                continue

            if env_id is not None:
                task_env = params.get("environment_id") or params.get("env")
                if str(task_env) != str(env_id):
                    continue

            matched_tasks.append(task)

        if not matched_tasks:
            return None

        def _task_time(task_obj: Any) -> datetime:
            # First available of started/finished/created, made comparable.
            return self._coerce_sort_time(
                getattr(task_obj, "started_at", None)
                or getattr(task_obj, "finished_at", None)
                or getattr(task_obj, "created_at", None)
            )

        last_task = max(matched_tasks, key=_task_time)
        raw_result = getattr(last_task, "result", None)
        validation_status = None
        if isinstance(raw_result, dict):
            validation_status = self._normalize_validation_status(raw_result.get("status"))

        return {
            "task_id": str(getattr(last_task, "id", "")),
            "status": self._normalize_task_status(getattr(last_task, "status", "")),
            "validation_status": validation_status,
        }
    # [/DEF:_get_last_llm_task_for_dashboard:Function]

    # [DEF:_normalize_task_status:Function]
    # @PURPOSE: Normalize task status to stable uppercase values for UI/API projections
    # @PRE: raw_status can be enum or string
    # @POST: Returns uppercase status without enum class prefix
    # @PARAM: raw_status (Any) - Raw task status object/value
    # @RETURN: str - Normalized status token
    def _normalize_task_status(self, raw_status: Any) -> str:
        if raw_status is None:
            return ""
        # Enum members expose .value; plain strings are used as they are.
        value = getattr(raw_status, "value", raw_status)
        status_text = str(value).strip()
        # Drop a "ClassName." prefix such as the one produced by str(Enum member).
        if "." in status_text:
            status_text = status_text.split(".")[-1]
        return status_text.upper()
    # [/DEF:_normalize_task_status:Function]

    # [DEF:_normalize_validation_status:Function]
    # @PURPOSE: Normalize LLM validation status to PASS/FAIL/WARN/UNKNOWN
    # @PRE: raw_status can be any scalar type
    # @POST: Returns normalized validation status token or None
    # @PARAM: raw_status (Any) - Raw validation status from task result
    # @RETURN: Optional[str] - PASS|FAIL|WARN|UNKNOWN, or None when raw_status is None
    def _normalize_validation_status(self, raw_status: Any) -> Optional[str]:
        if raw_status is None:
            return None
        status_text = str(raw_status).strip().upper()
        if status_text in {"PASS", "FAIL", "WARN"}:
            return status_text
        return "UNKNOWN"
    # [/DEF:_normalize_validation_status:Function]

    # [DEF:get_datasets_with_status:Function]
    # @PURPOSE: Fetch datasets from environment and attach last task status
    # @PRE: env is a valid Environment object
    # @POST: Returns list of datasets, each with a last_task field
    # @PARAM: env (Environment) - The environment to fetch from
    # @PARAM: tasks (List[Task]) - List of tasks to check for status
    # @RETURN: List[Dict] - Datasets with last_task field added
    # @RELATION: CALLS -> SupersetClient.get_datasets_summary
    # @RELATION: CALLS -> self._get_last_task_for_resource
    # NOTE(review): only last_task is added here; mapping-progress fields such as
    # mapped_fields presumably come from the client summary - confirm.
    async def get_datasets_with_status(
        self,
        env: Any,
        tasks: Optional[List[Task]] = None
    ) -> List[Dict[str, Any]]:
        with belief_scope("get_datasets_with_status", f"env={env.id}"):
            client = SupersetClient(env)
            datasets = client.get_datasets_summary()

            # Each summary is already a dict; it is enriched in place.
            result = []
            for dataset in datasets:
                dataset_id = dataset.get('id')
                # Tasks are matched through the "dataset-<id>" resource identifier.
                dataset['last_task'] = self._get_last_task_for_resource(
                    f"dataset-{dataset_id}",
                    tasks
                )
                result.append(dataset)

            logger.info(f"[ResourceService][Coherence:OK] Fetched {len(result)} datasets with status")
            return result
    # [/DEF:get_datasets_with_status:Function]

    # [DEF:get_activity_summary:Function]
    # @PURPOSE: Get summary of active and recent tasks for the activity indicator
    # @PRE: tasks is a list of Task objects (None is treated as empty)
    # @POST: Returns summary with active_count and recent_tasks
    # @PARAM: tasks (List[Task]) - List of tasks to summarize
    # @RETURN: Dict - Activity summary
    def get_activity_summary(self, tasks: List[Task]) -> Dict[str, Any]:
        with belief_scope("get_activity_summary"):
            tasks = tasks or []

            # Count active (RUNNING, WAITING_INPUT) tasks. Statuses are normalized
            # first so enum members and plain strings are both recognized.
            active_count = sum(
                1
                for t in tasks
                if self._normalize_task_status(t.status) in self._ACTIVE_STATUSES
            )

            # Get recent tasks (last 5); tasks without created_at sort last.
            recent_tasks = sorted(
                tasks,
                key=lambda t: self._coerce_sort_time(t.created_at),
                reverse=True
            )[:5]

            # Format recent tasks for frontend.
            # NOTE(review): 'started_at' is populated from created_at - confirm intended.
            recent_tasks_formatted = [
                {
                    'task_id': str(task.id),
                    'resource_name': self._extract_resource_name_from_task(task),
                    'resource_type': self._extract_resource_type_from_task(task),
                    'status': task.status,
                    'started_at': task.created_at.isoformat() if task.created_at else None
                }
                for task in recent_tasks
            ]

            return {
                'active_count': active_count,
                'recent_tasks': recent_tasks_formatted
            }
    # [/DEF:get_activity_summary:Function]

    # [DEF:_get_git_status_for_dashboard:Function]
    # @PURPOSE: Get Git sync status for a dashboard
    # @PRE: dashboard_id is a valid integer
    # @POST: Always returns a status dict; sync_status is one of OK, DIFF, NO_REPO, ERROR
    # @PARAM: dashboard_id (int) - The dashboard ID
    # @RETURN: Optional[Dict] - Git status with branch, sync_status, has_repo, has_changes_for_commit
    # @RELATION: CALLS -> GitService.get_repo
    def _get_git_status_for_dashboard(self, dashboard_id: int) -> Optional[Dict[str, Any]]:
        no_repo_status = {
            'branch': None,
            'sync_status': 'NO_REPO',
            'has_repo': False,
            'has_changes_for_commit': False
        }

        try:
            repo = self.git_service.get_repo(dashboard_id)
            if not repo:
                return no_repo_status
        except Exception:
            # Deliberate best-effort: a failing lookup is reported as "no repo"
            # rather than breaking the whole dashboard listing.
            return no_repo_status

        try:
            # Get current branch
            active_branch = repo.active_branch
            branch = active_branch.name

            # Check for uncommitted changes; untracked files only affect
            # has_changes_for_commit, not sync_status.
            is_dirty = repo.is_dirty()
            has_changes_for_commit = repo.is_dirty(untracked_files=True)

            # Check for unpushed commits. Only possible when the branch tracks
            # an upstream; otherwise there is nothing to compare against.
            upstream = active_branch.tracking_branch()
            if upstream is not None:
                unpushed = sum(1 for _ in repo.iter_commits(f'{upstream.name}..{branch}'))
            else:
                unpushed = 0

            sync_status = 'DIFF' if is_dirty or unpushed > 0 else 'OK'

            return {
                'branch': branch,
                'sync_status': sync_status,
                'has_repo': True,
                'has_changes_for_commit': has_changes_for_commit
            }
        except Exception as exc:
            logger.warning(
                f"[ResourceService][Warning] Failed to get git status for dashboard {dashboard_id}: {exc}"
            )
            return {
                'branch': None,
                'sync_status': 'ERROR',
                'has_repo': True,
                'has_changes_for_commit': False
            }
    # [/DEF:_get_git_status_for_dashboard:Function]

    # [DEF:_get_last_task_for_resource:Function]
    # @PURPOSE: Get the most recent task for a specific resource
    # @PRE: resource_id is a valid string
    # @POST: Returns task summary or None if no tasks found
    # @PARAM: resource_id (str) - The resource identifier (e.g., "dashboard-123")
    # @PARAM: tasks (Optional[List[Task]]) - List of tasks to search
    # @RETURN: Optional[Dict] - Task summary with task_id and status
    def _get_last_task_for_resource(
        self,
        resource_id: str,
        tasks: Optional[List[Task]] = None
    ) -> Optional[Dict[str, Any]]:
        if not tasks:
            return None

        # Filter tasks for this resource
        resource_tasks = [
            task for task in tasks
            if (task.params or {}).get('resource_id') == resource_id
        ]

        if not resource_tasks:
            return None

        # Get most recent task; a missing created_at sorts as oldest.
        last_task = max(
            resource_tasks,
            key=lambda t: self._coerce_sort_time(t.created_at)
        )

        return {
            'task_id': str(last_task.id),
            'status': last_task.status
        }
    # [/DEF:_get_last_task_for_resource:Function]

    # [DEF:_extract_resource_name_from_task:Function]
    # @PURPOSE: Extract resource name from task params
    # @PRE: task is a valid Task object
    # @POST: Returns resource name, or "Task <id>" when params carry none
    # @PARAM: task (Task) - The task to extract from
    # @RETURN: str - Resource name or fallback
    def _extract_resource_name_from_task(self, task: Task) -> str:
        params = task.params or {}
        return params.get('resource_name', f"Task {task.id}")
    # [/DEF:_extract_resource_name_from_task:Function]

    # [DEF:_extract_resource_type_from_task:Function]
    # @PURPOSE: Extract resource type from task params
    # @PRE: task is a valid Task object
    # @POST: Returns resource type or 'unknown'
    # @PARAM: task (Task) - The task to extract from
    # @RETURN: str - Resource type
    def _extract_resource_type_from_task(self, task: Task) -> str:
        params = task.params or {}
        return params.get('resource_type', 'unknown')
    # [/DEF:_extract_resource_type_from_task:Function]
# [/DEF:ResourceService:Class]
# [/DEF:backend.src.services.resource_service:Module]