# [DEF:TasksRouter:Module]
# @TIER: STANDARD
# @SEMANTICS: api, router, tasks, create, list, get, logs
# @PURPOSE: Defines the FastAPI router for task-related endpoints, allowing clients to create, list, and get the status of tasks.
# @LAYER: UI (API)
# @RELATION: Depends on the TaskManager. It is included by the main app.
from typing import List, Dict, Any, Optional

from fastapi import APIRouter, Depends, HTTPException, status, Query
from pydantic import BaseModel

from ...core.logger import belief_scope
from ...core.task_manager import TaskManager, Task, TaskStatus, LogEntry
from ...core.task_manager.models import LogFilter, LogStats
from ...dependencies import get_task_manager, has_permission, get_current_user

router = APIRouter()


# Request body for POST "": which plugin to run and the parameters to hand to it.
class CreateTaskRequest(BaseModel):
    plugin_id: str
    params: Dict[str, Any]


# Request body for POST "/{task_id}/resolve": parameters forwarded to TaskManager.resolve_task.
class ResolveTaskRequest(BaseModel):
    resolution_params: Dict[str, Any]


# Request body for POST "/{task_id}/resume": passwords forwarded to
# TaskManager.resume_task_with_password.
class ResumeTaskRequest(BaseModel):
    passwords: Dict[str, str]


@router.post("", response_model=Task, status_code=status.HTTP_201_CREATED)
# [DEF:create_task:Function]
# @PURPOSE: Create and start a new task for a given plugin.
# @PARAM: request (CreateTaskRequest) - The request body containing plugin_id and params.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PARAM: current_user - The authenticated user, checked for EXECUTE permission on the plugin.
# @PRE: plugin_id must exist and params must be valid for that plugin.
# @POST: A new task is created and started.
# @RETURN: Task - The created task instance.
# @RAISES: HTTPException(404) - When a ValueError is raised while validating or creating the task.
async def create_task(
    request: CreateTaskRequest,
    task_manager: TaskManager = Depends(get_task_manager),
    current_user = Depends(get_current_user)
):
    """
    Create and start a new task for a given plugin.
    """
    # Dynamic permission check based on plugin_id: the resource name is only known
    # once the body is parsed, so the checker is built and invoked by hand instead
    # of being declared through Depends().
    has_permission(f"plugin:{request.plugin_id}", "EXECUTE")(current_user)

    with belief_scope("create_task"):
        try:
            # Special handling for the validation task: when a provider_id is
            # supplied, make sure that LLM provider exists before creating the task.
            if request.plugin_id == "llm_dashboard_validation":
                # Imported locally, as in the original code.
                from ...core.database import SessionLocal
                from ...services.llm_provider import LLMProviderService

                db = SessionLocal()
                try:
                    llm_service = LLMProviderService(db)
                    provider_id = request.params.get("provider_id")
                    if provider_id:
                        db_provider = llm_service.get_provider(provider_id)
                        if not db_provider:
                            raise ValueError(f"LLM Provider {provider_id} not found")
                finally:
                    # Always release the session, including on the error path.
                    db.close()

            task = await task_manager.create_task(
                plugin_id=request.plugin_id,
                params=request.params
            )
            return task
        except ValueError as e:
            # Every ValueError from validation or task creation is reported as 404.
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
# [/DEF:create_task:Function]


@router.get("", response_model=List[Task])
# [DEF:list_tasks:Function]
# @PURPOSE: Retrieve a list of tasks with pagination and optional status filter.
# @PARAM: limit (int) - Maximum number of tasks to return.
# @PARAM: offset (int) - Number of tasks to skip.
# @PARAM: status (Optional[TaskStatus]) - Filter by task status.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_manager must be available.
# @POST: Returns a list of tasks.
# @RETURN: List[Task] - List of tasks.
async def list_tasks(
    limit: int = 10,
    offset: int = 0,
    status: Optional[TaskStatus] = None,
    task_manager: TaskManager = Depends(get_task_manager),
    _ = Depends(has_permission("tasks", "READ"))
):
    """
    Retrieve a list of tasks with pagination and optional status filter.
    """
    # NOTE: the `status` query parameter shadows the fastapi `status` module
    # inside this function; the module is not needed in this body.
    with belief_scope("list_tasks"):
        return task_manager.get_tasks(limit=limit, offset=offset, status=status)
# [/DEF:list_tasks:Function]


@router.get("/{task_id}", response_model=Task)
# [DEF:get_task:Function]
# @PURPOSE: Retrieve the details of a specific task.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns task details or raises 404.
# @RETURN: Task - The task details.
async def get_task(
    task_id: str,
    task_manager: TaskManager = Depends(get_task_manager),
    _ = Depends(has_permission("tasks", "READ"))
):
    """
    Retrieve the details of a specific task.
    """
    with belief_scope("get_task"):
        task = task_manager.get_task(task_id)
        if not task:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
        return task
# [/DEF:get_task:Function]


@router.get("/{task_id}/logs", response_model=List[LogEntry])
# [DEF:get_task_logs:Function]
# @PURPOSE: Retrieve logs for a specific task with optional filtering.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: level (Optional[str]) - Filter by log level (DEBUG, INFO, WARNING, ERROR).
# @PARAM: source (Optional[str]) - Filter by source component.
# @PARAM: search (Optional[str]) - Text search in message.
# @PARAM: offset (int) - Number of logs to skip.
# @PARAM: limit (int) - Maximum number of logs to return.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns a list of log entries or raises 404.
# @RETURN: List[LogEntry] - List of log entries.
# @TIER: CRITICAL
async def get_task_logs(
    task_id: str,
    level: Optional[str] = Query(None, description="Filter by log level (DEBUG, INFO, WARNING, ERROR)"),
    source: Optional[str] = Query(None, description="Filter by source component"),
    search: Optional[str] = Query(None, description="Text search in message"),
    offset: int = Query(0, ge=0, description="Number of logs to skip"),
    limit: int = Query(100, ge=1, le=1000, description="Maximum number of logs to return"),
    task_manager: TaskManager = Depends(get_task_manager),
    _ = Depends(has_permission("tasks", "READ"))
):
    """
    Retrieve logs for a specific task with optional filtering.
    Supports filtering by level, source, and text search.
    """
    with belief_scope("get_task_logs"):
        task = task_manager.get_task(task_id)
        if not task:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")

        log_filter = LogFilter(
            # The level is upper-cased so the filter is case-insensitive for callers.
            level=level.upper() if level else None,
            source=source,
            search=search,
            offset=offset,
            limit=limit
        )
        return task_manager.get_task_logs(task_id, log_filter)
# [/DEF:get_task_logs:Function]


@router.get("/{task_id}/logs/stats", response_model=LogStats)
# [DEF:get_task_log_stats:Function]
# @PURPOSE: Get statistics about logs for a task (counts by level and source).
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns log statistics or raises 404.
# @RETURN: LogStats - Statistics about task logs.
async def get_task_log_stats(
    task_id: str,
    task_manager: TaskManager = Depends(get_task_manager),
    _ = Depends(has_permission("tasks", "READ"))
):
    """
    Get statistics about logs for a task (counts by level and source).
    """
    with belief_scope("get_task_log_stats"):
        task = task_manager.get_task(task_id)
        if not task:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
        return task_manager.get_task_log_stats(task_id)
# [/DEF:get_task_log_stats:Function]


@router.get("/{task_id}/logs/sources", response_model=List[str])
# [DEF:get_task_log_sources:Function]
# @PURPOSE: Get unique sources for a task's logs.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_id must exist.
# @POST: Returns list of unique source names or raises 404.
# @RETURN: List[str] - Unique source names.
async def get_task_log_sources(
    task_id: str,
    task_manager: TaskManager = Depends(get_task_manager),
    _ = Depends(has_permission("tasks", "READ"))
):
    """
    Get unique sources for a task's logs.
    """
    with belief_scope("get_task_log_sources"):
        task = task_manager.get_task(task_id)
        if not task:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
        return task_manager.get_task_log_sources(task_id)
# [/DEF:get_task_log_sources:Function]


@router.post("/{task_id}/resolve", response_model=Task)
# [DEF:resolve_task:Function]
# @PURPOSE: Resolve a task that is awaiting mapping.
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: request (ResolveTaskRequest) - The resolution parameters.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task must be in AWAITING_MAPPING status.
# @POST: Task is resolved and resumes execution.
# @RETURN: Task - The updated task object.
# @RAISES: HTTPException(400) - When the task manager raises ValueError.
async def resolve_task(
    task_id: str,
    request: ResolveTaskRequest,
    task_manager: TaskManager = Depends(get_task_manager),
    _ = Depends(has_permission("tasks", "WRITE"))
):
    """
    Resolve a task that is awaiting mapping.
    """
    with belief_scope("resolve_task"):
        try:
            await task_manager.resolve_task(task_id, request.resolution_params)
            # Re-read the task so the response reflects its state after resolution.
            return task_manager.get_task(task_id)
        except ValueError as e:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
# [/DEF:resolve_task:Function]


@router.post("/{task_id}/resume", response_model=Task)
# [DEF:resume_task:Function]
# @PURPOSE: Resume a task that is awaiting input (e.g., passwords).
# @PARAM: task_id (str) - The unique identifier of the task.
# @PARAM: request (ResumeTaskRequest) - The input (passwords).
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task must be in AWAITING_INPUT status.
# @POST: Task resumes execution with provided input.
# @RETURN: Task - The updated task object.
# @RAISES: HTTPException(400) - When the task manager raises ValueError.
async def resume_task(
    task_id: str,
    request: ResumeTaskRequest,
    task_manager: TaskManager = Depends(get_task_manager),
    _ = Depends(has_permission("tasks", "WRITE"))
):
    """
    Resume a task that is awaiting input (e.g., passwords).
    """
    with belief_scope("resume_task"):
        try:
            # NOTE(review): called without `await`, unlike resolve_task above;
            # confirm resume_task_with_password is a synchronous method.
            task_manager.resume_task_with_password(task_id, request.passwords)
            # Re-read the task so the response reflects its state after resuming.
            return task_manager.get_task(task_id)
        except ValueError as e:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
# [/DEF:resume_task:Function]


@router.delete("", status_code=status.HTTP_204_NO_CONTENT)
# [DEF:clear_tasks:Function]
# @PURPOSE: Clear tasks matching the status filter.
# @PARAM: status (Optional[TaskStatus]) - Filter by task status.
# @PARAM: task_manager (TaskManager) - The task manager instance.
# @PRE: task_manager is available.
# @POST: Tasks are removed from memory/persistence.
async def clear_tasks(
    status: Optional[TaskStatus] = None,
    task_manager: TaskManager = Depends(get_task_manager),
    _ = Depends(has_permission("tasks", "WRITE"))
):
    """
    Clear tasks matching the status filter. If no filter, clears all non-running tasks.
    """
    # NOTE: the `status` query parameter shadows the fastapi `status` module
    # inside this function; the module is not needed in this body.
    with belief_scope("clear_tasks", f"status={status}"):
        task_manager.clear_tasks(status)
        # 204 No Content: nothing is returned to the client.
        return
# [/DEF:clear_tasks:Function]
# [/DEF:TasksRouter:Module]