таски готовы
This commit is contained in:
83
backend/tests/test_reports_detail_api.py
Normal file
83
backend/tests/test_reports_detail_api.py
Normal file
@@ -0,0 +1,83 @@
|
||||
# [DEF:backend.tests.test_reports_detail_api:Module]
|
||||
# @TIER: CRITICAL
|
||||
# @SEMANTICS: tests, reports, api, detail, diagnostics
|
||||
# @PURPOSE: Contract tests for GET /api/reports/{report_id} detail endpoint behavior.
|
||||
# @LAYER: Domain (Tests)
|
||||
# @RELATION: TESTS -> backend.src.api.routes.reports
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from types import SimpleNamespace
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from src.app import app
|
||||
from src.core.task_manager.models import Task, TaskStatus
|
||||
from src.dependencies import get_current_user, get_task_manager
|
||||
|
||||
|
||||
class _FakeTaskManager:
|
||||
def __init__(self, tasks):
|
||||
self._tasks = tasks
|
||||
|
||||
def get_all_tasks(self):
|
||||
return self._tasks
|
||||
|
||||
|
||||
def _admin_user():
|
||||
role = SimpleNamespace(name="Admin", permissions=[])
|
||||
return SimpleNamespace(username="test-admin", roles=[role])
|
||||
|
||||
|
||||
def _make_task(task_id: str, plugin_id: str, status: TaskStatus, result=None):
|
||||
now = datetime.utcnow()
|
||||
return Task(
|
||||
id=task_id,
|
||||
plugin_id=plugin_id,
|
||||
status=status,
|
||||
started_at=now - timedelta(minutes=2),
|
||||
finished_at=now - timedelta(minutes=1) if status != TaskStatus.RUNNING else None,
|
||||
params={"environment_id": "env-1"},
|
||||
result=result or {"summary": f"{plugin_id} result"},
|
||||
)
|
||||
|
||||
|
||||
def test_get_report_detail_success():
|
||||
task = _make_task(
|
||||
"detail-1",
|
||||
"superset-migration",
|
||||
TaskStatus.FAILED,
|
||||
result={"error": {"message": "Step failed", "next_actions": ["Check mapping", "Retry"]}},
|
||||
)
|
||||
|
||||
app.dependency_overrides[get_current_user] = lambda: _admin_user()
|
||||
app.dependency_overrides[get_task_manager] = lambda: _FakeTaskManager([task])
|
||||
|
||||
try:
|
||||
client = TestClient(app)
|
||||
response = client.get("/api/reports/detail-1")
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert "report" in data
|
||||
assert data["report"]["report_id"] == "detail-1"
|
||||
assert "diagnostics" in data
|
||||
assert "next_actions" in data
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
def test_get_report_detail_not_found():
|
||||
task = _make_task("detail-2", "superset-backup", TaskStatus.SUCCESS)
|
||||
|
||||
app.dependency_overrides[get_current_user] = lambda: _admin_user()
|
||||
app.dependency_overrides[get_task_manager] = lambda: _FakeTaskManager([task])
|
||||
|
||||
try:
|
||||
client = TestClient(app)
|
||||
response = client.get("/api/reports/unknown-id")
|
||||
assert response.status_code == 404
|
||||
finally:
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
# [/DEF:backend.tests.test_reports_detail_api:Module]
|
||||
Reference in New Issue
Block a user