Files
ss-tools/.ai/shots/backend_route.py
2026-02-19 17:43:45 +03:00

58 lines
2.3 KiB
Python

# [DEF:Shot:FastAPI_Route:Example]
# @PURPOSE: Reference implementation of a task-based route using GRACE-Poly.
# @RELATION: IMPLEMENTS -> [DEF:Std:API_FastAPI]
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from ...core.logger import belief_scope
from ...core.task_manager import TaskManager, Task
from ...core.config_manager import ConfigManager
from ...dependencies import get_task_manager, get_config_manager, has_permission, get_current_user
# Router instance on which this module registers its endpoints (see the
# @router.post("/tasks") declaration below).
router = APIRouter()
# [DEF:CreateTaskRequest:Class]
# @PURPOSE: Request body schema accepted by the POST /tasks endpoint.
class CreateTaskRequest(BaseModel):
    # Identifier of the plugin to run; also used to build the
    # "plugin:<plugin_id>" permission string checked by create_task.
    plugin_id: str

    # Arbitrary key/value parameters forwarded to the task manager.
    params: Dict[str, Any]
# [/DEF:CreateTaskRequest:Class]
@router.post("/tasks", response_model=Task, status_code=status.HTTP_201_CREATED)
# [DEF:create_task:Function]
# @PURPOSE: Create and start a new task using TaskManager. Non-blocking.
# @PARAM: request (CreateTaskRequest) - Plugin and params.
# @PARAM: task_manager (TaskManager) - Async task executor.
# @PARAM: config (ConfigManager) - Centralized configuration.
# @PARAM: current_user - Value resolved by the get_current_user dependency;
#         passed to the permission checker.
# @PRE: plugin_id must exist; config must be initialized.
# @POST: A new task is spawned; the created Task is returned immediately.
# @RAISES: HTTPException - re-raised unchanged when raised during task
#          creation; any other exception is mapped to HTTP 500.
async def create_task(
    request: CreateTaskRequest,
    task_manager: TaskManager = Depends(get_task_manager),
    config: ConfigManager = Depends(get_config_manager),
    current_user=Depends(get_current_user),
):
    # RBAC: Dynamic permission check. has_permission(...) returns a callable
    # that is invoked with the current user.
    # NOTE(review): presumably it raises on denial -- confirm in dependencies.
    # Kept outside the try block so its outcome is never rewrapped as a 500.
    has_permission(f"plugin:{request.plugin_id}", "EXECUTE")(current_user)

    with belief_scope("create_task"):
        try:
            # 1. Action: Resolve setting using ConfigManager (Example)
            timeout = config.get("TASKS_DEFAULT_TIMEOUT", 3600)

            # 2. Action: Spawn async task via TaskManager
            # @RELATION: CALLS -> task_manager.create_task
            # The configured timeout is merged last, so it overwrites any
            # "timeout" key supplied by the caller in request.params.
            task = await task_manager.create_task(
                plugin_id=request.plugin_id,
                params={**request.params, "timeout": timeout},
            )
            return task
        except HTTPException:
            # Deliberate HTTP errors (e.g. 4xx) raised while creating the task
            # must keep their status code instead of being turned into a 500.
            raise
        except Exception as e:
            # Evaluation: map unexpected failures to a 500 for the frontend.
            # @UX_STATE: Error feedback to frontend
            # NOTE(review): nothing is logged explicitly here -- confirm that
            # belief_scope (or a global handler) records the failure.
            # `from e` keeps the original exception as __cause__ for debugging.
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Task creation failed: {e}",
            ) from e
# [/DEF:create_task:Function]
# [/DEF:Shot:FastAPI_Route:Example]