407 lines
15 KiB
Python
407 lines
15 KiB
Python
# [DEF:test_task_persistence:Module]
|
|
# @SEMANTICS: test, task, persistence, unit_test
|
|
# @PURPOSE: Unit tests for TaskPersistenceService.
|
|
# @LAYER: Test
|
|
# @RELATION: TESTS -> TaskPersistenceService
|
|
# @TIER: CRITICAL
|
|
# @TEST_DATA: valid_task -> {"id": "test-uuid-1", "plugin_id": "backup", "status": "PENDING"}
|
|
|
|
# [SECTION: IMPORTS]
|
|
from datetime import datetime, timedelta
|
|
from unittest.mock import patch
|
|
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
from src.models.mapping import Base
|
|
from src.models.task import TaskRecord
|
|
from src.core.task_manager.persistence import TaskPersistenceService
|
|
from src.core.task_manager.models import Task, TaskStatus, LogEntry
|
|
# [/SECTION]
|
|
|
|
|
|
# [DEF:TestTaskPersistenceHelpers:Class]
|
|
# @PURPOSE: Test suite for TaskPersistenceService static helper methods.
|
|
# @TIER: CRITICAL
|
|
class TestTaskPersistenceHelpers:
|
|
|
|
# [DEF:test_json_load_if_needed_none:Function]
|
|
# @PURPOSE: Test _json_load_if_needed with None input.
|
|
def test_json_load_if_needed_none(self):
|
|
assert TaskPersistenceService._json_load_if_needed(None) is None
|
|
# [/DEF:test_json_load_if_needed_none:Function]
|
|
|
|
# [DEF:test_json_load_if_needed_dict:Function]
|
|
# @PURPOSE: Test _json_load_if_needed with dict input.
|
|
def test_json_load_if_needed_dict(self):
|
|
data = {"key": "value"}
|
|
assert TaskPersistenceService._json_load_if_needed(data) == data
|
|
# [/DEF:test_json_load_if_needed_dict:Function]
|
|
|
|
# [DEF:test_json_load_if_needed_list:Function]
|
|
# @PURPOSE: Test _json_load_if_needed with list input.
|
|
def test_json_load_if_needed_list(self):
|
|
data = [1, 2, 3]
|
|
assert TaskPersistenceService._json_load_if_needed(data) == data
|
|
# [/DEF:test_json_load_if_needed_list:Function]
|
|
|
|
# [DEF:test_json_load_if_needed_json_string:Function]
|
|
# @PURPOSE: Test _json_load_if_needed with JSON string.
|
|
def test_json_load_if_needed_json_string(self):
|
|
result = TaskPersistenceService._json_load_if_needed('{"key": "value"}')
|
|
assert result == {"key": "value"}
|
|
# [/DEF:test_json_load_if_needed_json_string:Function]
|
|
|
|
# [DEF:test_json_load_if_needed_empty_string:Function]
|
|
# @PURPOSE: Test _json_load_if_needed with empty/null strings.
|
|
def test_json_load_if_needed_empty_string(self):
|
|
assert TaskPersistenceService._json_load_if_needed("") is None
|
|
assert TaskPersistenceService._json_load_if_needed("null") is None
|
|
assert TaskPersistenceService._json_load_if_needed(" null ") is None
|
|
# [/DEF:test_json_load_if_needed_empty_string:Function]
|
|
|
|
# [DEF:test_json_load_if_needed_plain_string:Function]
|
|
# @PURPOSE: Test _json_load_if_needed with non-JSON string.
|
|
def test_json_load_if_needed_plain_string(self):
|
|
result = TaskPersistenceService._json_load_if_needed("not json")
|
|
assert result == "not json"
|
|
# [/DEF:test_json_load_if_needed_plain_string:Function]
|
|
|
|
# [DEF:test_json_load_if_needed_integer:Function]
|
|
# @PURPOSE: Test _json_load_if_needed with integer.
|
|
def test_json_load_if_needed_integer(self):
|
|
assert TaskPersistenceService._json_load_if_needed(42) == 42
|
|
# [/DEF:test_json_load_if_needed_integer:Function]
|
|
|
|
# [DEF:test_parse_datetime_none:Function]
|
|
# @PURPOSE: Test _parse_datetime with None.
|
|
def test_parse_datetime_none(self):
|
|
assert TaskPersistenceService._parse_datetime(None) is None
|
|
# [/DEF:test_parse_datetime_none:Function]
|
|
|
|
# [DEF:test_parse_datetime_datetime_object:Function]
|
|
# @PURPOSE: Test _parse_datetime with datetime object.
|
|
def test_parse_datetime_datetime_object(self):
|
|
dt = datetime(2024, 1, 1, 12, 0, 0)
|
|
assert TaskPersistenceService._parse_datetime(dt) == dt
|
|
# [/DEF:test_parse_datetime_datetime_object:Function]
|
|
|
|
# [DEF:test_parse_datetime_iso_string:Function]
|
|
# @PURPOSE: Test _parse_datetime with ISO string.
|
|
def test_parse_datetime_iso_string(self):
|
|
result = TaskPersistenceService._parse_datetime("2024-01-01T12:00:00")
|
|
assert isinstance(result, datetime)
|
|
assert result.year == 2024
|
|
# [/DEF:test_parse_datetime_iso_string:Function]
|
|
|
|
# [DEF:test_parse_datetime_invalid_string:Function]
|
|
# @PURPOSE: Test _parse_datetime with invalid string.
|
|
def test_parse_datetime_invalid_string(self):
|
|
assert TaskPersistenceService._parse_datetime("not-a-date") is None
|
|
# [/DEF:test_parse_datetime_invalid_string:Function]
|
|
|
|
# [DEF:test_parse_datetime_integer:Function]
|
|
# @PURPOSE: Test _parse_datetime with non-string, non-datetime.
|
|
def test_parse_datetime_integer(self):
|
|
assert TaskPersistenceService._parse_datetime(12345) is None
|
|
# [/DEF:test_parse_datetime_integer:Function]
|
|
|
|
# [/DEF:TestTaskPersistenceHelpers:Class]
|
|
|
|
|
|
# [DEF:TestTaskPersistenceService:Class]
|
|
# @PURPOSE: Test suite for TaskPersistenceService CRUD operations.
|
|
# @TIER: CRITICAL
|
|
# @TEST_DATA: valid_task -> {"id": "test-uuid-1", "plugin_id": "backup", "status": "PENDING"}
|
|
class TestTaskPersistenceService:
|
|
|
|
# [DEF:setup_class:Function]
|
|
# @PURPOSE: Setup in-memory test database.
|
|
@classmethod
|
|
def setup_class(cls):
|
|
"""Create an in-memory SQLite database for testing."""
|
|
cls.engine = create_engine("sqlite:///:memory:")
|
|
Base.metadata.create_all(bind=cls.engine)
|
|
cls.TestSessionLocal = sessionmaker(bind=cls.engine)
|
|
cls.service = TaskPersistenceService()
|
|
# [/DEF:setup_class:Function]
|
|
|
|
# [DEF:teardown_class:Function]
|
|
# @PURPOSE: Dispose of test database.
|
|
@classmethod
|
|
def teardown_class(cls):
|
|
cls.engine.dispose()
|
|
# [/DEF:teardown_class:Function]
|
|
|
|
# [DEF:setup_method:Function]
|
|
# @PURPOSE: Clean task_records table before each test.
|
|
def setup_method(self):
|
|
session = self.TestSessionLocal()
|
|
session.query(TaskRecord).delete()
|
|
session.commit()
|
|
session.close()
|
|
# [/DEF:setup_method:Function]
|
|
|
|
def _patched(self):
|
|
"""Helper: returns a patch context for TasksSessionLocal."""
|
|
return patch(
|
|
"src.core.task_manager.persistence.TasksSessionLocal",
|
|
self.TestSessionLocal
|
|
)
|
|
|
|
def _make_task(self, **kwargs):
|
|
"""Helper: create a Task with test defaults."""
|
|
defaults = {
|
|
"id": "test-uuid-1",
|
|
"plugin_id": "backup",
|
|
"status": TaskStatus.PENDING,
|
|
"params": {"source_env_id": "env-1"},
|
|
}
|
|
defaults.update(kwargs)
|
|
return Task(**defaults)
|
|
|
|
# [DEF:test_persist_task_new:Function]
|
|
# @PURPOSE: Test persisting a new task creates a record.
|
|
# @PRE: Empty database.
|
|
# @POST: TaskRecord exists in database.
|
|
def test_persist_task_new(self):
|
|
"""Test persisting a new task creates a record."""
|
|
task = self._make_task()
|
|
|
|
with self._patched():
|
|
self.service.persist_task(task)
|
|
|
|
session = self.TestSessionLocal()
|
|
record = session.query(TaskRecord).filter_by(id="test-uuid-1").first()
|
|
session.close()
|
|
|
|
assert record is not None
|
|
assert record.type == "backup"
|
|
assert record.status == "PENDING"
|
|
# [/DEF:test_persist_task_new:Function]
|
|
|
|
# [DEF:test_persist_task_update:Function]
|
|
# @PURPOSE: Test updating an existing task.
|
|
# @PRE: Task already persisted.
|
|
# @POST: Task record updated with new status.
|
|
def test_persist_task_update(self):
|
|
"""Test persisting an existing task updates the record."""
|
|
task = self._make_task(status=TaskStatus.PENDING)
|
|
|
|
with self._patched():
|
|
self.service.persist_task(task)
|
|
|
|
# Update status
|
|
task.status = TaskStatus.RUNNING
|
|
task.started_at = datetime.utcnow()
|
|
|
|
with self._patched():
|
|
self.service.persist_task(task)
|
|
|
|
session = self.TestSessionLocal()
|
|
record = session.query(TaskRecord).filter_by(id="test-uuid-1").first()
|
|
session.close()
|
|
|
|
assert record.status == "RUNNING"
|
|
assert record.started_at is not None
|
|
# [/DEF:test_persist_task_update:Function]
|
|
|
|
# [DEF:test_persist_task_with_logs:Function]
|
|
# @PURPOSE: Test persisting a task with log entries.
|
|
# @PRE: Task has logs attached.
|
|
# @POST: Logs serialized as JSON in task record.
|
|
def test_persist_task_with_logs(self):
|
|
"""Test persisting a task with log entries."""
|
|
task = self._make_task()
|
|
task.logs = [
|
|
LogEntry(message="Step 1", level="INFO", source="plugin"),
|
|
LogEntry(message="Step 2", level="INFO", source="plugin"),
|
|
]
|
|
|
|
with self._patched():
|
|
self.service.persist_task(task)
|
|
|
|
session = self.TestSessionLocal()
|
|
record = session.query(TaskRecord).filter_by(id="test-uuid-1").first()
|
|
session.close()
|
|
|
|
assert record.logs is not None
|
|
assert len(record.logs) == 2
|
|
# [/DEF:test_persist_task_with_logs:Function]
|
|
|
|
# [DEF:test_persist_task_failed_extracts_error:Function]
|
|
# @PURPOSE: Test that FAILED task extracts last error message.
|
|
# @PRE: Task has FAILED status with ERROR logs.
|
|
# @POST: record.error contains last error message.
|
|
def test_persist_task_failed_extracts_error(self):
|
|
"""Test that FAILED tasks extract the last error message."""
|
|
task = self._make_task(status=TaskStatus.FAILED)
|
|
task.logs = [
|
|
LogEntry(message="Started OK", level="INFO", source="plugin"),
|
|
LogEntry(message="Connection failed", level="ERROR", source="plugin"),
|
|
LogEntry(message="Retrying...", level="INFO", source="plugin"),
|
|
LogEntry(message="Fatal: timeout", level="ERROR", source="plugin"),
|
|
]
|
|
|
|
with self._patched():
|
|
self.service.persist_task(task)
|
|
|
|
session = self.TestSessionLocal()
|
|
record = session.query(TaskRecord).filter_by(id="test-uuid-1").first()
|
|
session.close()
|
|
|
|
assert record.error == "Fatal: timeout"
|
|
# [/DEF:test_persist_task_failed_extracts_error:Function]
|
|
|
|
# [DEF:test_persist_tasks_batch:Function]
|
|
# @PURPOSE: Test persisting multiple tasks.
|
|
# @PRE: Empty database.
|
|
# @POST: All task records created.
|
|
def test_persist_tasks_batch(self):
|
|
"""Test persisting multiple tasks at once."""
|
|
tasks = [
|
|
self._make_task(id=f"batch-{i}", plugin_id="migration")
|
|
for i in range(3)
|
|
]
|
|
|
|
with self._patched():
|
|
self.service.persist_tasks(tasks)
|
|
|
|
session = self.TestSessionLocal()
|
|
count = session.query(TaskRecord).count()
|
|
session.close()
|
|
|
|
assert count == 3
|
|
# [/DEF:test_persist_tasks_batch:Function]
|
|
|
|
# [DEF:test_load_tasks:Function]
|
|
# @PURPOSE: Test loading tasks from database.
|
|
# @PRE: Tasks persisted.
|
|
# @POST: Returns list of Task objects with correct data.
|
|
def test_load_tasks(self):
|
|
"""Test loading tasks from database (round-trip)."""
|
|
task = self._make_task(
|
|
status=TaskStatus.SUCCESS,
|
|
started_at=datetime.utcnow(),
|
|
finished_at=datetime.utcnow(),
|
|
)
|
|
task.params = {"key": "value"}
|
|
task.result = {"output": "done"}
|
|
task.logs = [LogEntry(message="Done", level="INFO", source="plugin")]
|
|
|
|
with self._patched():
|
|
self.service.persist_task(task)
|
|
|
|
with self._patched():
|
|
loaded = self.service.load_tasks(limit=10)
|
|
|
|
assert len(loaded) == 1
|
|
assert loaded[0].id == "test-uuid-1"
|
|
assert loaded[0].plugin_id == "backup"
|
|
assert loaded[0].status == TaskStatus.SUCCESS
|
|
assert loaded[0].params == {"key": "value"}
|
|
# [/DEF:test_load_tasks:Function]
|
|
|
|
# [DEF:test_load_tasks_with_status_filter:Function]
|
|
# @PURPOSE: Test loading tasks filtered by status.
|
|
# @PRE: Tasks with different statuses persisted.
|
|
# @POST: Returns only tasks matching status filter.
|
|
def test_load_tasks_with_status_filter(self):
|
|
"""Test loading tasks filtered by status."""
|
|
tasks = [
|
|
self._make_task(id="s1", status=TaskStatus.SUCCESS),
|
|
self._make_task(id="s2", status=TaskStatus.FAILED),
|
|
self._make_task(id="s3", status=TaskStatus.SUCCESS),
|
|
]
|
|
|
|
with self._patched():
|
|
self.service.persist_tasks(tasks)
|
|
|
|
with self._patched():
|
|
failed_tasks = self.service.load_tasks(status=TaskStatus.FAILED)
|
|
|
|
assert len(failed_tasks) == 1
|
|
assert failed_tasks[0].id == "s2"
|
|
assert failed_tasks[0].status == TaskStatus.FAILED
|
|
# [/DEF:test_load_tasks_with_status_filter:Function]
|
|
|
|
# [DEF:test_load_tasks_with_limit:Function]
|
|
# @PURPOSE: Test loading tasks with limit.
|
|
# @PRE: Multiple tasks persisted.
|
|
# @POST: Returns at most `limit` tasks.
|
|
def test_load_tasks_with_limit(self):
|
|
"""Test loading tasks respects limit parameter."""
|
|
tasks = [
|
|
self._make_task(id=f"lim-{i}")
|
|
for i in range(10)
|
|
]
|
|
|
|
with self._patched():
|
|
self.service.persist_tasks(tasks)
|
|
|
|
with self._patched():
|
|
loaded = self.service.load_tasks(limit=3)
|
|
|
|
assert len(loaded) == 3
|
|
# [/DEF:test_load_tasks_with_limit:Function]
|
|
|
|
# [DEF:test_delete_tasks:Function]
|
|
# @PURPOSE: Test deleting tasks by ID list.
|
|
# @PRE: Tasks persisted.
|
|
# @POST: Specified tasks deleted, others remain.
|
|
def test_delete_tasks(self):
|
|
"""Test deleting tasks by ID list."""
|
|
tasks = [
|
|
self._make_task(id="del-1"),
|
|
self._make_task(id="del-2"),
|
|
self._make_task(id="keep-1"),
|
|
]
|
|
|
|
with self._patched():
|
|
self.service.persist_tasks(tasks)
|
|
|
|
with self._patched():
|
|
self.service.delete_tasks(["del-1", "del-2"])
|
|
|
|
session = self.TestSessionLocal()
|
|
remaining = session.query(TaskRecord).all()
|
|
session.close()
|
|
|
|
assert len(remaining) == 1
|
|
assert remaining[0].id == "keep-1"
|
|
# [/DEF:test_delete_tasks:Function]
|
|
|
|
# [DEF:test_delete_tasks_empty_list:Function]
|
|
# @PURPOSE: Test deleting with empty list (no-op).
|
|
# @PRE: None.
|
|
# @POST: No error, no changes.
|
|
def test_delete_tasks_empty_list(self):
|
|
"""Test deleting with empty list is a no-op."""
|
|
with self._patched():
|
|
self.service.delete_tasks([]) # Should not raise
|
|
# [/DEF:test_delete_tasks_empty_list:Function]
|
|
|
|
# [DEF:test_persist_task_with_datetime_in_params:Function]
|
|
# @PURPOSE: Test json_serializable handles datetime in params.
|
|
# @PRE: Task params contain datetime values.
|
|
# @POST: Params serialized correctly.
|
|
def test_persist_task_with_datetime_in_params(self):
|
|
"""Test that datetime values in params are serialized to ISO format."""
|
|
dt = datetime(2024, 6, 15, 10, 30, 0)
|
|
task = self._make_task(params={"timestamp": dt, "name": "test"})
|
|
|
|
with self._patched():
|
|
self.service.persist_task(task)
|
|
|
|
session = self.TestSessionLocal()
|
|
record = session.query(TaskRecord).filter_by(id="test-uuid-1").first()
|
|
session.close()
|
|
|
|
assert record.params is not None
|
|
assert record.params["timestamp"] == "2024-06-15T10:30:00"
|
|
assert record.params["name"] == "test"
|
|
# [/DEF:test_persist_task_with_datetime_in_params:Function]
|
|
|
|
# [/DEF:TestTaskPersistenceService:Class]
|
|
# [/DEF:test_task_persistence:Module]
|