feat: Introduce and enforce test contract annotations for critical modules and update coverage tracking.

This commit is contained in:
2026-03-04 12:58:42 +03:00
parent 09e59ba88b
commit 7194f6a4c4
16 changed files with 15569 additions and 3336 deletions

View File

@@ -97,17 +97,17 @@ def test_get_dashboards_with_search(mock_deps):
mock_deps["config"].get_environments.return_value = [mock_env]
mock_deps["task"].get_all_tasks.return_value = []
async def mock_get_dashboards(env, tasks):
async def mock_get_dashboards(env, tasks, include_git_status=False):
return [
{"id": 1, "title": "Sales Report", "slug": "sales"},
{"id": 2, "title": "Marketing Dashboard", "slug": "marketing"}
{"id": 1, "title": "Sales Report", "slug": "sales", "git_status": {"branch": "main", "sync_status": "OK"}, "last_task": None},
{"id": 2, "title": "Marketing Dashboard", "slug": "marketing", "git_status": {"branch": "main", "sync_status": "OK"}, "last_task": None}
]
mock_deps["resource"].get_dashboards_with_status = AsyncMock(
side_effect=mock_get_dashboards
)
response = client.get("/api/dashboards?env_id=prod&search=sales")
assert response.status_code == 200
data = response.json()
# @POST: Filtered result count must match search

View File

@@ -4,30 +4,30 @@
# @PURPOSE: Defines the FastAPI router for task-related endpoints, allowing clients to create, list, and get the status of tasks.
# @LAYER: UI (API)
# @RELATION: Depends on the TaskManager. It is included by the main app.
from typing import List, Dict, Any, Optional
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from pydantic import BaseModel
from ...core.logger import belief_scope
from ...core.task_manager import TaskManager, Task, TaskStatus, LogEntry
from ...core.task_manager.models import LogFilter, LogStats
from ...dependencies import get_task_manager, has_permission, get_current_user, get_config_manager
from ...core.config_manager import ConfigManager
from ...services.llm_prompt_templates import (
is_multimodal_model,
normalize_llm_settings,
resolve_bound_provider_id,
)
from ...core.task_manager import TaskManager, Task, TaskStatus, LogEntry
from ...core.task_manager.models import LogFilter, LogStats
from ...dependencies import get_task_manager, has_permission, get_current_user, get_config_manager
from ...core.config_manager import ConfigManager
from ...services.llm_prompt_templates import (
is_multimodal_model,
normalize_llm_settings,
resolve_bound_provider_id,
)
router = APIRouter()
TASK_TYPE_PLUGIN_MAP = {
"llm_validation": ["llm_dashboard_validation"],
"backup": ["superset-backup"],
"migration": ["superset-migration"],
}
class CreateTaskRequest(BaseModel):
router = APIRouter()
TASK_TYPE_PLUGIN_MAP = {
"llm_validation": ["llm_dashboard_validation"],
"backup": ["superset-backup"],
"migration": ["superset-migration"],
}
class CreateTaskRequest(BaseModel):
plugin_id: str
params: Dict[str, Any]
@@ -45,54 +45,54 @@ class ResumeTaskRequest(BaseModel):
# @PRE: plugin_id must exist and params must be valid for that plugin.
# @POST: A new task is created and started.
# @RETURN: Task - The created task instance.
async def create_task(
request: CreateTaskRequest,
task_manager: TaskManager = Depends(get_task_manager),
current_user = Depends(get_current_user),
config_manager: ConfigManager = Depends(get_config_manager),
):
async def create_task(
request: CreateTaskRequest,
task_manager: TaskManager = Depends(get_task_manager),
current_user = Depends(get_current_user),
config_manager: ConfigManager = Depends(get_config_manager),
):
# Dynamic permission check based on plugin_id
has_permission(f"plugin:{request.plugin_id}", "EXECUTE")(current_user)
"""
Create and start a new task for a given plugin.
"""
with belief_scope("create_task"):
try:
# Special handling for LLM tasks to resolve provider config by task binding.
if request.plugin_id in {"llm_dashboard_validation", "llm_documentation"}:
from ...core.database import SessionLocal
from ...services.llm_provider import LLMProviderService
db = SessionLocal()
try:
llm_service = LLMProviderService(db)
provider_id = request.params.get("provider_id")
if not provider_id:
llm_settings = normalize_llm_settings(config_manager.get_config().settings.llm)
binding_key = "dashboard_validation" if request.plugin_id == "llm_dashboard_validation" else "documentation"
provider_id = resolve_bound_provider_id(llm_settings, binding_key)
if provider_id:
request.params["provider_id"] = provider_id
if not provider_id:
providers = llm_service.get_all_providers()
active_provider = next((p for p in providers if p.is_active), None)
if active_provider:
provider_id = active_provider.id
request.params["provider_id"] = provider_id
if provider_id:
db_provider = llm_service.get_provider(provider_id)
if not db_provider:
raise ValueError(f"LLM Provider {provider_id} not found")
if request.plugin_id == "llm_dashboard_validation" and not is_multimodal_model(
db_provider.default_model,
db_provider.provider_type,
):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Selected provider model is not multimodal for dashboard validation",
)
finally:
db.close()
try:
# Special handling for LLM tasks to resolve provider config by task binding.
if request.plugin_id in {"llm_dashboard_validation", "llm_documentation"}:
from ...core.database import SessionLocal
from ...services.llm_provider import LLMProviderService
db = SessionLocal()
try:
llm_service = LLMProviderService(db)
provider_id = request.params.get("provider_id")
if not provider_id:
llm_settings = normalize_llm_settings(config_manager.get_config().settings.llm)
binding_key = "dashboard_validation" if request.plugin_id == "llm_dashboard_validation" else "documentation"
provider_id = resolve_bound_provider_id(llm_settings, binding_key)
if provider_id:
request.params["provider_id"] = provider_id
if not provider_id:
providers = llm_service.get_all_providers()
active_provider = next((p for p in providers if p.is_active), None)
if active_provider:
provider_id = active_provider.id
request.params["provider_id"] = provider_id
if provider_id:
db_provider = llm_service.get_provider(provider_id)
if not db_provider:
raise ValueError(f"LLM Provider {provider_id} not found")
if request.plugin_id == "llm_dashboard_validation" and not is_multimodal_model(
db_provider.default_model,
db_provider.provider_type,
):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Selected provider model is not multimodal for dashboard validation",
)
finally:
db.close()
task = await task_manager.create_task(
plugin_id=request.plugin_id,
@@ -113,36 +113,36 @@ async def create_task(
# @PRE: task_manager must be available.
# @POST: Returns a list of tasks.
# @RETURN: List[Task] - List of tasks.
async def list_tasks(
limit: int = 10,
offset: int = 0,
status_filter: Optional[TaskStatus] = Query(None, alias="status"),
task_type: Optional[str] = Query(None, description="Task category: llm_validation, backup, migration"),
plugin_id: Optional[List[str]] = Query(None, description="Filter by plugin_id (repeatable query param)"),
completed_only: bool = Query(False, description="Return only completed tasks (SUCCESS/FAILED)"),
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
"""
Retrieve a list of tasks with pagination and optional status filter.
"""
with belief_scope("list_tasks"):
plugin_filters = list(plugin_id) if plugin_id else []
if task_type:
if task_type not in TASK_TYPE_PLUGIN_MAP:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unsupported task_type '{task_type}'. Allowed: {', '.join(TASK_TYPE_PLUGIN_MAP.keys())}"
)
plugin_filters.extend(TASK_TYPE_PLUGIN_MAP[task_type])
return task_manager.get_tasks(
limit=limit,
offset=offset,
status=status_filter,
plugin_ids=plugin_filters or None,
completed_only=completed_only
)
async def list_tasks(
limit: int = 10,
offset: int = 0,
status_filter: Optional[TaskStatus] = Query(None, alias="status"),
task_type: Optional[str] = Query(None, description="Task category: llm_validation, backup, migration"),
plugin_id: Optional[List[str]] = Query(None, description="Filter by plugin_id (repeatable query param)"),
completed_only: bool = Query(False, description="Return only completed tasks (SUCCESS/FAILED)"),
task_manager: TaskManager = Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
"""
Retrieve a list of tasks with pagination and optional status filter.
"""
with belief_scope("list_tasks"):
plugin_filters = list(plugin_id) if plugin_id else []
if task_type:
if task_type not in TASK_TYPE_PLUGIN_MAP:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unsupported task_type '{task_type}'. Allowed: {', '.join(TASK_TYPE_PLUGIN_MAP.keys())}"
)
plugin_filters.extend(TASK_TYPE_PLUGIN_MAP[task_type])
return task_manager.get_tasks(
limit=limit,
offset=offset,
status=status_filter,
plugin_ids=plugin_filters or None,
completed_only=completed_only
)
# [/DEF:list_tasks:Function]
@router.get("/{task_id}", response_model=Task)
@@ -182,6 +182,13 @@ async def get_task(
# @POST: Returns a list of log entries or raises 404.
# @RETURN: List[LogEntry] - List of log entries.
# @TIER: CRITICAL
# @TEST_CONTRACT: TaskLogQueryInput -> List[LogEntry]
# @TEST_SCENARIO: existing_task_logs_filtered -> Returns filtered logs by level/source/search with pagination.
# @TEST_FIXTURE: valid_task_with_mixed_logs -> backend/tests/fixtures/task_logs/valid_task_with_mixed_logs.json
# @TEST_EDGE: missing_task -> Unknown task_id returns 404 Task not found.
# @TEST_EDGE: invalid_level_type -> Non-string/invalid level query rejected by validation or yields empty result.
# @TEST_EDGE: pagination_bounds -> offset=0 and limit=1000 remain within API bounds and do not overflow.
# @TEST_INVARIANT: logs_only_for_existing_task -> VERIFIED_BY: [existing_task_logs_filtered, missing_task]
async def get_task_logs(
task_id: str,
level: Optional[str] = Query(None, description="Filter by log level (DEBUG, INFO, WARNING, ERROR)"),
@@ -328,4 +335,4 @@ async def clear_tasks(
task_manager.clear_tasks(status)
return
# [/DEF:clear_tasks:Function]
# [/DEF:TasksRouter:Module]
# [/DEF:TasksRouter:Module]