124 lines
4.6 KiB
Python
124 lines
4.6 KiB
Python
import pytest
|
|
from fastapi.testclient import TestClient
|
|
from unittest.mock import MagicMock
|
|
from src.app import app
|
|
from src.dependencies import get_config_manager, get_task_manager, get_resource_service, has_permission
|
|
|
|
client = TestClient(app)
|
|
|
|
# [DEF:test_dashboards_api:Test]
|
|
# @PURPOSE: Verify GET /api/dashboards contract compliance
|
|
# @TEST: Valid env_id returns 200 and dashboard list
|
|
# @TEST: Invalid env_id returns 404
|
|
# @TEST: Search filter works
|
|
|
|
@pytest.fixture
|
|
def mock_deps():
|
|
config_manager = MagicMock()
|
|
task_manager = MagicMock()
|
|
resource_service = MagicMock()
|
|
|
|
# Mock environment
|
|
env = MagicMock()
|
|
env.id = "env1"
|
|
config_manager.get_environments.return_value = [env]
|
|
|
|
# Mock tasks
|
|
task_manager.get_all_tasks.return_value = []
|
|
|
|
# Mock dashboards
|
|
resource_service.get_dashboards_with_status.return_value = [
|
|
{"id": 1, "title": "Sales", "slug": "sales", "git_status": {"branch": "main", "sync_status": "OK"}, "last_task": None},
|
|
{"id": 2, "title": "Marketing", "slug": "mkt", "git_status": None, "last_task": {"task_id": "t1", "status": "SUCCESS"}}
|
|
]
|
|
|
|
app.dependency_overrides[get_config_manager] = lambda: config_manager
|
|
app.dependency_overrides[get_task_manager] = lambda: task_manager
|
|
app.dependency_overrides[get_resource_service] = lambda: resource_service
|
|
|
|
# Bypass permission check
|
|
mock_user = MagicMock()
|
|
mock_user.username = "testadmin"
|
|
|
|
# Override both get_current_user and has_permission
|
|
from src.dependencies import get_current_user
|
|
app.dependency_overrides[get_current_user] = lambda: mock_user
|
|
|
|
# We need to override the specific instance returned by has_permission
|
|
app.dependency_overrides[has_permission("plugin:migration", "READ")] = lambda: mock_user
|
|
|
|
yield {
|
|
"config": config_manager,
|
|
"task": task_manager,
|
|
"resource": resource_service
|
|
}
|
|
|
|
app.dependency_overrides.clear()
|
|
|
|
def test_get_dashboards_success(mock_deps):
|
|
response = client.get("/api/dashboards?env_id=env1")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "dashboards" in data
|
|
assert len(data["dashboards"]) == 2
|
|
assert data["dashboards"][0]["title"] == "Sales"
|
|
assert data["dashboards"][0]["git_status"]["sync_status"] == "OK"
|
|
|
|
def test_get_dashboards_not_found(mock_deps):
|
|
response = client.get("/api/dashboards?env_id=invalid")
|
|
assert response.status_code == 404
|
|
|
|
def test_get_dashboards_search(mock_deps):
|
|
response = client.get("/api/dashboards?env_id=env1&search=Sales")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data["dashboards"]) == 1
|
|
assert data["dashboards"][0]["title"] == "Sales"
|
|
|
|
# [/DEF:test_dashboards_api:Test]
|
|
|
|
# [DEF:test_datasets_api:Test]
|
|
# @PURPOSE: Verify GET /api/datasets contract compliance
|
|
# @TEST: Valid env_id returns 200 and dataset list
|
|
# @TEST: Invalid env_id returns 404
|
|
# @TEST: Search filter works
|
|
# @TEST: Negative - Service failure returns 503
|
|
|
|
def test_get_datasets_success(mock_deps):
|
|
mock_deps["resource"].get_datasets_with_status.return_value = [
|
|
{"id": 1, "table_name": "orders", "schema": "public", "database": "db1", "mapped_fields": {"total": 10, "mapped": 5}, "last_task": None}
|
|
]
|
|
|
|
response = client.get("/api/datasets?env_id=env1")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "datasets" in data
|
|
assert len(data["datasets"]) == 1
|
|
assert data["datasets"][0]["table_name"] == "orders"
|
|
assert data["datasets"][0]["mapped_fields"]["mapped"] == 5
|
|
|
|
def test_get_datasets_not_found(mock_deps):
|
|
response = client.get("/api/datasets?env_id=invalid")
|
|
assert response.status_code == 404
|
|
|
|
def test_get_datasets_search(mock_deps):
|
|
mock_deps["resource"].get_datasets_with_status.return_value = [
|
|
{"id": 1, "table_name": "orders", "schema": "public", "database": "db1", "mapped_fields": {"total": 10, "mapped": 5}, "last_task": None},
|
|
{"id": 2, "table_name": "users", "schema": "public", "database": "db1", "mapped_fields": {"total": 5, "mapped": 5}, "last_task": None}
|
|
]
|
|
|
|
response = client.get("/api/datasets?env_id=env1&search=orders")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data["datasets"]) == 1
|
|
assert data["datasets"][0]["table_name"] == "orders"
|
|
|
|
def test_get_datasets_service_failure(mock_deps):
|
|
mock_deps["resource"].get_datasets_with_status.side_effect = Exception("Superset down")
|
|
|
|
response = client.get("/api/datasets?env_id=env1")
|
|
assert response.status_code == 503
|
|
assert "Failed to fetch datasets" in response.json()["detail"]
|
|
|
|
# [/DEF:test_datasets_api:Test]
|