Files
ss-tools/backend/tests/test_log_persistence.py
2026-02-10 12:53:01 +03:00

396 lines
13 KiB
Python

# [DEF:test_log_persistence:Module]
# @SEMANTICS: test, log, persistence, unit_test
# @PURPOSE: Unit tests for TaskLogPersistenceService.
# @LAYER: Test
# @RELATION: TESTS -> TaskLogPersistenceService
# @TIER: STANDARD
# [SECTION: IMPORTS]
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.core.task_manager.persistence import TaskLogPersistenceService
from src.core.task_manager.models import LogEntry
# [/SECTION]
# [DEF:TestLogPersistence:Class]
# @PURPOSE: Test suite for TaskLogPersistenceService.
# @TIER: STANDARD
class TestLogPersistence:
# [DEF:setup_class:Function]
# @PURPOSE: Setup test database and service instance.
# @PRE: None.
# @POST: In-memory database and service instance created.
@classmethod
def setup_class(cls):
"""Create an in-memory database for testing."""
cls.engine = create_engine("sqlite:///:memory:")
cls.SessionLocal = sessionmaker(bind=cls.engine)
cls.service = TaskLogPersistenceService(cls.engine)
# [/DEF:setup_class:Function]
# [DEF:teardown_class:Function]
# @PURPOSE: Clean up test database.
# @PRE: None.
# @POST: Database disposed.
@classmethod
def teardown_class(cls):
"""Dispose of the database engine."""
cls.engine.dispose()
# [/DEF:teardown_class:Function]
# [DEF:setup_method:Function]
# @PURPOSE: Setup for each test method.
# @PRE: None.
# @POST: Fresh database session created.
def setup_method(self):
"""Create a new session for each test."""
self.session = self.SessionLocal()
# [/DEF:setup_method:Function]
# [DEF:teardown_method:Function]
# @PURPOSE: Cleanup after each test method.
# @PRE: None.
# @POST: Session closed and rolled back.
def teardown_method(self):
"""Close the session after each test."""
self.session.close()
# [/DEF:teardown_method:Function]
# [DEF:test_add_log_single:Function]
# @PURPOSE: Test adding a single log entry.
# @PRE: Service and session initialized.
# @POST: Log entry persisted to database.
def test_add_log_single(self):
"""Test adding a single log entry."""
entry = LogEntry(
task_id="test-task-1",
timestamp=datetime.now(),
level="INFO",
source="test_source",
message="Test message"
)
self.service.add_log(entry)
# Query the database
result = self.session.query(LogEntry).filter_by(task_id="test-task-1").first()
assert result is not None
assert result.level == "INFO"
assert result.source == "test_source"
assert result.message == "Test message"
# [/DEF:test_add_log_single:Function]
# [DEF:test_add_log_batch:Function]
# @PURPOSE: Test adding multiple log entries in batch.
# @PRE: Service and session initialized.
# @POST: All log entries persisted to database.
def test_add_log_batch(self):
"""Test adding multiple log entries in batch."""
entries = [
LogEntry(
task_id="test-task-2",
timestamp=datetime.now(),
level="INFO",
source="source1",
message="Message 1"
),
LogEntry(
task_id="test-task-2",
timestamp=datetime.now(),
level="WARNING",
source="source2",
message="Message 2"
),
LogEntry(
task_id="test-task-2",
timestamp=datetime.now(),
level="ERROR",
source="source3",
message="Message 3"
),
]
self.service.add_logs(entries)
# Query the database
results = self.session.query(LogEntry).filter_by(task_id="test-task-2").all()
assert len(results) == 3
assert results[0].level == "INFO"
assert results[1].level == "WARNING"
assert results[2].level == "ERROR"
# [/DEF:test_add_log_batch:Function]
# [DEF:test_get_logs_by_task_id:Function]
# @PURPOSE: Test retrieving logs by task ID.
# @PRE: Service and session initialized, logs exist.
# @POST: Returns logs for the specified task.
def test_get_logs_by_task_id(self):
"""Test retrieving logs by task ID."""
# Add test logs
entries = [
LogEntry(
task_id="test-task-3",
timestamp=datetime.now(),
level="INFO",
source="source1",
message=f"Message {i}"
)
for i in range(5)
]
self.service.add_logs(entries)
# Retrieve logs
logs = self.service.get_logs("test-task-3")
assert len(logs) == 5
assert all(log.task_id == "test-task-3" for log in logs)
# [/DEF:test_get_logs_by_task_id:Function]
# [DEF:test_get_logs_with_filters:Function]
# @PURPOSE: Test retrieving logs with level and source filters.
# @PRE: Service and session initialized, logs exist.
# @POST: Returns filtered logs.
def test_get_logs_with_filters(self):
"""Test retrieving logs with level and source filters."""
# Add test logs with different levels and sources
entries = [
LogEntry(
task_id="test-task-4",
timestamp=datetime.now(),
level="INFO",
source="api",
message="Info message"
),
LogEntry(
task_id="test-task-4",
timestamp=datetime.now(),
level="WARNING",
source="api",
message="Warning message"
),
LogEntry(
task_id="test-task-4",
timestamp=datetime.now(),
level="ERROR",
source="storage",
message="Error message"
),
]
self.service.add_logs(entries)
# Test level filter
warning_logs = self.service.get_logs("test-task-4", level="WARNING")
assert len(warning_logs) == 1
assert warning_logs[0].level == "WARNING"
# Test source filter
api_logs = self.service.get_logs("test-task-4", source="api")
assert len(api_logs) == 2
assert all(log.source == "api" for log in api_logs)
# Test combined filters
api_warning_logs = self.service.get_logs("test-task-4", level="WARNING", source="api")
assert len(api_warning_logs) == 1
# [/DEF:test_get_logs_with_filters:Function]
# [DEF:test_get_logs_with_pagination:Function]
# @PURPOSE: Test retrieving logs with pagination.
# @PRE: Service and session initialized, logs exist.
# @POST: Returns paginated logs.
def test_get_logs_with_pagination(self):
"""Test retrieving logs with pagination."""
# Add 15 test logs
entries = [
LogEntry(
task_id="test-task-5",
timestamp=datetime.now(),
level="INFO",
source="test",
message=f"Message {i}"
)
for i in range(15)
]
self.service.add_logs(entries)
# Test first page
page1 = self.service.get_logs("test-task-5", limit=10, offset=0)
assert len(page1) == 10
# Test second page
page2 = self.service.get_logs("test-task-5", limit=10, offset=10)
assert len(page2) == 5
# [/DEF:test_get_logs_with_pagination:Function]
# [DEF:test_get_logs_with_search:Function]
# @PURPOSE: Test retrieving logs with search query.
# @PRE: Service and session initialized, logs exist.
# @POST: Returns logs matching search query.
def test_get_logs_with_search(self):
"""Test retrieving logs with search query."""
# Add test logs
entries = [
LogEntry(
task_id="test-task-6",
timestamp=datetime.now(),
level="INFO",
source="api",
message="User authentication successful"
),
LogEntry(
task_id="test-task-6",
timestamp=datetime.now(),
level="ERROR",
source="api",
message="Failed to connect to database"
),
LogEntry(
task_id="test-task-6",
timestamp=datetime.now(),
level="INFO",
source="storage",
message="File saved successfully"
),
]
self.service.add_logs(entries)
# Test search for "authentication"
auth_logs = self.service.get_logs("test-task-6", search="authentication")
assert len(auth_logs) == 1
assert "authentication" in auth_logs[0].message.lower()
# Test search for "failed"
failed_logs = self.service.get_logs("test-task-6", search="failed")
assert len(failed_logs) == 1
assert "failed" in failed_logs[0].message.lower()
# [/DEF:test_get_logs_with_search:Function]
# [DEF:test_get_log_stats:Function]
# @PURPOSE: Test retrieving log statistics.
# @PRE: Service and session initialized, logs exist.
# @POST: Returns statistics grouped by level and source.
def test_get_log_stats(self):
"""Test retrieving log statistics."""
# Add test logs
entries = [
LogEntry(
task_id="test-task-7",
timestamp=datetime.now(),
level="INFO",
source="api",
message="Info 1"
),
LogEntry(
task_id="test-task-7",
timestamp=datetime.now(),
level="INFO",
source="api",
message="Info 2"
),
LogEntry(
task_id="test-task-7",
timestamp=datetime.now(),
level="WARNING",
source="api",
message="Warning 1"
),
LogEntry(
task_id="test-task-7",
timestamp=datetime.now(),
level="ERROR",
source="storage",
message="Error 1"
),
]
self.service.add_logs(entries)
# Get stats
stats = self.service.get_log_stats("test-task-7")
assert stats is not None
assert stats["by_level"]["INFO"] == 2
assert stats["by_level"]["WARNING"] == 1
assert stats["by_level"]["ERROR"] == 1
assert stats["by_source"]["api"] == 3
assert stats["by_source"]["storage"] == 1
# [/DEF:test_get_log_stats:Function]
# [DEF:test_get_log_sources:Function]
# @PURPOSE: Test retrieving unique log sources.
# @PRE: Service and session initialized, logs exist.
# @POST: Returns list of unique sources.
def test_get_log_sources(self):
"""Test retrieving unique log sources."""
# Add test logs
entries = [
LogEntry(
task_id="test-task-8",
timestamp=datetime.now(),
level="INFO",
source="api",
message="Message 1"
),
LogEntry(
task_id="test-task-8",
timestamp=datetime.now(),
level="INFO",
source="storage",
message="Message 2"
),
LogEntry(
task_id="test-task-8",
timestamp=datetime.now(),
level="INFO",
source="git",
message="Message 3"
),
]
self.service.add_logs(entries)
# Get sources
sources = self.service.get_log_sources("test-task-8")
assert len(sources) == 3
assert "api" in sources
assert "storage" in sources
assert "git" in sources
# [/DEF:test_get_log_sources:Function]
# [DEF:test_delete_logs_by_task_id:Function]
# @PURPOSE: Test deleting logs by task ID.
# @PRE: Service and session initialized, logs exist.
# @POST: Logs for the task are deleted.
def test_delete_logs_by_task_id(self):
"""Test deleting logs by task ID."""
# Add test logs
entries = [
LogEntry(
task_id="test-task-9",
timestamp=datetime.now(),
level="INFO",
source="test",
message=f"Message {i}"
)
for i in range(3)
]
self.service.add_logs(entries)
# Verify logs exist
logs_before = self.service.get_logs("test-task-9")
assert len(logs_before) == 3
# Delete logs
self.service.delete_logs("test-task-9")
# Verify logs are deleted
logs_after = self.service.get_logs("test-task-9")
assert len(logs_after) == 0
# [/DEF:test_delete_logs_by_task_id:Function]
# [/DEF:TestLogPersistence:Class]
# [/DEF:test_log_persistence:Module]