# [DEF:test_log_persistence:Module]
# @SEMANTICS: test, log, persistence, unit_test
# @PURPOSE: Unit tests for TaskLogPersistenceService.
# @LAYER: Test
# @RELATION: TESTS -> TaskLogPersistenceService
# @TIER: CRITICAL

# [SECTION: IMPORTS]
from datetime import datetime, timezone
from unittest.mock import patch

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from src.models.mapping import Base
from src.core.task_manager.persistence import TaskLogPersistenceService
from src.core.task_manager.models import LogEntry, LogFilter
# [/SECTION]


# [DEF:TestLogPersistence:Class]
# @PURPOSE: Test suite for TaskLogPersistenceService.
# @TIER: CRITICAL
# @TEST_DATA: log_entry -> {"task_id": "test-task-1", "level": "INFO", "source": "test_source", "message": "Test message"}
class TestLogPersistence:
    """Exercise TaskLogPersistenceService against an in-memory SQLite database.

    Every service call is wrapped in ``self._patched(...)`` so that the
    ``TasksSessionLocal`` name in the persistence module is replaced by the
    session factory bound to the test engine.
    """

    # [DEF:setup_class:Function]
    # @PURPOSE: Setup test database and service instance.
    # @PRE: None.
    # @POST: In-memory database and service instance created.
    @classmethod
    def setup_class(cls):
        """Create the in-memory engine, the schema, and the service under test."""
        # NOTE(review): an in-memory SQLite database lives on a single
        # connection; the tests rely on SQLAlchemy's default pooling handing
        # the same connection to every session created in this thread.
        cls.engine = create_engine("sqlite:///:memory:")
        Base.metadata.create_all(bind=cls.engine)
        cls.TestSessionLocal = sessionmaker(bind=cls.engine)
        cls.service = TaskLogPersistenceService()
    # [/DEF:setup_class:Function]

    # [DEF:teardown_class:Function]
    # @PURPOSE: Clean up test database.
    # @PRE: None.
    # @POST: Database disposed.
    @classmethod
    def teardown_class(cls):
        """Dispose of the database engine."""
        cls.engine.dispose()
    # [/DEF:teardown_class:Function]

    # [DEF:setup_method:Function]
    # @PURPOSE: Setup for each test method — clean task_logs table.
    # @PRE: None.
    # @POST: task_logs table is empty.
    def setup_method(self):
        """Delete every TaskLogRecord row so each test starts from an empty table."""
        from src.models.task import TaskLogRecord

        session = self.TestSessionLocal()
        try:
            session.query(TaskLogRecord).delete()
            session.commit()
        finally:
            # Close even when the delete/commit fails so the session is not
            # leaked into the following tests.
            session.close()
    # [/DEF:setup_method:Function]

    # [DEF:_patched:Function]
    # @PURPOSE: Redirect the persistence module's session factory to the test database.
    # @PRE: setup_class has created TestSessionLocal.
    # @POST: Returns an un-entered patch object usable as a context manager.
    def _patched(self, method_name):
        """Return a patch context that swaps in the test session factory.

        Args:
            method_name: Name of the service method about to be exercised.
                It is not used by the patch itself; call sites pass it purely
                as a readable label.

        Returns:
            The object returned by ``unittest.mock.patch`` for
            ``src.core.task_manager.persistence.TasksSessionLocal``, replaced
            by ``self.TestSessionLocal`` while the context is active.
        """
        return patch(
            "src.core.task_manager.persistence.TasksSessionLocal",
            self.TestSessionLocal,
        )
    # [/DEF:_patched:Function]

    # [DEF:_make_entry:Function]
    # @PURPOSE: Build a LogEntry stamped with the current UTC time.
    # @PRE: None.
    # @POST: Returns a new LogEntry; nothing is persisted.
    @staticmethod
    def _make_entry(level, source, message):
        """Create a LogEntry with the given level, source and message.

        The timestamp is a naive UTC datetime, i.e. the same value
        ``datetime.utcnow()`` would produce, obtained without calling that
        deprecated function.
        """
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        return LogEntry(
            timestamp=now_utc,
            level=level,
            source=source,
            message=message,
        )
    # [/DEF:_make_entry:Function]

    # [DEF:_fetch_records:Function]
    # @PURPOSE: Read TaskLogRecord rows directly from the test database.
    # @PRE: setup_class has created TestSessionLocal.
    # @POST: Session used for the read is closed; returns a list of rows.
    def _fetch_records(self, task_id=None):
        """Return TaskLogRecord rows, bypassing the service under test.

        Args:
            task_id: When given, only rows with this ``task_id`` are
                returned; when ``None``, every row in the table is returned.

        Returns:
            A list of TaskLogRecord instances (possibly empty). The rows are
            returned after the session has been closed, so only their already
            loaded column values should be read.
        """
        from src.models.task import TaskLogRecord

        session = self.TestSessionLocal()
        try:
            query = session.query(TaskLogRecord)
            if task_id is not None:
                query = query.filter_by(task_id=task_id)
            return query.all()
        finally:
            session.close()
    # [/DEF:_fetch_records:Function]

    # [DEF:test_add_logs_single:Function]
    # @PURPOSE: Test adding a single log entry.
    # @PRE: Service and session initialized.
    # @POST: Log entry persisted to database.
    def test_add_logs_single(self):
        """A single entry passed to add_logs is stored with its fields intact."""
        entry = self._make_entry("INFO", "test_source", "Test message")

        with self._patched("add_logs"):
            self.service.add_logs("test-task-1", [entry])

        # Verify through a direct query rather than through the service.
        records = self._fetch_records("test-task-1")
        assert len(records) == 1
        result = records[0]
        assert result.level == "INFO"
        assert result.source == "test_source"
        assert result.message == "Test message"
    # [/DEF:test_add_logs_single:Function]

    # [DEF:test_add_logs_batch:Function]
    # @PURPOSE: Test adding multiple log entries in batch.
    # @PRE: Service and session initialized.
    # @POST: All log entries persisted to database.
    def test_add_logs_batch(self):
        """Three entries passed in one add_logs call produce three rows."""
        entries = [
            self._make_entry("INFO", "source1", "Message 1"),
            self._make_entry("WARNING", "source2", "Message 2"),
            self._make_entry("ERROR", "source3", "Message 3"),
        ]

        with self._patched("add_logs"):
            self.service.add_logs("test-task-2", entries)

        assert len(self._fetch_records("test-task-2")) == 3
    # [/DEF:test_add_logs_batch:Function]

    # [DEF:test_add_logs_empty:Function]
    # @PURPOSE: Test adding empty log list (should be no-op).
    # @PRE: Service initialized.
    # @POST: No logs added.
    def test_add_logs_empty(self):
        """Calling add_logs with an empty list stores nothing for that task."""
        with self._patched("add_logs"):
            self.service.add_logs("test-task-X", [])

        assert len(self._fetch_records("test-task-X")) == 0
    # [/DEF:test_add_logs_empty:Function]

    # [DEF:test_get_logs_by_task_id:Function]
    # @PURPOSE: Test retrieving logs by task ID.
    # @PRE: Service and session initialized, logs exist.
    # @POST: Returns logs for the specified task.
    def test_get_logs_by_task_id(self):
        """get_logs with a default LogFilter returns every log of the task."""
        entries = [
            self._make_entry("INFO", "src1", f"Message {i}") for i in range(5)
        ]

        with self._patched("add_logs"):
            self.service.add_logs("test-task-3", entries)

        with self._patched("get_logs"):
            logs = self.service.get_logs("test-task-3", LogFilter())

        assert len(logs) == 5
        assert all(log.task_id == "test-task-3" for log in logs)
    # [/DEF:test_get_logs_by_task_id:Function]

    # [DEF:test_get_logs_with_filters:Function]
    # @PURPOSE: Test retrieving logs with level and source filters.
    # @PRE: Service and session initialized, logs exist.
    # @POST: Returns filtered logs.
    def test_get_logs_with_filters(self):
        """LogFilter(level=...) and LogFilter(source=...) narrow the result."""
        entries = [
            self._make_entry("INFO", "api", "Info message"),
            self._make_entry("WARNING", "api", "Warning message"),
            self._make_entry("ERROR", "storage", "Error message"),
        ]

        with self._patched("add_logs"):
            self.service.add_logs("test-task-4", entries)

        # Level filter: exactly one WARNING entry was stored.
        with self._patched("get_logs"):
            warning_logs = self.service.get_logs(
                "test-task-4", LogFilter(level="WARNING")
            )
        assert len(warning_logs) == 1
        assert warning_logs[0].level == "WARNING"

        # Source filter: two of the three entries come from "api".
        with self._patched("get_logs"):
            api_logs = self.service.get_logs(
                "test-task-4", LogFilter(source="api")
            )
        assert len(api_logs) == 2
        assert all(log.source == "api" for log in api_logs)
    # [/DEF:test_get_logs_with_filters:Function]

    # [DEF:test_get_logs_with_pagination:Function]
    # @PURPOSE: Test retrieving logs with pagination.
    # @PRE: Service and session initialized, logs exist.
    # @POST: Returns paginated logs.
    def test_get_logs_with_pagination(self):
        """With 15 logs, limit=10 yields a full first page and 5 on the second."""
        entries = [
            self._make_entry("INFO", "test", f"Message {i}") for i in range(15)
        ]

        with self._patched("add_logs"):
            self.service.add_logs("test-task-5", entries)

        with self._patched("get_logs"):
            page1 = self.service.get_logs(
                "test-task-5", LogFilter(limit=10, offset=0)
            )
        assert len(page1) == 10

        with self._patched("get_logs"):
            page2 = self.service.get_logs(
                "test-task-5", LogFilter(limit=10, offset=10)
            )
        assert len(page2) == 5
    # [/DEF:test_get_logs_with_pagination:Function]

    # [DEF:test_get_logs_with_search:Function]
    # @PURPOSE: Test retrieving logs with search query.
    # @PRE: Service and session initialized, logs exist.
    # @POST: Returns logs matching search query.
    def test_get_logs_with_search(self):
        """LogFilter(search=...) returns only the entry whose message matches."""
        entries = [
            self._make_entry("INFO", "api", "User authentication successful"),
            self._make_entry("ERROR", "api", "Failed to connect to database"),
            self._make_entry("INFO", "storage", "File saved successfully"),
        ]

        with self._patched("add_logs"):
            self.service.add_logs("test-task-6", entries)

        with self._patched("get_logs"):
            auth_logs = self.service.get_logs(
                "test-task-6", LogFilter(search="authentication")
            )

        assert len(auth_logs) == 1
        assert "authentication" in auth_logs[0].message.lower()
    # [/DEF:test_get_logs_with_search:Function]

    # [DEF:test_get_log_stats:Function]
    # @PURPOSE: Test retrieving log statistics.
    # @PRE: Service and session initialized, logs exist.
    # @POST: Returns LogStats model with counts by level and source.
    def test_get_log_stats(self):
        """get_log_stats reports the total plus per-level and per-source counts."""
        entries = [
            self._make_entry("INFO", "api", "Info 1"),
            self._make_entry("INFO", "api", "Info 2"),
            self._make_entry("WARNING", "api", "Warning 1"),
            self._make_entry("ERROR", "storage", "Error 1"),
        ]

        with self._patched("add_logs"):
            self.service.add_logs("test-task-7", entries)

        with self._patched("get_log_stats"):
            stats = self.service.get_log_stats("test-task-7")

        assert stats is not None
        assert stats.total_count == 4
        assert stats.by_level["INFO"] == 2
        assert stats.by_level["WARNING"] == 1
        assert stats.by_level["ERROR"] == 1
        assert stats.by_source["api"] == 3
        assert stats.by_source["storage"] == 1
    # [/DEF:test_get_log_stats:Function]

    # [DEF:test_get_sources:Function]
    # @PURPOSE: Test retrieving unique log sources.
    # @PRE: Service and session initialized, logs exist.
    # @POST: Returns list of unique sources.
    def test_get_sources(self):
        """get_sources returns each distinct source stored for the task."""
        entries = [
            self._make_entry("INFO", "api", "Message 1"),
            self._make_entry("INFO", "storage", "Message 2"),
            self._make_entry("INFO", "git", "Message 3"),
        ]

        with self._patched("add_logs"):
            self.service.add_logs("test-task-8", entries)

        with self._patched("get_sources"):
            sources = self.service.get_sources("test-task-8")

        assert len(sources) == 3
        assert "api" in sources
        assert "storage" in sources
        assert "git" in sources
    # [/DEF:test_get_sources:Function]

    # [DEF:test_delete_logs_for_task:Function]
    # @PURPOSE: Test deleting logs by task ID.
    # @PRE: Service and session initialized, logs exist.
    # @POST: Logs for the task are deleted.
    def test_delete_logs_for_task(self):
        """delete_logs_for_task removes every log previously stored for the task."""
        entries = [
            self._make_entry("INFO", "test", f"Message {i}") for i in range(3)
        ]

        with self._patched("add_logs"):
            self.service.add_logs("test-task-9", entries)

        # Verify logs exist before deleting, so the final check is meaningful.
        with self._patched("get_logs"):
            logs_before = self.service.get_logs("test-task-9", LogFilter())
        assert len(logs_before) == 3

        with self._patched("delete_logs_for_task"):
            self.service.delete_logs_for_task("test-task-9")

        with self._patched("get_logs"):
            logs_after = self.service.get_logs("test-task-9", LogFilter())
        assert len(logs_after) == 0
    # [/DEF:test_delete_logs_for_task:Function]

    # [DEF:test_delete_logs_for_tasks:Function]
    # @PURPOSE: Test deleting logs for multiple tasks.
    # @PRE: Service and session initialized, logs exist.
    # @POST: Logs for all specified tasks are deleted.
    def test_delete_logs_for_tasks(self):
        """delete_logs_for_tasks removes the listed tasks' logs and no others."""
        for task_id in ["multi-1", "multi-2", "multi-3"]:
            with self._patched("add_logs"):
                self.service.add_logs(
                    task_id, [self._make_entry("INFO", "test", "msg")]
                )

        with self._patched("delete_logs_for_tasks"):
            self.service.delete_logs_for_tasks(["multi-1", "multi-2"])

        # Only the task that was not listed may still have a row.
        remaining = self._fetch_records()
        assert len(remaining) == 1
        assert remaining[0].task_id == "multi-3"
    # [/DEF:test_delete_logs_for_tasks:Function]

    # [DEF:test_delete_logs_for_tasks_empty:Function]
    # @PURPOSE: Test deleting with empty list (no-op).
    # @PRE: Service initialized.
    # @POST: No error, no deletion.
    def test_delete_logs_for_tasks_empty(self):
        """delete_logs_for_tasks([]) neither raises nor removes existing logs."""
        # Seed one row so that "no deletion" is actually observable.
        with self._patched("add_logs"):
            self.service.add_logs(
                "keep-me", [self._make_entry("INFO", "test", "msg")]
            )

        with self._patched("delete_logs_for_tasks"):
            self.service.delete_logs_for_tasks([])  # Should not raise

        remaining = self._fetch_records()
        assert len(remaining) == 1
        assert remaining[0].task_id == "keep-me"
    # [/DEF:test_delete_logs_for_tasks_empty:Function]
# [/DEF:TestLogPersistence:Class]
# [/DEF:test_log_persistence:Module]