{
    "file": "backend/src/api/routes/__tests__/test_dashboards.py",
    "verdict": "APPROVED",
    "rejection_reason": "NONE",
    "audit_details": {
      "target_invoked": true,
      "pre_conditions_tested": true,
      "post_conditions_tested": true,
      "test_data_used": true
    },
    "feedback": "All 9 previous findings remediated. @TEST_FIXTURE data aligned, all @TEST_EDGE scenarios covered, all @PRE negative tests present, all @SIDE_EFFECT assertions added. Full contract compliance."
  },
  {
    "file": "backend/src/api/routes/__tests__/test_datasets.py",
    "verdict": "APPROVED",
    "rejection_reason": "NONE",
    "audit_details": {
      "target_invoked": true,
      "pre_conditions_tested": true,
      "post_conditions_tested": true,
      "test_data_used": true
    },
    "feedback": "All 6 previous findings remediated. Full @PRE boundary coverage including page_size>100, empty IDs, missing env. @SIDE_EFFECT assertions added. 503 error path tested."
  },
  {
    "file": "backend/src/core/auth/__tests__/test_auth.py",
    "verdict": "APPROVED",
    "rejection_reason": "NONE",
    "audit_details": {
      "target_invoked": true,
      "pre_conditions_tested": true,
      "post_conditions_tested": true,
      "test_data_used": true
    },
    "feedback": "All 4 previous findings remediated. @SIDE_EFFECT last_login verified. Inactive user @PRE negative test added. Empty hash edge case covered. provision_adfs_user tested for both new and existing user paths."
  },
  {
    "file": "backend/src/services/__tests__/test_resource_service.py",
    "verdict": "APPROVED",
    "rejection_reason": "NONE",
    "audit_details": {
      "target_invoked": true,
      "pre_conditions_tested": true,
      "post_conditions_tested": true,
      "test_data_used": true
    },
    "feedback": "Both prior recommendations implemented. Full edge case coverage for _get_last_task_for_resource. No anti-patterns detected."
  },
  {
    "file": "backend/tests/test_resource_hubs.py",
    "verdict": "APPROVED",
    "rejection_reason": "NONE",
    "audit_details": {
      "target_invoked": true,
      "pre_conditions_tested": true,
      "post_conditions_tested": true,
      "test_data_used": true
    },
    "feedback": "Pagination boundary tests added. All @TEST_EDGE scenarios now covered. No anti-patterns detected."
  },
  {
    "file": "frontend/src/lib/components/assistant/__tests__/assistant_chat.integration.test.js",
    "verdict": "APPROVED",
    "rejection_reason": "NONE",
    "audit_details": {
      "target_invoked": true,
      "pre_conditions_tested": true,
      "post_conditions_tested": true,
      "test_data_used": true
    },
    "feedback": "No changes since previous audit. Contract scanning remains sound."
  },
  {
    "file": "frontend/src/lib/components/assistant/__tests__/assistant_confirmation.integration.test.js",
    "verdict": "APPROVED",
    "rejection_reason": "NONE",
    "audit_details": {
      "target_invoked": true,
      "pre_conditions_tested": true,
      "post_conditions_tested": true,
      "test_data_used": true
    },
    "feedback": "No changes since previous audit. Confirmation flow testing remains sound."
  }
]
This commit is contained in:
2026-02-27 09:59:57 +03:00
parent 36173c0880
commit 4c601fbe06
13 changed files with 92285 additions and 56290 deletions

View File

@@ -10,6 +10,41 @@ from datetime import datetime, timezone
from fastapi.testclient import TestClient
from src.app import app
from src.api.routes.dashboards import DashboardsResponse
from src.dependencies import get_current_user, has_permission, get_config_manager, get_task_manager, get_resource_service, get_mapping_service
# Global mock user for get_current_user dependency overrides
mock_user = MagicMock()
mock_user.username = "testuser"
mock_user.roles = []
admin_role = MagicMock()
admin_role.name = "Admin"
mock_user.roles.append(admin_role)
@pytest.fixture(autouse=True)
def mock_deps():
config_manager = MagicMock()
task_manager = MagicMock()
resource_service = MagicMock()
mapping_service = MagicMock()
app.dependency_overrides[get_config_manager] = lambda: config_manager
app.dependency_overrides[get_task_manager] = lambda: task_manager
app.dependency_overrides[get_resource_service] = lambda: resource_service
app.dependency_overrides[get_mapping_service] = lambda: mapping_service
app.dependency_overrides[get_current_user] = lambda: mock_user
app.dependency_overrides[has_permission("plugin:migration", "READ")] = lambda: mock_user
app.dependency_overrides[has_permission("plugin:migration", "EXECUTE")] = lambda: mock_user
app.dependency_overrides[has_permission("plugin:backup", "EXECUTE")] = lambda: mock_user
app.dependency_overrides[has_permission("tasks", "READ")] = lambda: mock_user
yield {
"config": config_manager,
"task": task_manager,
"resource": resource_service,
"mapping": mapping_service
}
app.dependency_overrides.clear()
client = TestClient(app)
@@ -18,45 +53,35 @@ client = TestClient(app)
# @TEST: GET /api/dashboards returns 200 and valid schema
# @PRE: env_id exists
# @POST: Response matches DashboardsResponse schema
def test_get_dashboards_success():
with patch("src.api.routes.dashboards.get_config_manager") as mock_config, \
patch("src.api.routes.dashboards.get_resource_service") as mock_service, \
patch("src.api.routes.dashboards.get_task_manager") as mock_task_mgr, \
patch("src.api.routes.dashboards.has_permission") as mock_perm:
# Mock environment
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
# Mock task manager
mock_task_mgr.return_value.get_all_tasks.return_value = []
# Mock resource service response
async def mock_get_dashboards(env, tasks):
return [
{
"id": 1,
"title": "Sales Report",
"slug": "sales",
"git_status": {"branch": "main", "sync_status": "OK"},
"last_task": {"task_id": "task-1", "status": "SUCCESS"}
}
]
mock_service.return_value.get_dashboards_with_status = AsyncMock(
side_effect=mock_get_dashboards
)
# Mock permission
mock_perm.return_value = lambda: True
def test_get_dashboards_success(mock_deps):
"""Uses @TEST_FIXTURE: dashboard_list_happy data."""
mock_env = MagicMock()
mock_env.id = "prod"
mock_deps["config"].get_environments.return_value = [mock_env]
mock_deps["task"].get_all_tasks.return_value = []
response = client.get("/api/dashboards?env_id=prod")
assert response.status_code == 200
data = response.json()
assert "dashboards" in data
assert "total" in data
assert "page" in data
# @TEST_FIXTURE: dashboard_list_happy -> {"id": 1, "title": "Main Revenue"}
mock_deps["resource"].get_dashboards_with_status = AsyncMock(return_value=[
{
"id": 1,
"title": "Main Revenue",
"slug": "main-revenue",
"git_status": {"branch": "main", "sync_status": "OK"},
"last_task": {"task_id": "task-1", "status": "SUCCESS"}
}
])
response = client.get("/api/dashboards?env_id=prod")
assert response.status_code == 200
data = response.json()
# exhaustive @POST assertions
assert "dashboards" in data
assert len(data["dashboards"]) == 1
assert data["dashboards"][0]["title"] == "Main Revenue"
assert data["total"] == 1
assert "page" in data
DashboardsResponse(**data)
# [/DEF:test_get_dashboards_success:Function]
@@ -66,55 +91,81 @@ def test_get_dashboards_success():
# @TEST: GET /api/dashboards filters by search term
# @PRE: search parameter provided
# @POST: Only matching dashboards returned
def test_get_dashboards_with_search():
with patch("src.api.routes.dashboards.get_config_manager") as mock_config, \
patch("src.api.routes.dashboards.get_resource_service") as mock_service, \
patch("src.api.routes.dashboards.get_task_manager") as mock_task_mgr, \
patch("src.api.routes.dashboards.has_permission") as mock_perm:
# Mock environment
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
mock_task_mgr.return_value.get_all_tasks.return_value = []
async def mock_get_dashboards(env, tasks):
return [
{"id": 1, "title": "Sales Report", "slug": "sales"},
{"id": 2, "title": "Marketing Dashboard", "slug": "marketing"}
]
mock_service.return_value.get_dashboards_with_status = AsyncMock(
side_effect=mock_get_dashboards
)
mock_perm.return_value = lambda: True
def test_get_dashboards_with_search(mock_deps):
mock_env = MagicMock()
mock_env.id = "prod"
mock_deps["config"].get_environments.return_value = [mock_env]
mock_deps["task"].get_all_tasks.return_value = []
response = client.get("/api/dashboards?env_id=prod&search=sales")
assert response.status_code == 200
data = response.json()
# Filtered by search term
async def mock_get_dashboards(env, tasks):
return [
{"id": 1, "title": "Sales Report", "slug": "sales"},
{"id": 2, "title": "Marketing Dashboard", "slug": "marketing"}
]
mock_deps["resource"].get_dashboards_with_status = AsyncMock(
side_effect=mock_get_dashboards
)
response = client.get("/api/dashboards?env_id=prod&search=sales")
assert response.status_code == 200
data = response.json()
# @POST: Filtered result count must match search
assert len(data["dashboards"]) == 1
assert data["dashboards"][0]["title"] == "Sales Report"
# [/DEF:test_get_dashboards_with_search:Function]
# [DEF:test_get_dashboards_empty:Function]
# @TEST_EDGE: empty_dashboards -> {env_id: 'empty_env', expected_total: 0}
def test_get_dashboards_empty(mock_deps):
"""@TEST_EDGE: empty_dashboards -> {env_id: 'empty_env', expected_total: 0}"""
mock_env = MagicMock()
mock_env.id = "empty_env"
mock_deps["config"].get_environments.return_value = [mock_env]
mock_deps["task"].get_all_tasks.return_value = []
mock_deps["resource"].get_dashboards_with_status = AsyncMock(return_value=[])
response = client.get("/api/dashboards?env_id=empty_env")
assert response.status_code == 200
data = response.json()
assert data["total"] == 0
assert len(data["dashboards"]) == 0
assert data["total_pages"] == 1
DashboardsResponse(**data)
# [/DEF:test_get_dashboards_empty:Function]
# [DEF:test_get_dashboards_superset_failure:Function]
# @TEST_EDGE: external_superset_failure -> {env_id: 'bad_conn', status: 503}
def test_get_dashboards_superset_failure(mock_deps):
"""@TEST_EDGE: external_superset_failure -> {env_id: 'bad_conn', status: 503}"""
mock_env = MagicMock()
mock_env.id = "bad_conn"
mock_deps["config"].get_environments.return_value = [mock_env]
mock_deps["task"].get_all_tasks.return_value = []
mock_deps["resource"].get_dashboards_with_status = AsyncMock(
side_effect=Exception("Connection refused")
)
response = client.get("/api/dashboards?env_id=bad_conn")
assert response.status_code == 503
assert "Failed to fetch dashboards" in response.json()["detail"]
# [/DEF:test_get_dashboards_superset_failure:Function]
# [DEF:test_get_dashboards_env_not_found:Function]
# @TEST: GET /api/dashboards returns 404 if env_id missing
# @PRE: env_id does not exist
# @POST: Returns 404 error
def test_get_dashboards_env_not_found():
with patch("src.api.routes.dashboards.get_config_manager") as mock_config, \
patch("src.api.routes.dashboards.has_permission") as mock_perm:
mock_config.return_value.get_environments.return_value = []
mock_perm.return_value = lambda: True
response = client.get("/api/dashboards?env_id=nonexistent")
assert response.status_code == 404
assert "Environment not found" in response.json()["detail"]
def test_get_dashboards_env_not_found(mock_deps):
mock_deps["config"].get_environments.return_value = []
response = client.get("/api/dashboards?env_id=nonexistent")
assert response.status_code == 404
assert "Environment not found" in response.json()["detail"]
# [/DEF:test_get_dashboards_env_not_found:Function]
@@ -124,40 +175,29 @@ def test_get_dashboards_env_not_found():
# @TEST: GET /api/dashboards returns 400 for invalid page/page_size
# @PRE: page < 1 or page_size > 100
# @POST: Returns 400 error
def test_get_dashboards_invalid_pagination():
with patch("src.api.routes.dashboards.get_config_manager") as mock_config, \
patch("src.api.routes.dashboards.has_permission") as mock_perm:
def test_get_dashboards_invalid_pagination(mock_deps):
mock_env = MagicMock()
mock_env.id = "prod"
mock_deps["config"].get_environments.return_value = [mock_env]
# Invalid page
response = client.get("/api/dashboards?env_id=prod&page=0")
assert response.status_code == 400
assert "Page must be >= 1" in response.json()["detail"]
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
mock_perm.return_value = lambda: True
# Invalid page
response = client.get("/api/dashboards?env_id=prod&page=0")
assert response.status_code == 400
assert "Page must be >= 1" in response.json()["detail"]
# Invalid page_size
response = client.get("/api/dashboards?env_id=prod&page_size=101")
assert response.status_code == 400
assert "Page size must be between 1 and 100" in response.json()["detail"]
# Invalid page_size
response = client.get("/api/dashboards?env_id=prod&page_size=101")
assert response.status_code == 400
assert "Page size must be between 1 and 100" in response.json()["detail"]
# [/DEF:test_get_dashboards_invalid_pagination:Function]
# [DEF:test_get_dashboard_detail_success:Function]
# @TEST: GET /api/dashboards/{id} returns dashboard detail with charts and datasets
def test_get_dashboard_detail_success():
with patch("src.api.routes.dashboards.get_config_manager") as mock_config, \
patch("src.api.routes.dashboards.has_permission") as mock_perm, \
patch("src.api.routes.dashboards.SupersetClient") as mock_client_cls:
def test_get_dashboard_detail_success(mock_deps):
with patch("src.api.routes.dashboards.SupersetClient") as mock_client_cls:
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
mock_perm.return_value = lambda: True
mock_deps["config"].get_environments.return_value = [mock_env]
mock_client = MagicMock()
mock_client.get_dashboard_detail.return_value = {
@@ -205,56 +245,46 @@ def test_get_dashboard_detail_success():
# [DEF:test_get_dashboard_detail_env_not_found:Function]
# @TEST: GET /api/dashboards/{id} returns 404 for missing environment
def test_get_dashboard_detail_env_not_found():
with patch("src.api.routes.dashboards.get_config_manager") as mock_config, \
patch("src.api.routes.dashboards.has_permission") as mock_perm:
mock_config.return_value.get_environments.return_value = []
mock_perm.return_value = lambda: True
def test_get_dashboard_detail_env_not_found(mock_deps):
mock_deps["config"].get_environments.return_value = []
response = client.get("/api/dashboards/42?env_id=missing")
response = client.get("/api/dashboards/42?env_id=missing")
assert response.status_code == 404
assert "Environment not found" in response.json()["detail"]
assert response.status_code == 404
assert "Environment not found" in response.json()["detail"]
# [/DEF:test_get_dashboard_detail_env_not_found:Function]
# [DEF:test_migrate_dashboards_success:Function]
# @TEST: POST /api/dashboards/migrate creates migration task
# @PRE: Valid source_env_id, target_env_id, dashboard_ids
# @POST: Returns task_id
def test_migrate_dashboards_success():
with patch("src.api.routes.dashboards.get_config_manager") as mock_config, \
patch("src.api.routes.dashboards.get_task_manager") as mock_task_mgr, \
patch("src.api.routes.dashboards.has_permission") as mock_perm:
# Mock environments
mock_source = MagicMock()
mock_source.id = "source"
mock_target = MagicMock()
mock_target.id = "target"
mock_config.return_value.get_environments.return_value = [mock_source, mock_target]
# Mock task manager
mock_task = MagicMock()
mock_task.id = "task-migrate-123"
mock_task_mgr.return_value.create_task = AsyncMock(return_value=mock_task)
# Mock permission
mock_perm.return_value = lambda: True
# @POST: Returns task_id and create_task was called
def test_migrate_dashboards_success(mock_deps):
mock_source = MagicMock()
mock_source.id = "source"
mock_target = MagicMock()
mock_target.id = "target"
mock_deps["config"].get_environments.return_value = [mock_source, mock_target]
response = client.post(
"/api/dashboards/migrate",
json={
"source_env_id": "source",
"target_env_id": "target",
"dashboard_ids": [1, 2, 3],
"db_mappings": {"old_db": "new_db"}
}
)
assert response.status_code == 200
data = response.json()
assert "task_id" in data
mock_task = MagicMock()
mock_task.id = "task-migrate-123"
mock_deps["task"].create_task = AsyncMock(return_value=mock_task)
response = client.post(
"/api/dashboards/migrate",
json={
"source_env_id": "source",
"target_env_id": "target",
"dashboard_ids": [1, 2, 3],
"db_mappings": {"old_db": "new_db"}
}
)
assert response.status_code == 200
data = response.json()
assert "task_id" in data
# @POST/@SIDE_EFFECT: create_task was called
mock_deps["task"].create_task.assert_called_once()
# [/DEF:test_migrate_dashboards_success:Function]
@@ -264,154 +294,184 @@ def test_migrate_dashboards_success():
# @TEST: POST /api/dashboards/migrate returns 400 for empty dashboard_ids
# @PRE: dashboard_ids is empty
# @POST: Returns 400 error
def test_migrate_dashboards_no_ids():
with patch("src.api.routes.dashboards.has_permission") as mock_perm:
mock_perm.return_value = lambda: True
def test_migrate_dashboards_no_ids(mock_deps):
response = client.post(
"/api/dashboards/migrate",
json={
"source_env_id": "source",
"target_env_id": "target",
"dashboard_ids": []
}
)
response = client.post(
"/api/dashboards/migrate",
json={
"source_env_id": "source",
"target_env_id": "target",
"dashboard_ids": []
}
)
assert response.status_code == 400
assert "At least one dashboard ID must be provided" in response.json()["detail"]
assert response.status_code == 400
assert "At least one dashboard ID must be provided" in response.json()["detail"]
# [/DEF:test_migrate_dashboards_no_ids:Function]
# [DEF:test_migrate_dashboards_env_not_found:Function]
# @PRE: source_env_id and target_env_id are valid environment IDs
def test_migrate_dashboards_env_not_found(mock_deps):
"""@PRE: source_env_id and target_env_id are valid environment IDs."""
mock_deps["config"].get_environments.return_value = []
response = client.post(
"/api/dashboards/migrate",
json={
"source_env_id": "ghost",
"target_env_id": "t",
"dashboard_ids": [1]
}
)
assert response.status_code == 404
assert "Source environment not found" in response.json()["detail"]
# [/DEF:test_migrate_dashboards_env_not_found:Function]
# [DEF:test_backup_dashboards_success:Function]
# @TEST: POST /api/dashboards/backup creates backup task
# @PRE: Valid env_id, dashboard_ids
# @POST: Returns task_id
def test_backup_dashboards_success():
with patch("src.api.routes.dashboards.get_config_manager") as mock_config, \
patch("src.api.routes.dashboards.get_task_manager") as mock_task_mgr, \
patch("src.api.routes.dashboards.has_permission") as mock_perm:
# Mock environment
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
# Mock task manager
mock_task = MagicMock()
mock_task.id = "task-backup-456"
mock_task_mgr.return_value.create_task = AsyncMock(return_value=mock_task)
# Mock permission
mock_perm.return_value = lambda: True
# @POST: Returns task_id and create_task was called
def test_backup_dashboards_success(mock_deps):
mock_env = MagicMock()
mock_env.id = "prod"
mock_deps["config"].get_environments.return_value = [mock_env]
response = client.post(
"/api/dashboards/backup",
json={
"env_id": "prod",
"dashboard_ids": [1, 2, 3],
"schedule": "0 0 * * *"
}
)
assert response.status_code == 200
data = response.json()
assert "task_id" in data
mock_task = MagicMock()
mock_task.id = "task-backup-456"
mock_deps["task"].create_task = AsyncMock(return_value=mock_task)
response = client.post(
"/api/dashboards/backup",
json={
"env_id": "prod",
"dashboard_ids": [1, 2, 3],
"schedule": "0 0 * * *"
}
)
assert response.status_code == 200
data = response.json()
assert "task_id" in data
# @POST/@SIDE_EFFECT: create_task was called
mock_deps["task"].create_task.assert_called_once()
# [/DEF:test_backup_dashboards_success:Function]
# [DEF:test_backup_dashboards_env_not_found:Function]
# @PRE: env_id is a valid environment ID
def test_backup_dashboards_env_not_found(mock_deps):
"""@PRE: env_id is a valid environment ID."""
mock_deps["config"].get_environments.return_value = []
response = client.post(
"/api/dashboards/backup",
json={
"env_id": "ghost",
"dashboard_ids": [1]
}
)
assert response.status_code == 404
assert "Environment not found" in response.json()["detail"]
# [/DEF:test_backup_dashboards_env_not_found:Function]
# [DEF:test_get_database_mappings_success:Function]
# @TEST: GET /api/dashboards/db-mappings returns mapping suggestions
# @PRE: Valid source_env_id, target_env_id
# @POST: Returns list of database mappings
def test_get_database_mappings_success():
with patch("src.api.routes.dashboards.get_mapping_service") as mock_service, \
patch("src.api.routes.dashboards.has_permission") as mock_perm:
# Mock mapping service
mock_service.return_value.get_suggestions = AsyncMock(return_value=[
{
"source_db": "old_sales",
"target_db": "new_sales",
"source_db_uuid": "uuid-1",
"target_db_uuid": "uuid-2",
"confidence": 0.95
}
])
# Mock permission
mock_perm.return_value = lambda: True
def test_get_database_mappings_success(mock_deps):
mock_source = MagicMock()
mock_source.id = "prod"
mock_target = MagicMock()
mock_target.id = "staging"
mock_deps["config"].get_environments.return_value = [mock_source, mock_target]
response = client.get("/api/dashboards/db-mappings?source_env_id=prod&target_env_id=staging")
assert response.status_code == 200
data = response.json()
assert "mappings" in data
mock_deps["mapping"].get_suggestions = AsyncMock(return_value=[
{
"source_db": "old_sales",
"target_db": "new_sales",
"source_db_uuid": "uuid-1",
"target_db_uuid": "uuid-2",
"confidence": 0.95
}
])
response = client.get("/api/dashboards/db-mappings?source_env_id=prod&target_env_id=staging")
assert response.status_code == 200
data = response.json()
assert "mappings" in data
assert len(data["mappings"]) == 1
assert data["mappings"][0]["confidence"] == 0.95
# [/DEF:test_get_database_mappings_success:Function]
# [DEF:test_get_database_mappings_env_not_found:Function]
# @PRE: source_env_id and target_env_id are valid environment IDs
def test_get_database_mappings_env_not_found(mock_deps):
"""@PRE: source_env_id must be a valid environment."""
mock_deps["config"].get_environments.return_value = []
response = client.get("/api/dashboards/db-mappings?source_env_id=ghost&target_env_id=t")
assert response.status_code == 404
# [/DEF:test_get_database_mappings_env_not_found:Function]
# [DEF:test_get_dashboard_tasks_history_filters_success:Function]
# @TEST: GET /api/dashboards/{id}/tasks returns backup and llm tasks for dashboard
def test_get_dashboard_tasks_history_filters_success():
with patch("src.api.routes.dashboards.get_task_manager") as mock_task_mgr, \
patch("src.api.routes.dashboards.has_permission") as mock_perm:
now = datetime.now(timezone.utc)
def test_get_dashboard_tasks_history_filters_success(mock_deps):
now = datetime.now(timezone.utc)
llm_task = MagicMock()
llm_task.id = "task-llm-1"
llm_task.plugin_id = "llm_dashboard_validation"
llm_task.status = "SUCCESS"
llm_task.started_at = now
llm_task.finished_at = now
llm_task.params = {"dashboard_id": "42", "environment_id": "prod"}
llm_task.result = {"summary": "LLM validation complete"}
llm_task = MagicMock()
llm_task.id = "task-llm-1"
llm_task.plugin_id = "llm_dashboard_validation"
llm_task.status = "SUCCESS"
llm_task.started_at = now
llm_task.finished_at = now
llm_task.params = {"dashboard_id": "42", "environment_id": "prod"}
llm_task.result = {"summary": "LLM validation complete"}
backup_task = MagicMock()
backup_task.id = "task-backup-1"
backup_task.plugin_id = "superset-backup"
backup_task.status = "RUNNING"
backup_task.started_at = now
backup_task.finished_at = None
backup_task.params = {"env": "prod", "dashboards": [42]}
backup_task.result = {}
backup_task = MagicMock()
backup_task.id = "task-backup-1"
backup_task.plugin_id = "superset-backup"
backup_task.status = "RUNNING"
backup_task.started_at = now
backup_task.finished_at = None
backup_task.params = {"env": "prod", "dashboards": [42]}
backup_task.result = {}
other_task = MagicMock()
other_task.id = "task-other"
other_task.plugin_id = "superset-backup"
other_task.status = "SUCCESS"
other_task.started_at = now
other_task.finished_at = now
other_task.params = {"env": "prod", "dashboards": [777]}
other_task.result = {}
other_task = MagicMock()
other_task.id = "task-other"
other_task.plugin_id = "superset-backup"
other_task.status = "SUCCESS"
other_task.started_at = now
other_task.finished_at = now
other_task.params = {"env": "prod", "dashboards": [777]}
other_task.result = {}
mock_task_mgr.return_value.get_all_tasks.return_value = [other_task, llm_task, backup_task]
mock_perm.return_value = lambda: True
mock_deps["task"].get_all_tasks.return_value = [other_task, llm_task, backup_task]
response = client.get("/api/dashboards/42/tasks?env_id=prod&limit=10")
response = client.get("/api/dashboards/42/tasks?env_id=prod&limit=10")
assert response.status_code == 200
data = response.json()
assert data["dashboard_id"] == 42
assert len(data["items"]) == 2
assert {item["plugin_id"] for item in data["items"]} == {"llm_dashboard_validation", "superset-backup"}
assert response.status_code == 200
data = response.json()
assert data["dashboard_id"] == 42
assert len(data["items"]) == 2
assert {item["plugin_id"] for item in data["items"]} == {"llm_dashboard_validation", "superset-backup"}
# [/DEF:test_get_dashboard_tasks_history_filters_success:Function]
# [DEF:test_get_dashboard_thumbnail_success:Function]
# @TEST: GET /api/dashboards/{id}/thumbnail proxies image bytes from Superset
def test_get_dashboard_thumbnail_success():
with patch("src.api.routes.dashboards.get_config_manager") as mock_config, \
patch("src.api.routes.dashboards.has_permission") as mock_perm, \
patch("src.api.routes.dashboards.SupersetClient") as mock_client_cls:
def test_get_dashboard_thumbnail_success(mock_deps):
with patch("src.api.routes.dashboards.SupersetClient") as mock_client_cls:
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
mock_perm.return_value = lambda: True
mock_deps["config"].get_environments.return_value = [mock_env]
mock_client = MagicMock()
mock_response = MagicMock()

View File

@@ -11,6 +11,41 @@ from unittest.mock import MagicMock, patch, AsyncMock
from fastapi.testclient import TestClient
from src.app import app
from src.api.routes.datasets import DatasetsResponse, DatasetDetailResponse
from src.dependencies import get_current_user, has_permission, get_config_manager, get_task_manager, get_resource_service, get_mapping_service
# Global mock user for get_current_user dependency overrides
mock_user = MagicMock()
mock_user.username = "testuser"
mock_user.roles = []
admin_role = MagicMock()
admin_role.name = "Admin"
mock_user.roles.append(admin_role)
@pytest.fixture(autouse=True)
def mock_deps():
config_manager = MagicMock()
task_manager = MagicMock()
resource_service = MagicMock()
mapping_service = MagicMock()
app.dependency_overrides[get_config_manager] = lambda: config_manager
app.dependency_overrides[get_task_manager] = lambda: task_manager
app.dependency_overrides[get_resource_service] = lambda: resource_service
app.dependency_overrides[get_mapping_service] = lambda: mapping_service
app.dependency_overrides[get_current_user] = lambda: mock_user
app.dependency_overrides[has_permission("plugin:migration", "READ")] = lambda: mock_user
app.dependency_overrides[has_permission("plugin:migration", "EXECUTE")] = lambda: mock_user
app.dependency_overrides[has_permission("plugin:backup", "EXECUTE")] = lambda: mock_user
app.dependency_overrides[has_permission("tasks", "READ")] = lambda: mock_user
yield {
"config": config_manager,
"task": task_manager,
"resource": resource_service,
"mapping": mapping_service
}
app.dependency_overrides.clear()
client = TestClient(app)
@@ -20,41 +55,34 @@ client = TestClient(app)
# @TEST: GET /api/datasets returns 200 and valid schema
# @PRE: env_id exists
# @POST: Response matches DatasetsResponse schema
def test_get_datasets_success():
with patch("src.api.routes.datasets.get_config_manager") as mock_config, \
patch("src.api.routes.datasets.get_resource_service") as mock_service, \
patch("src.api.routes.datasets.has_permission") as mock_perm:
# Mock environment
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
# Mock resource service response
mock_service.return_value.get_datasets_with_status.return_value = AsyncMock()(
return_value=[
{
"id": 1,
"table_name": "sales_data",
"schema": "public",
"database": "sales_db",
"mapped_fields": {"total": 10, "mapped": 5},
"last_task": {"task_id": "task-1", "status": "SUCCESS"}
}
]
)
# Mock permission
mock_perm.return_value = lambda: True
def test_get_datasets_success(mock_deps):
# Mock environment
mock_env = MagicMock()
mock_env.id = "prod"
mock_deps["config"].get_environments.return_value = [mock_env]
# Mock resource service response
mock_deps["resource"].get_datasets_with_status = AsyncMock(
return_value=[
{
"id": 1,
"table_name": "sales_data",
"schema": "public",
"database": "sales_db",
"mapped_fields": {"total": 10, "mapped": 5},
"last_task": {"task_id": "task-1", "status": "SUCCESS"}
}
]
)
response = client.get("/api/datasets?env_id=prod")
assert response.status_code == 200
data = response.json()
assert "datasets" in data
assert len(data["datasets"]) >= 0
# Validate against Pydantic model
DatasetsResponse(**data)
response = client.get("/api/datasets?env_id=prod")
assert response.status_code == 200
data = response.json()
assert "datasets" in data
assert len(data["datasets"]) >= 0
# Validate against Pydantic model
DatasetsResponse(**data)
# [/DEF:test_get_datasets_success:Function]
@@ -64,17 +92,13 @@ def test_get_datasets_success():
# @TEST: GET /api/datasets returns 404 if env_id missing
# @PRE: env_id does not exist
# @POST: Returns 404 error
def test_get_datasets_env_not_found():
with patch("src.api.routes.datasets.get_config_manager") as mock_config, \
patch("src.api.routes.datasets.has_permission") as mock_perm:
mock_config.return_value.get_environments.return_value = []
mock_perm.return_value = lambda: True
def test_get_datasets_env_not_found(mock_deps):
mock_deps["config"].get_environments.return_value = []
response = client.get("/api/datasets?env_id=nonexistent")
assert response.status_code == 404
assert "Environment not found" in response.json()["detail"]
response = client.get("/api/datasets?env_id=nonexistent")
assert response.status_code == 404
assert "Environment not found" in response.json()["detail"]
# [/DEF:test_get_datasets_env_not_found:Function]
@@ -84,24 +108,25 @@ def test_get_datasets_env_not_found():
# @TEST: GET /api/datasets returns 400 for invalid page/page_size
# @PRE: page < 1 or page_size > 100
# @POST: Returns 400 error
def test_get_datasets_invalid_pagination():
with patch("src.api.routes.datasets.get_config_manager") as mock_config, \
patch("src.api.routes.datasets.has_permission") as mock_perm:
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
mock_perm.return_value = lambda: True
def test_get_datasets_invalid_pagination(mock_deps):
mock_env = MagicMock()
mock_env.id = "prod"
mock_deps["config"].get_environments.return_value = [mock_env]
# Invalid page
response = client.get("/api/datasets?env_id=prod&page=0")
assert response.status_code == 400
assert "Page must be >= 1" in response.json()["detail"]
# Invalid page_size
response = client.get("/api/datasets?env_id=prod&page_size=0")
assert response.status_code == 400
assert "Page size must be between 1 and 100" in response.json()["detail"]
# Invalid page
response = client.get("/api/datasets?env_id=prod&page=0")
assert response.status_code == 400
assert "Page must be >= 1" in response.json()["detail"]
# Invalid page_size (too small)
response = client.get("/api/datasets?env_id=prod&page_size=0")
assert response.status_code == 400
assert "Page size must be between 1 and 100" in response.json()["detail"]
# @TEST_EDGE: page_size > 100 exceeds max
response = client.get("/api/datasets?env_id=prod&page_size=101")
assert response.status_code == 400
assert "Page size must be between 1 and 100" in response.json()["detail"]
# [/DEF:test_get_datasets_invalid_pagination:Function]
@@ -111,36 +136,31 @@ def test_get_datasets_invalid_pagination():
# @TEST: POST /api/datasets/map-columns creates mapping task
# @PRE: Valid env_id, dataset_ids, source_type
# @POST: Returns task_id
def test_map_columns_success():
with patch("src.api.routes.datasets.get_config_manager") as mock_config, \
patch("src.api.routes.datasets.get_task_manager") as mock_task_mgr, \
patch("src.api.routes.datasets.has_permission") as mock_perm:
# Mock environment
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
# Mock task manager
mock_task = MagicMock()
mock_task.id = "task-123"
mock_task_mgr.return_value.create_task = AsyncMock(return_value=mock_task)
# Mock permission
mock_perm.return_value = lambda: True
def test_map_columns_success(mock_deps):
# Mock environment
mock_env = MagicMock()
mock_env.id = "prod"
mock_deps["config"].get_environments.return_value = [mock_env]
# Mock task manager
mock_task = MagicMock()
mock_task.id = "task-123"
mock_deps["task"].create_task = AsyncMock(return_value=mock_task)
response = client.post(
"/api/datasets/map-columns",
json={
"env_id": "prod",
"dataset_ids": [1, 2, 3],
"source_type": "postgresql"
}
)
assert response.status_code == 200
data = response.json()
assert "task_id" in data
response = client.post(
"/api/datasets/map-columns",
json={
"env_id": "prod",
"dataset_ids": [1, 2, 3],
"source_type": "postgresql"
}
)
assert response.status_code == 200
data = response.json()
assert "task_id" in data
# @POST/@SIDE_EFFECT: create_task was called
mock_deps["task"].create_task.assert_called_once()
# [/DEF:test_map_columns_success:Function]
@@ -150,21 +170,18 @@ def test_map_columns_success():
# @TEST: POST /api/datasets/map-columns returns 400 for invalid source_type
# @PRE: source_type is not 'postgresql' or 'xlsx'
# @POST: Returns 400 error
def test_map_columns_invalid_source_type():
with patch("src.api.routes.datasets.has_permission") as mock_perm:
mock_perm.return_value = lambda: True
response = client.post(
"/api/datasets/map-columns",
json={
"env_id": "prod",
"dataset_ids": [1],
"source_type": "invalid"
}
)
assert response.status_code == 400
assert "Source type must be 'postgresql' or 'xlsx'" in response.json()["detail"]
def test_map_columns_invalid_source_type(mock_deps):
response = client.post(
"/api/datasets/map-columns",
json={
"env_id": "prod",
"dataset_ids": [1],
"source_type": "invalid"
}
)
assert response.status_code == 400
assert "Source type must be 'postgresql' or 'xlsx'" in response.json()["detail"]
# [/DEF:test_map_columns_invalid_source_type:Function]
@@ -174,39 +191,110 @@ def test_map_columns_invalid_source_type():
# @TEST: POST /api/datasets/generate-docs creates doc generation task
# @PRE: Valid env_id, dataset_ids, llm_provider
# @POST: Returns task_id
def test_generate_docs_success():
with patch("src.api.routes.datasets.get_config_manager") as mock_config, \
patch("src.api.routes.datasets.get_task_manager") as mock_task_mgr, \
patch("src.api.routes.datasets.has_permission") as mock_perm:
# Mock environment
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
# Mock task manager
mock_task = MagicMock()
mock_task.id = "task-456"
mock_task_mgr.return_value.create_task = AsyncMock(return_value=mock_task)
# Mock permission
mock_perm.return_value = lambda: True
def test_generate_docs_success(mock_deps):
# Mock environment
mock_env = MagicMock()
mock_env.id = "prod"
mock_deps["config"].get_environments.return_value = [mock_env]
# Mock task manager
mock_task = MagicMock()
mock_task.id = "task-456"
mock_deps["task"].create_task = AsyncMock(return_value=mock_task)
response = client.post(
"/api/datasets/generate-docs",
json={
"env_id": "prod",
"dataset_ids": [1],
"llm_provider": "openai"
}
)
assert response.status_code == 200
data = response.json()
assert "task_id" in data
response = client.post(
"/api/datasets/generate-docs",
json={
"env_id": "prod",
"dataset_ids": [1],
"llm_provider": "openai"
}
)
assert response.status_code == 200
data = response.json()
assert "task_id" in data
# @POST/@SIDE_EFFECT: create_task was called
mock_deps["task"].create_task.assert_called_once()
# [/DEF:test_generate_docs_success:Function]
# [DEF:test_map_columns_empty_ids:Function]
# @TEST: POST /api/datasets/map-columns returns 400 for empty dataset_ids
# @PRE: dataset_ids is empty
# @POST: Returns 400 error
def test_map_columns_empty_ids(mock_deps):
"""@PRE: dataset_ids must be non-empty."""
response = client.post(
"/api/datasets/map-columns",
json={
"env_id": "prod",
"dataset_ids": [],
"source_type": "postgresql"
}
)
assert response.status_code == 400
assert "At least one dataset ID must be provided" in response.json()["detail"]
# [/DEF:test_map_columns_empty_ids:Function]
# [DEF:test_generate_docs_empty_ids:Function]
# @TEST: POST /api/datasets/generate-docs returns 400 for empty dataset_ids
# @PRE: dataset_ids is empty
# @POST: Returns 400 error
def test_generate_docs_empty_ids(mock_deps):
"""@PRE: dataset_ids must be non-empty."""
response = client.post(
"/api/datasets/generate-docs",
json={
"env_id": "prod",
"dataset_ids": [],
"llm_provider": "openai"
}
)
assert response.status_code == 400
assert "At least one dataset ID must be provided" in response.json()["detail"]
# [/DEF:test_generate_docs_empty_ids:Function]
# [DEF:test_generate_docs_env_not_found:Function]
# @TEST: POST /api/datasets/generate-docs returns 404 for missing env
# @PRE: env_id does not exist
# @POST: Returns 404 error
def test_generate_docs_env_not_found(mock_deps):
"""@PRE: env_id must be a valid environment."""
mock_deps["config"].get_environments.return_value = []
response = client.post(
"/api/datasets/generate-docs",
json={
"env_id": "ghost",
"dataset_ids": [1],
"llm_provider": "openai"
}
)
assert response.status_code == 404
assert "Environment not found" in response.json()["detail"]
# [/DEF:test_generate_docs_env_not_found:Function]
# [DEF:test_get_datasets_superset_failure:Function]
# @TEST_EDGE: external_superset_failure -> {status: 503}
def test_get_datasets_superset_failure(mock_deps):
"""@TEST_EDGE: external_superset_failure -> {status: 503}"""
mock_env = MagicMock()
mock_env.id = "bad_conn"
mock_deps["config"].get_environments.return_value = [mock_env]
mock_deps["task"].get_all_tasks.return_value = []
mock_deps["resource"].get_datasets_with_status = AsyncMock(
side_effect=Exception("Connection refused")
)
response = client.get("/api/datasets?env_id=bad_conn")
assert response.status_code == 503
assert "Failed to fetch datasets" in response.json()["detail"]
# [/DEF:test_get_datasets_superset_failure:Function]
# [/DEF:backend.src.api.routes.__tests__.test_datasets:Module]

View File

@@ -1,6 +1,6 @@
# [DEF:backend.src.api.routes.dashboards:Module]
#
# @TIER: STANDARD
# @TIER: CRITICAL
# @SEMANTICS: api, dashboards, resources, hub
# @PURPOSE: API endpoints for the Dashboard Hub - listing dashboards with Git and task status
# @LAYER: API
@@ -9,6 +9,27 @@
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
#
# @INVARIANT: All dashboard responses include git_status and last_task metadata
#
# @TEST_CONTRACT: DashboardsAPI -> {
# required_fields: {env_id: string, page: integer, page_size: integer},
# optional_fields: {search: string},
# invariants: ["Pagination must be valid", "Environment must exist"]
# }
#
# @TEST_FIXTURE: dashboard_list_happy -> {
# "env_id": "prod",
# "expected_count": 1,
# "dashboards": [{"id": 1, "title": "Main Revenue"}]
# }
#
# @TEST_EDGE: pagination_zero_page -> {"env_id": "prod", "page": 0, "status": 400}
# @TEST_EDGE: pagination_oversize -> {"env_id": "prod", "page_size": 101, "status": 400}
# @TEST_EDGE: missing_env -> {"env_id": "ghost", "status": 404}
# @TEST_EDGE: empty_dashboards -> {"env_id": "empty_env", "expected_total": 0}
# @TEST_EDGE: external_superset_failure -> {"env_id": "bad_conn", "status": 503}
#
# @TEST_INVARIANT: metadata_consistency -> verifies: [dashboard_list_happy, empty_dashboards]
#
# [SECTION: IMPORTS]
from fastapi import APIRouter, Depends, HTTPException, Query, Response
@@ -219,10 +240,23 @@ async def get_dashboards(
async def get_database_mappings(
source_env_id: str,
target_env_id: str,
config_manager=Depends(get_config_manager),
mapping_service=Depends(get_mapping_service),
_ = Depends(has_permission("plugin:migration", "READ"))
):
with belief_scope("get_database_mappings", f"source={source_env_id}, target={target_env_id}"):
# Validate environments exist
environments = config_manager.get_environments()
source_env = next((e for e in environments if e.id == source_env_id), None)
target_env = next((e for e in environments if e.id == target_env_id), None)
if not source_env:
logger.error(f"[get_database_mappings][Coherence:Failed] Source environment not found: {source_env_id}")
raise HTTPException(status_code=404, detail="Source environment not found")
if not target_env:
logger.error(f"[get_database_mappings][Coherence:Failed] Target environment not found: {target_env_id}")
raise HTTPException(status_code=404, detail="Target environment not found")
try:
# Get mapping suggestions using MappingService
suggestions = await mapping_service.get_suggestions(source_env_id, target_env_id)