# [DEF:test_log_persistence:Module] # @SEMANTICS: test, log, persistence, unit_test # @PURPOSE: Unit tests for TaskLogPersistenceService. # @LAYER: Test # @RELATION: TESTS -> TaskLogPersistenceService # @TIER: STANDARD # [SECTION: IMPORTS] from datetime import datetime from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from src.core.task_manager.persistence import TaskLogPersistenceService from src.core.task_manager.models import LogEntry # [/SECTION] # [DEF:TestLogPersistence:Class] # @PURPOSE: Test suite for TaskLogPersistenceService. # @TIER: STANDARD class TestLogPersistence: # [DEF:setup_class:Function] # @PURPOSE: Setup test database and service instance. # @PRE: None. # @POST: In-memory database and service instance created. @classmethod def setup_class(cls): """Create an in-memory database for testing.""" cls.engine = create_engine("sqlite:///:memory:") cls.SessionLocal = sessionmaker(bind=cls.engine) cls.service = TaskLogPersistenceService(cls.engine) # [/DEF:setup_class:Function] # [DEF:teardown_class:Function] # @PURPOSE: Clean up test database. # @PRE: None. # @POST: Database disposed. @classmethod def teardown_class(cls): """Dispose of the database engine.""" cls.engine.dispose() # [/DEF:teardown_class:Function] # [DEF:setup_method:Function] # @PURPOSE: Setup for each test method. # @PRE: None. # @POST: Fresh database session created. def setup_method(self): """Create a new session for each test.""" self.session = self.SessionLocal() # [/DEF:setup_method:Function] # [DEF:teardown_method:Function] # @PURPOSE: Cleanup after each test method. # @PRE: None. # @POST: Session closed and rolled back. def teardown_method(self): """Close the session after each test.""" self.session.close() # [/DEF:teardown_method:Function] # [DEF:test_add_log_single:Function] # @PURPOSE: Test adding a single log entry. # @PRE: Service and session initialized. # @POST: Log entry persisted to database. def test_add_log_single(self): """Test adding a single log entry.""" entry = LogEntry( task_id="test-task-1", timestamp=datetime.now(), level="INFO", source="test_source", message="Test message" ) self.service.add_log(entry) # Query the database result = self.session.query(LogEntry).filter_by(task_id="test-task-1").first() assert result is not None assert result.level == "INFO" assert result.source == "test_source" assert result.message == "Test message" # [/DEF:test_add_log_single:Function] # [DEF:test_add_log_batch:Function] # @PURPOSE: Test adding multiple log entries in batch. # @PRE: Service and session initialized. # @POST: All log entries persisted to database. def test_add_log_batch(self): """Test adding multiple log entries in batch.""" entries = [ LogEntry( task_id="test-task-2", timestamp=datetime.now(), level="INFO", source="source1", message="Message 1" ), LogEntry( task_id="test-task-2", timestamp=datetime.now(), level="WARNING", source="source2", message="Message 2" ), LogEntry( task_id="test-task-2", timestamp=datetime.now(), level="ERROR", source="source3", message="Message 3" ), ] self.service.add_logs(entries) # Query the database results = self.session.query(LogEntry).filter_by(task_id="test-task-2").all() assert len(results) == 3 assert results[0].level == "INFO" assert results[1].level == "WARNING" assert results[2].level == "ERROR" # [/DEF:test_add_log_batch:Function] # [DEF:test_get_logs_by_task_id:Function] # @PURPOSE: Test retrieving logs by task ID. # @PRE: Service and session initialized, logs exist. # @POST: Returns logs for the specified task. def test_get_logs_by_task_id(self): """Test retrieving logs by task ID.""" # Add test logs entries = [ LogEntry( task_id="test-task-3", timestamp=datetime.now(), level="INFO", source="source1", message=f"Message {i}" ) for i in range(5) ] self.service.add_logs(entries) # Retrieve logs logs = self.service.get_logs("test-task-3") assert len(logs) == 5 assert all(log.task_id == "test-task-3" for log in logs) # [/DEF:test_get_logs_by_task_id:Function] # [DEF:test_get_logs_with_filters:Function] # @PURPOSE: Test retrieving logs with level and source filters. # @PRE: Service and session initialized, logs exist. # @POST: Returns filtered logs. def test_get_logs_with_filters(self): """Test retrieving logs with level and source filters.""" # Add test logs with different levels and sources entries = [ LogEntry( task_id="test-task-4", timestamp=datetime.now(), level="INFO", source="api", message="Info message" ), LogEntry( task_id="test-task-4", timestamp=datetime.now(), level="WARNING", source="api", message="Warning message" ), LogEntry( task_id="test-task-4", timestamp=datetime.now(), level="ERROR", source="storage", message="Error message" ), ] self.service.add_logs(entries) # Test level filter warning_logs = self.service.get_logs("test-task-4", level="WARNING") assert len(warning_logs) == 1 assert warning_logs[0].level == "WARNING" # Test source filter api_logs = self.service.get_logs("test-task-4", source="api") assert len(api_logs) == 2 assert all(log.source == "api" for log in api_logs) # Test combined filters api_warning_logs = self.service.get_logs("test-task-4", level="WARNING", source="api") assert len(api_warning_logs) == 1 # [/DEF:test_get_logs_with_filters:Function] # [DEF:test_get_logs_with_pagination:Function] # @PURPOSE: Test retrieving logs with pagination. # @PRE: Service and session initialized, logs exist. # @POST: Returns paginated logs. def test_get_logs_with_pagination(self): """Test retrieving logs with pagination.""" # Add 15 test logs entries = [ LogEntry( task_id="test-task-5", timestamp=datetime.now(), level="INFO", source="test", message=f"Message {i}" ) for i in range(15) ] self.service.add_logs(entries) # Test first page page1 = self.service.get_logs("test-task-5", limit=10, offset=0) assert len(page1) == 10 # Test second page page2 = self.service.get_logs("test-task-5", limit=10, offset=10) assert len(page2) == 5 # [/DEF:test_get_logs_with_pagination:Function] # [DEF:test_get_logs_with_search:Function] # @PURPOSE: Test retrieving logs with search query. # @PRE: Service and session initialized, logs exist. # @POST: Returns logs matching search query. def test_get_logs_with_search(self): """Test retrieving logs with search query.""" # Add test logs entries = [ LogEntry( task_id="test-task-6", timestamp=datetime.now(), level="INFO", source="api", message="User authentication successful" ), LogEntry( task_id="test-task-6", timestamp=datetime.now(), level="ERROR", source="api", message="Failed to connect to database" ), LogEntry( task_id="test-task-6", timestamp=datetime.now(), level="INFO", source="storage", message="File saved successfully" ), ] self.service.add_logs(entries) # Test search for "authentication" auth_logs = self.service.get_logs("test-task-6", search="authentication") assert len(auth_logs) == 1 assert "authentication" in auth_logs[0].message.lower() # Test search for "failed" failed_logs = self.service.get_logs("test-task-6", search="failed") assert len(failed_logs) == 1 assert "failed" in failed_logs[0].message.lower() # [/DEF:test_get_logs_with_search:Function] # [DEF:test_get_log_stats:Function] # @PURPOSE: Test retrieving log statistics. # @PRE: Service and session initialized, logs exist. # @POST: Returns statistics grouped by level and source. def test_get_log_stats(self): """Test retrieving log statistics.""" # Add test logs entries = [ LogEntry( task_id="test-task-7", timestamp=datetime.now(), level="INFO", source="api", message="Info 1" ), LogEntry( task_id="test-task-7", timestamp=datetime.now(), level="INFO", source="api", message="Info 2" ), LogEntry( task_id="test-task-7", timestamp=datetime.now(), level="WARNING", source="api", message="Warning 1" ), LogEntry( task_id="test-task-7", timestamp=datetime.now(), level="ERROR", source="storage", message="Error 1" ), ] self.service.add_logs(entries) # Get stats stats = self.service.get_log_stats("test-task-7") assert stats is not None assert stats["by_level"]["INFO"] == 2 assert stats["by_level"]["WARNING"] == 1 assert stats["by_level"]["ERROR"] == 1 assert stats["by_source"]["api"] == 3 assert stats["by_source"]["storage"] == 1 # [/DEF:test_get_log_stats:Function] # [DEF:test_get_log_sources:Function] # @PURPOSE: Test retrieving unique log sources. # @PRE: Service and session initialized, logs exist. # @POST: Returns list of unique sources. def test_get_log_sources(self): """Test retrieving unique log sources.""" # Add test logs entries = [ LogEntry( task_id="test-task-8", timestamp=datetime.now(), level="INFO", source="api", message="Message 1" ), LogEntry( task_id="test-task-8", timestamp=datetime.now(), level="INFO", source="storage", message="Message 2" ), LogEntry( task_id="test-task-8", timestamp=datetime.now(), level="INFO", source="git", message="Message 3" ), ] self.service.add_logs(entries) # Get sources sources = self.service.get_log_sources("test-task-8") assert len(sources) == 3 assert "api" in sources assert "storage" in sources assert "git" in sources # [/DEF:test_get_log_sources:Function] # [DEF:test_delete_logs_by_task_id:Function] # @PURPOSE: Test deleting logs by task ID. # @PRE: Service and session initialized, logs exist. # @POST: Logs for the task are deleted. def test_delete_logs_by_task_id(self): """Test deleting logs by task ID.""" # Add test logs entries = [ LogEntry( task_id="test-task-9", timestamp=datetime.now(), level="INFO", source="test", message=f"Message {i}" ) for i in range(3) ] self.service.add_logs(entries) # Verify logs exist logs_before = self.service.get_logs("test-task-9") assert len(logs_before) == 3 # Delete logs self.service.delete_logs("test-task-9") # Verify logs are deleted logs_after = self.service.get_logs("test-task-9") assert len(logs_after) == 0 # [/DEF:test_delete_logs_by_task_id:Function] # [/DEF:TestLogPersistence:Class] # [/DEF:test_log_persistence:Module]