import sys from pathlib import Path import os import pytest from unittest.mock import MagicMock, patch sys.path.insert(0, str(Path(__file__).parent.parent)) # Mock database before any modules that import it are loaded mock_db = MagicMock() sys.modules['src.core.database'] = mock_db sys.modules['src.plugins.git_plugin.SessionLocal'] = mock_db.SessionLocal sys.modules['src.plugins.migration.SessionLocal'] = mock_db.SessionLocal class TestPluginSmoke: """Smoke tests for plugin loading and initialization.""" def test_plugins_load_successfully(self): """ Verify that all standard plugins can be discovered and instantiated by the PluginLoader without throwing errors (e.g., missing imports, syntax errors, missing class declarations). """ from src.core.plugin_loader import PluginLoader plugin_dir = os.path.join(str(Path(__file__).parent.parent), "src", "plugins") # This will discover and instantiate plugins loader = PluginLoader(plugin_dir) plugins = loader.get_all_plugin_configs() plugin_ids = {p.id for p in plugins} # We expect at least the migration and git plugins to be present expected_plugins = {"superset-migration", "git-integration"} missing_plugins = expected_plugins - plugin_ids assert not missing_plugins, f"Missing expected plugins: {missing_plugins}" @pytest.mark.anyio async def test_task_manager_initializes_with_plugins(self): """ Verify that the TaskManager can initialize with the real PluginLoader. """ from src.core.plugin_loader import PluginLoader from src.core.task_manager.manager import TaskManager plugin_dir = os.path.join(str(Path(__file__).parent.parent), "src", "plugins") loader = PluginLoader(plugin_dir) # Initialize TaskManager with real loader with patch("src.core.task_manager.manager.TaskPersistenceService") as MockPersistence, \ patch("src.core.task_manager.manager.TaskLogPersistenceService"): MockPersistence.return_value.load_tasks.return_value = [] with patch("src.dependencies.config_manager"): manager = TaskManager(loader) # Stop the flusher thread to prevent hanging manager._flusher_stop_event.set() manager._flusher_thread.join(timeout=2) assert manager is not None