# [DEF:backend.src.api.routes.datasets:Module]
#
# @TIER: STANDARD
# @SEMANTICS: api, datasets, resources, hub
# @PURPOSE: API endpoints for the Dataset Hub - listing datasets with mapping progress
# @LAYER: API
# @RELATION: DEPENDS_ON -> backend.src.dependencies
# @RELATION: DEPENDS_ON -> backend.src.services.resource_service
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
#
# @INVARIANT: All dataset responses include last_task metadata

# [SECTION: IMPORTS]
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Optional
from pydantic import BaseModel, Field

from ...dependencies import (
    get_config_manager,
    get_task_manager,
    get_resource_service,
    has_permission,
)
from ...core.logger import logger, belief_scope
# [/SECTION]

router = APIRouter()


# [DEF:MappedFields:DataClass]
class MappedFields(BaseModel):
    """Field-mapping progress counters for a single dataset."""
    total: int   # total number of mappable fields
    mapped: int  # number of fields already mapped
# [/DEF:MappedFields:DataClass]


# [DEF:LastTask:DataClass]
class LastTask(BaseModel):
    """Metadata about the most recent task associated with a dataset."""
    task_id: Optional[str] = None
    # FIX: the original pattern "^RUNNING|SUCCESS|ERROR|WAITING_INPUT$" anchored
    # only the first and last alternatives (alternation binds looser than the
    # anchors), so values like "XSUCCESSX" validated. Grouping the alternation
    # applies both anchors to every allowed status.
    status: Optional[str] = Field(
        None, pattern="^(RUNNING|SUCCESS|ERROR|WAITING_INPUT)$"
    )
# [/DEF:LastTask:DataClass]


# [DEF:DatasetItem:DataClass]
class DatasetItem(BaseModel):
    """A single dataset row with optional mapping-progress and last-task data."""
    id: int
    table_name: str
    # FIX: a field literally named "schema" shadows the (deprecated)
    # BaseModel.schema() attribute, which Pydantic warns about and which breaks
    # the inherited method. Expose it through an alias instead: the JSON wire
    # key stays "schema" (FastAPI serializes response models by alias by
    # default) while the attribute name is safe. populate_by_name keeps
    # DatasetItem(schema_=...) construction working alongside the alias.
    schema_: str = Field(alias="schema")
    database: str
    mapped_fields: Optional[MappedFields] = None
    last_task: Optional[LastTask] = None

    model_config = {"populate_by_name": True}
# [/DEF:DatasetItem:DataClass]


# [DEF:DatasetsResponse:DataClass]
class DatasetsResponse(BaseModel):
    """Envelope for the dataset list endpoint."""
    datasets: List[DatasetItem]
    total: int  # convenience count; always equals len(datasets)
# [/DEF:DatasetsResponse:DataClass]


# [DEF:get_datasets:Function]
# @PURPOSE: Fetch list of datasets from a specific environment with mapping progress
# @PRE: env_id must be a valid environment ID
# @POST: Returns a list of datasets with enhanced metadata
# @PARAM: env_id (str) - The environment ID to fetch datasets from
# @PARAM: search (Optional[str]) - Filter by table name
# @RETURN: DatasetsResponse - List of datasets with status metadata
# @RELATION: CALLS -> ResourceService.get_datasets_with_status
@router.get("/api/datasets", response_model=DatasetsResponse)
async def get_datasets(
    env_id: str,
    search: Optional[str] = None,
    config_manager=Depends(get_config_manager),
    task_manager=Depends(get_task_manager),
    resource_service=Depends(get_resource_service),
    _=Depends(has_permission("plugin:migration", "READ")),
):
    with belief_scope("get_datasets", f"env_id={env_id}, search={search}"):
        # Validate the environment exists before doing any remote work.
        environments = config_manager.get_environments()
        env = next((e for e in environments if e.id == env_id), None)
        if not env:
            logger.error(f"[get_datasets][Coherence:Failed] Environment not found: {env_id}")
            raise HTTPException(status_code=404, detail="Environment not found")

        try:
            # Get all tasks for status lookup
            all_tasks = task_manager.get_all_tasks()

            # Fetch datasets with status using ResourceService
            datasets = await resource_service.get_datasets_with_status(env, all_tasks)

            # Apply case-insensitive substring filter on table_name, if provided.
            if search:
                search_lower = search.lower()
                datasets = [
                    d for d in datasets
                    if search_lower in d.get('table_name', '').lower()
                ]

            logger.info(f"[get_datasets][Coherence:OK] Returning {len(datasets)} datasets")
            return DatasetsResponse(
                datasets=datasets,
                total=len(datasets)
            )
        except Exception as e:
            # Upstream (Superset / task manager) failures surface as 503 so the
            # client can distinguish them from the 404 above.
            logger.error(f"[get_datasets][Coherence:Failed] Failed to fetch datasets: {e}")
            raise HTTPException(status_code=503, detail=f"Failed to fetch datasets: {str(e)}")
# [/DEF:get_datasets:Function]
# [/DEF:backend.src.api.routes.datasets:Module]