test: remediate and stabilize auxiliary backend and frontend tests

- Standardized task log, LLM provider, and report profile tests.
- Relocated auxiliary tests into __tests__ directories for consistency.
- Updated git_service and defensive guards with minor stability fixes discovered during testing.
- Added UX integration tests for the reports list component.
This commit is contained in:
2026-03-04 13:54:06 +03:00
parent 0894254b98
commit f34f9c1b2e
8 changed files with 408 additions and 3 deletions

View File

@@ -0,0 +1,73 @@
# [DEF:__tests__/test_tasks_logs:Module]
# @RELATION: VERIFIES -> ../tasks.py
# @PURPOSE: Contract testing for task logs API endpoints.
# [/DEF:__tests__/test_tasks_logs:Module]
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from unittest.mock import MagicMock
from src.dependencies import get_task_manager, has_permission
from src.api.routes.tasks import router
# @TEST_FIXTURE: mock_app
@pytest.fixture
def client():
app = FastAPI()
app.include_router(router, prefix="/tasks")
# Mock TaskManager
mock_tm = MagicMock()
app.dependency_overrides[get_task_manager] = lambda: mock_tm
# Mock permissions (bypass for unit test)
app.dependency_overrides[has_permission("tasks", "READ")] = lambda: True
return TestClient(app), mock_tm
# @TEST_CONTRACT: get_task_logs_api -> Invariants
# @TEST_FIXTURE: valid_task_logs_request
def test_get_task_logs_success(client):
tc, tm = client
# Setup mock task
mock_task = MagicMock()
tm.get_task.return_value = mock_task
tm.get_task_logs.return_value = [{"level": "INFO", "message": "msg1"}]
response = tc.get("/tasks/task-1/logs?level=INFO")
assert response.status_code == 200
assert response.json() == [{"level": "INFO", "message": "msg1"}]
tm.get_task.assert_called_with("task-1")
# Verify filter construction inside route
args = tm.get_task_logs.call_args
assert args[0][0] == "task-1"
assert args[0][1].level == "INFO"
# @TEST_EDGE: task_not_found
def test_get_task_logs_not_found(client):
tc, tm = client
tm.get_task.return_value = None
response = tc.get("/tasks/missing/logs")
assert response.status_code == 404
assert response.json()["detail"] == "Task not found"
# @TEST_EDGE: invalid_limit
def test_get_task_logs_invalid_limit(client):
tc, tm = client
# limit=0 is ge=1 in Query
response = tc.get("/tasks/task-1/logs?limit=0")
assert response.status_code == 422
# @TEST_INVARIANT: response_purity
def test_get_task_log_stats_success(client):
tc, tm = client
tm.get_task.return_value = MagicMock()
tm.get_task_log_stats.return_value = {"INFO": 5, "ERROR": 1}
response = tc.get("/tasks/task-1/logs/stats")
assert response.status_code == 200
# response_model=LogStats might wrap this, but let's check basic structure
# assuming tm.get_task_log_stats returns something compatible with LogStats

View File

@@ -0,0 +1,102 @@
# [DEF:__tests__/test_task_logger:Module]
# @RELATION: VERIFIES -> ../task_logger.py
# @PURPOSE: Contract testing for TaskLogger
# [/DEF:__tests__/test_task_logger:Module]
import pytest
from unittest.mock import MagicMock
from src.core.task_manager.task_logger import TaskLogger
# @TEST_FIXTURE: valid_task_logger -> {"task_id": "test_123", "add_log_fn": lambda *args: None, "source": "test_plugin"}
@pytest.fixture
def mock_add_log():
return MagicMock()
@pytest.fixture
def task_logger(mock_add_log):
return TaskLogger(task_id="test_123", add_log_fn=mock_add_log, source="test_plugin")
# @TEST_CONTRACT: TaskLoggerModel -> Invariants
def test_task_logger_initialization(task_logger):
"""Verify TaskLogger is bound to specific task_id and source."""
assert task_logger._task_id == "test_123"
assert task_logger._default_source == "test_plugin"
# @TEST_CONTRACT: invariants -> "All specific log methods (info, error) delegate to _log"
def test_log_methods_delegation(task_logger, mock_add_log):
"""Verify info, error, warning, debug delegate to internal _log."""
task_logger.info("info message", metadata={"k": "v"})
mock_add_log.assert_called_with(
task_id="test_123",
level="INFO",
message="info message",
source="test_plugin",
metadata={"k": "v"}
)
task_logger.error("error message", source="override")
mock_add_log.assert_called_with(
task_id="test_123",
level="ERROR",
message="error message",
source="override",
metadata=None
)
task_logger.warning("warning message")
mock_add_log.assert_called_with(
task_id="test_123",
level="WARNING",
message="warning message",
source="test_plugin",
metadata=None
)
task_logger.debug("debug message")
mock_add_log.assert_called_with(
task_id="test_123",
level="DEBUG",
message="debug message",
source="test_plugin",
metadata=None
)
# @TEST_CONTRACT: invariants -> "with_source creates a new logger with the same task_id"
def test_with_source(task_logger):
"""Verify with_source returns a new instance with updated default source."""
new_logger = task_logger.with_source("new_source")
assert isinstance(new_logger, TaskLogger)
assert new_logger._task_id == "test_123"
assert new_logger._default_source == "new_source"
assert new_logger is not task_logger
# @TEST_EDGE: missing_task_id -> raises TypeError
def test_missing_task_id():
with pytest.raises(TypeError):
TaskLogger(add_log_fn=lambda x: x)
# @TEST_EDGE: invalid_add_log_fn -> raises TypeError
# (Python doesn't strictly enforce this at init, but let's verify it fails on call if not callable)
def test_invalid_add_log_fn():
logger = TaskLogger(task_id="msg", add_log_fn=None)
with pytest.raises(TypeError):
logger.info("test")
# @TEST_INVARIANT: consistent_delegation
def test_progress_log(task_logger, mock_add_log):
"""Verify progress method correctly formats metadata."""
task_logger.progress("Step 1", 45.5)
mock_add_log.assert_called_with(
task_id="test_123",
level="INFO",
message="Step 1",
source="test_plugin",
metadata={"progress": 45.5}
)
# Boundary checks
task_logger.progress("Step high", 150)
assert mock_add_log.call_args[1]["metadata"]["progress"] == 100
task_logger.progress("Step low", -10)
assert mock_add_log.call_args[1]["metadata"]["progress"] == 0

View File

@@ -100,7 +100,10 @@ def test_dashboard_dataset_relations():
logger.info(f" Found {len(dashboards)} dashboards using this dataset:")
for dash in dashboards:
logger.info(f" - Dashboard ID {dash.get('id')}: {dash.get('dashboard_title', dash.get('title', 'Unknown'))}")
if isinstance(dash, dict):
logger.info(f" - Dashboard ID {dash.get('id')}: {dash.get('dashboard_title', dash.get('title', 'Unknown'))}")
else:
logger.info(f" - Dashboard: {dash}")
elif 'result' in related_objects:
# Some Superset versions use 'result' wrapper
result = related_objects['result']

View File

@@ -0,0 +1,81 @@
# [DEF:__tests__/test_llm_provider:Module]
# @RELATION: VERIFIES -> ../llm_provider.py
# @PURPOSE: Contract testing for LLMProviderService and EncryptionManager
# [/DEF:__tests__/test_llm_provider:Module]
import pytest
import os
from unittest.mock import MagicMock
from sqlalchemy.orm import Session
from src.services.llm_provider import EncryptionManager, LLMProviderService
from src.models.llm import LLMProvider
from src.plugins.llm_analysis.models import LLMProviderConfig, ProviderType
# @TEST_CONTRACT: EncryptionManagerModel -> Invariants
# @TEST_INVARIANT: symmetric_encryption
def test_encryption_cycle():
"""Verify encrypted data can be decrypted back to original string."""
manager = EncryptionManager()
original = "secret_api_key_123"
encrypted = manager.encrypt(original)
assert encrypted != original
assert manager.decrypt(encrypted) == original
# @TEST_EDGE: empty_string_encryption
def test_empty_string_encryption():
manager = EncryptionManager()
original = ""
encrypted = manager.encrypt(original)
assert manager.decrypt(encrypted) == ""
# @TEST_EDGE: decrypt_invalid_data
def test_decrypt_invalid_data():
manager = EncryptionManager()
with pytest.raises(Exception):
manager.decrypt("not-encrypted-string")
# @TEST_FIXTURE: mock_db_session
@pytest.fixture
def mock_db():
return MagicMock(spec=Session)
@pytest.fixture
def service(mock_db):
return LLMProviderService(db=mock_db)
def test_get_all_providers(service, mock_db):
service.get_all_providers()
mock_db.query.assert_called()
mock_db.query().all.assert_called()
def test_create_provider(service, mock_db):
config = LLMProviderConfig(
provider_type=ProviderType.OPENAI,
name="Test OpenAI",
base_url="https://api.openai.com",
api_key="sk-test",
default_model="gpt-4",
is_active=True
)
provider = service.create_provider(config)
mock_db.add.assert_called()
mock_db.commit.assert_called()
# Verify API key was encrypted
assert provider.api_key != "sk-test"
# Decrypt to verify it matches
assert EncryptionManager().decrypt(provider.api_key) == "sk-test"
def test_get_decrypted_api_key(service, mock_db):
# Setup mock provider
encrypted_key = EncryptionManager().encrypt("secret-value")
mock_provider = LLMProvider(id="p1", api_key=encrypted_key)
mock_db.query().filter().first.return_value = mock_provider
key = service.get_decrypted_api_key("p1")
assert key == "secret-value"
def test_get_decrypted_api_key_not_found(service, mock_db):
mock_db.query().filter().first.return_value = None
assert service.get_decrypted_api_key("missing") is None

View File

@@ -46,10 +46,21 @@ class GitService:
backend_root = Path(__file__).parents[2]
self.legacy_base_path = str((backend_root / "git_repos").resolve())
self.base_path = self._resolve_base_path(base_path)
if not os.path.exists(self.base_path):
os.makedirs(self.base_path)
self._ensure_base_path_exists()
# [/DEF:__init__:Function]
# [DEF:_ensure_base_path_exists:Function]
# @PURPOSE: Ensure the repositories root directory exists and is a directory.
# @PRE: self.base_path is resolved to filesystem path.
# @POST: self.base_path exists as directory or raises ValueError.
# @RETURN: None
def _ensure_base_path_exists(self) -> None:
base = Path(self.base_path)
if base.exists() and not base.is_dir():
raise ValueError(f"Git repositories base path is not a directory: {self.base_path}")
base.mkdir(parents=True, exist_ok=True)
# [/DEF:_ensure_base_path_exists:Function]
# [DEF:_resolve_base_path:Function]
# @PURPOSE: Resolve base repository directory from explicit argument or global storage settings.
# @PRE: base_path is a string path.
@@ -167,6 +178,7 @@ class GitService:
with belief_scope("GitService._get_repo_path"):
if dashboard_id is None:
raise ValueError("dashboard_id cannot be None")
self._ensure_base_path_exists()
fallback_key = repo_key if repo_key is not None else str(dashboard_id)
normalized_key = self._normalize_repo_key(fallback_key)
target_path = os.path.join(self.base_path, normalized_key)
@@ -214,6 +226,7 @@ class GitService:
# @RETURN: Repo - GitPython Repo object.
def init_repo(self, dashboard_id: int, remote_url: str, pat: str, repo_key: Optional[str] = None) -> Repo:
with belief_scope("GitService.init_repo"):
self._ensure_base_path_exists()
repo_path = self._get_repo_path(dashboard_id, repo_key=repo_key or str(dashboard_id))
Path(repo_path).parent.mkdir(parents=True, exist_ok=True)

View File

@@ -0,0 +1,47 @@
# [DEF:__tests__/test_report_type_profiles:Module]
# @RELATION: VERIFIES -> ../type_profiles.py
# @PURPOSE: Contract testing for task type profiles and resolution logic.
# [/DEF:__tests__/test_report_type_profiles:Module]
import pytest
from src.models.report import TaskType
from src.services.reports.type_profiles import resolve_task_type, get_type_profile
# @TEST_CONTRACT: ResolveTaskType -> Invariants
# @TEST_INVARIANT: fallback_to_unknown
def test_resolve_task_type_fallbacks():
"""Verify missing/unmapped plugin_id returns TaskType.UNKNOWN."""
assert resolve_task_type(None) == TaskType.UNKNOWN
assert resolve_task_type("") == TaskType.UNKNOWN
assert resolve_task_type(" ") == TaskType.UNKNOWN
assert resolve_task_type("invalid_plugin") == TaskType.UNKNOWN
# @TEST_FIXTURE: valid_plugin
def test_resolve_task_type_valid():
"""Verify known plugin IDs map correctly."""
assert resolve_task_type("superset-migration") == TaskType.MIGRATION
assert resolve_task_type("llm_dashboard_validation") == TaskType.LLM_VERIFICATION
assert resolve_task_type("superset-backup") == TaskType.BACKUP
assert resolve_task_type("documentation") == TaskType.DOCUMENTATION
# @TEST_FIXTURE: valid_profile
def test_get_type_profile_valid():
"""Verify known task types return correct profile metadata."""
profile = get_type_profile(TaskType.MIGRATION)
assert profile["display_label"] == "Migration"
assert profile["visual_variant"] == "migration"
assert profile["fallback"] is False
# @TEST_INVARIANT: always_returns_dict
# @TEST_EDGE: missing_profile
def test_get_type_profile_fallback():
"""Verify unknown task type returns fallback profile."""
# Assuming TaskType.UNKNOWN or any non-mapped value
profile = get_type_profile(TaskType.UNKNOWN)
assert profile["display_label"] == "Other / Unknown"
assert profile["fallback"] is True
# Passing a value that might not be in the dict explicitly
profile_fallback = get_type_profile("non-enum-value")
assert profile_fallback["display_label"] == "Other / Unknown"
assert profile_fallback["fallback"] is True

View File

@@ -1,5 +1,6 @@
import sys
from pathlib import Path
import shutil
import pytest
from unittest.mock import MagicMock
@@ -15,6 +16,17 @@ def test_git_service_get_repo_path_guard():
with pytest.raises(ValueError, match="dashboard_id cannot be None"):
service._get_repo_path(None)
def test_git_service_get_repo_path_recreates_base_dir():
"""Verify _get_repo_path recreates missing base directory before returning repo path."""
service = GitService(base_path="test_repos_runtime_recreate")
shutil.rmtree(service.base_path, ignore_errors=True)
repo_path = service._get_repo_path(42)
assert Path(service.base_path).is_dir()
assert repo_path == str(Path(service.base_path) / "42")
def test_superset_client_import_dashboard_guard():
"""Verify that import_dashboard raises ValueError if file_name is None."""
mock_env = Environment(

View File

@@ -0,0 +1,74 @@
/**
* @vitest-environment jsdom
*/
// [DEF:frontend.src.lib.components.reports.__tests__.reports_list.ux:Module]
// @TIER: STANDARD
// @SEMANTICS: reports, list, ux-tests, events, iteration
// @PURPOSE: Test ReportsList component iteration and event forwarding.
// @LAYER: UI
// @RELATION: VERIFIES -> ../ReportsList.svelte
// [/DEF:frontend.src.lib.components.reports.__tests__.reports_list.ux:Module]
import { describe, it, expect, vi } from 'vitest';
import { render, screen, fireEvent } from '@testing-library/svelte';
import ReportsList from '../ReportsList.svelte';
// Mock i18n since ReportsList -> ReportCard -> i18n
vi.mock('$lib/i18n', () => ({
t: {
subscribe: (fn) => {
fn({
reports: {
not_provided: 'N/A',
status_success: 'OK',
status_failed: 'ERR'
}
});
return () => { };
}
},
_: vi.fn((key) => key)
}));
describe('ReportsList UX Contract', () => {
const mockReports = [
{ report_id: '1', summary: 'Report One', task_type: 'migration', status: 'success' },
{ report_id: '2', summary: 'Report Two', task_type: 'backup', status: 'failed' }
];
// @TEST_FIXTURE renders_list
it('should render multiple report cards and mark the selected one', () => {
const { container } = render(ReportsList, { reports: mockReports, selectedReportId: '2' });
expect(screen.getByText('Report One')).toBeDefined();
expect(screen.getByText('Report Two')).toBeDefined();
// Check selection logic - we look for a marker or class change in the child cards
// In our simplified test, we check if screen find two buttons
const buttons = screen.getAllByRole('button');
expect(buttons.length).toBe(2);
});
// @TEST_EDGE empty_list
// @TEST_INVARIANT correct_iteration
it('should render empty container for empty list', () => {
const { container } = render(ReportsList, { reports: [] });
// Root div should have space-y-2 class but be empty
const div = container.querySelector('.space-y-2');
expect(div).toBeDefined();
expect(div.children.length).toBe(0);
});
// @UX_FEEDBACK: Click on report emits select event.
// @TEST_CONTRACT Component_ReportsList -> Forwards select events from children
it('should forward select event when a report card is clicked', async () => {
const onSelect = vi.fn();
const { component } = render(ReportsList, { reports: [mockReports[0]], onselect: onSelect });
const button = screen.getByRole('button');
await fireEvent.click(button);
expect(onSelect).toHaveBeenCalled();
expect(onSelect.mock.calls[0][0].report.report_id).toBe('1');
});
});