From 590ba49ddba05a1647a820d8b1073f6083275a2a Mon Sep 17 00:00:00 2001 From: busya Date: Wed, 25 Feb 2026 15:20:26 +0300 Subject: [PATCH] sync worked --- .../routes/__tests__/test_migration_routes.py | 132 +++++++++++++++++- backend/src/api/routes/migration.py | 79 +++++++++-- backend/src/core/config_models.py | 49 ++++--- backend/src/core/superset_client.py | 28 ++++ backend/src/core/task_manager/manager.py | 39 +++--- frontend/src/routes/settings/+page.svelte | 53 +++++++ 6 files changed, 319 insertions(+), 61 deletions(-) diff --git a/backend/src/api/routes/__tests__/test_migration_routes.py b/backend/src/api/routes/__tests__/test_migration_routes.py index 614cd2e..15aa3b0 100644 --- a/backend/src/api/routes/__tests__/test_migration_routes.py +++ b/backend/src/api/routes/__tests__/test_migration_routes.py @@ -37,8 +37,11 @@ def db_session(): def _make_config_manager(cron="0 2 * * *"): - """Creates a mock config manager with settable config.""" - config = {"migration_sync_cron": cron} + """Creates a mock config manager with a realistic AppConfig-like object.""" + settings = MagicMock() + settings.migration_sync_cron = cron + config = MagicMock() + config.settings = settings cm = MagicMock() cm.get_config.return_value = config cm.save_config = MagicMock() @@ -63,11 +66,11 @@ async def test_get_migration_settings_returns_default_cron(): @pytest.mark.asyncio async def test_get_migration_settings_returns_fallback_when_no_cron(): - """When migration_sync_cron is not in config, should return default '0 2 * * *'.""" + """When migration_sync_cron uses the default, should return '0 2 * * *'.""" from src.api.routes.migration import get_migration_settings - cm = MagicMock() - cm.get_config.return_value = {} # No cron key + # Use the default cron value (simulating a fresh config) + cm = _make_config_manager() result = await get_migration_settings(config_manager=cm, _=None) @@ -161,4 +164,123 @@ async def test_get_resource_mappings_respects_pagination(db_session): assert len(result) == 2 +# --- trigger_sync_now tests --- + +@pytest.fixture +def _mock_env(): + """Creates a mock config environment object.""" + env = MagicMock() + env.id = "test-env-1" + env.name = "Test Env" + env.url = "http://superset.test" + env.username = "admin" + env.password = "admin" + env.verify_ssl = False + env.timeout = 30 + return env + + +def _make_sync_config_manager(environments): + """Creates a mock config manager with environments list.""" + settings = MagicMock() + settings.migration_sync_cron = "0 2 * * *" + config = MagicMock() + config.settings = settings + config.environments = environments + cm = MagicMock() + cm.get_config.return_value = config + return cm + + +@pytest.mark.asyncio +async def test_trigger_sync_now_creates_env_row_and_syncs(db_session, _mock_env): + """Verify that trigger_sync_now creates an Environment row in DB before syncing, + preventing FK constraint violations on resource_mappings inserts.""" + from src.api.routes.migration import trigger_sync_now + from src.models.mapping import Environment as EnvironmentModel + + cm = _make_sync_config_manager([_mock_env]) + + with patch("src.api.routes.migration.SupersetClient") as MockClient, \ + patch("src.api.routes.migration.IdMappingService") as MockService: + mock_client_instance = MagicMock() + MockClient.return_value = mock_client_instance + mock_service_instance = MagicMock() + MockService.return_value = mock_service_instance + + result = await trigger_sync_now(config_manager=cm, db=db_session, _=None) + + # Environment row must exist in DB + env_row = db_session.query(EnvironmentModel).filter_by(id="test-env-1").first() + assert env_row is not None + assert env_row.name == "Test Env" + assert env_row.url == "http://superset.test" + + # Sync must have been called + mock_service_instance.sync_environment.assert_called_once_with("test-env-1", mock_client_instance) + assert result["synced_count"] == 1 + assert result["failed_count"] == 0 + + +@pytest.mark.asyncio +async def test_trigger_sync_now_rejects_empty_environments(db_session): + """Verify 400 error when no environments are configured.""" + from src.api.routes.migration import trigger_sync_now + + cm = _make_sync_config_manager([]) + + with pytest.raises(HTTPException) as exc_info: + await trigger_sync_now(config_manager=cm, db=db_session, _=None) + + assert exc_info.value.status_code == 400 + assert "No environments" in exc_info.value.detail + + +@pytest.mark.asyncio +async def test_trigger_sync_now_handles_partial_failure(db_session, _mock_env): + """Verify that if sync_environment raises for one env, it's captured in failed list.""" + from src.api.routes.migration import trigger_sync_now + + env2 = MagicMock() + env2.id = "test-env-2" + env2.name = "Failing Env" + env2.url = "http://fail.test" + env2.username = "admin" + env2.password = "admin" + env2.verify_ssl = False + env2.timeout = 30 + + cm = _make_sync_config_manager([_mock_env, env2]) + + with patch("src.api.routes.migration.SupersetClient") as MockClient, \ + patch("src.api.routes.migration.IdMappingService") as MockService: + mock_service_instance = MagicMock() + mock_service_instance.sync_environment.side_effect = [None, RuntimeError("Connection refused")] + MockService.return_value = mock_service_instance + MockClient.return_value = MagicMock() + + result = await trigger_sync_now(config_manager=cm, db=db_session, _=None) + + assert result["synced_count"] == 1 + assert result["failed_count"] == 1 + assert result["details"]["failed"][0]["env_id"] == "test-env-2" + + +@pytest.mark.asyncio +async def test_trigger_sync_now_idempotent_env_upsert(db_session, _mock_env): + """Verify that calling sync twice doesn't duplicate the Environment row.""" + from src.api.routes.migration import trigger_sync_now + from src.models.mapping import Environment as EnvironmentModel + + cm = _make_sync_config_manager([_mock_env]) + + with patch("src.api.routes.migration.SupersetClient"), \ + patch("src.api.routes.migration.IdMappingService"): + await trigger_sync_now(config_manager=cm, db=db_session, _=None) + await trigger_sync_now(config_manager=cm, db=db_session, _=None) + + env_count = db_session.query(EnvironmentModel).filter_by(id="test-env-1").count() + assert env_count == 1 + + # [/DEF:backend.src.api.routes.__tests__.test_migration_routes:Module] diff --git a/backend/src/api/routes/migration.py b/backend/src/api/routes/migration.py index 43e8ba9..295f414 100644 --- a/backend/src/api/routes/migration.py +++ b/backend/src/api/routes/migration.py @@ -48,7 +48,7 @@ async def get_dashboards( # @POST: Starts the migration task and returns the task ID. # @PARAM: selection (DashboardSelection) - The dashboards to migrate. # @RETURN: Dict - {"task_id": str, "message": str} -@router.post("/execute") +@router.post("/migration/execute") async def execute_migration( selection: DashboardSelection, config_manager=Depends(get_config_manager), @@ -85,22 +85,20 @@ async def execute_migration( # [DEF:get_migration_settings:Function] # @PURPOSE: Get current migration Cron string explicitly. -@router.get("/settings", response_model=Dict[str, str]) +@router.get("/migration/settings", response_model=Dict[str, str]) async def get_migration_settings( config_manager=Depends(get_config_manager), _ = Depends(has_permission("plugin:migration", "READ")) ): with belief_scope("get_migration_settings"): - # For simplicity in MVP, assuming cron expression is stored in config - # default to a valid cron if not set. config = config_manager.get_config() - cron = config.get("migration_sync_cron", "0 2 * * *") + cron = config.settings.migration_sync_cron return {"cron": cron} # [/DEF:get_migration_settings:Function] # [DEF:update_migration_settings:Function] # @PURPOSE: Update migration Cron string. -@router.put("/settings", response_model=Dict[str, str]) +@router.put("/migration/settings", response_model=Dict[str, str]) async def update_migration_settings( payload: Dict[str, str], config_manager=Depends(get_config_manager), @@ -111,20 +109,17 @@ async def update_migration_settings( raise HTTPException(status_code=400, detail="Missing 'cron' field in payload") cron_expr = payload["cron"] - # Basic validation could go here - # In a real system, you'd save this to config and restart the scheduler. - # Here we just blindly patch the in-memory or file config for the MVP. - current_cfg = config_manager.get_config() - current_cfg["migration_sync_cron"] = cron_expr - config_manager.save_config(current_cfg) + config = config_manager.get_config() + config.settings.migration_sync_cron = cron_expr + config_manager.save_config(config) return {"cron": cron_expr, "status": "updated"} # [/DEF:update_migration_settings:Function] # [DEF:get_resource_mappings:Function] # @PURPOSE: Fetch all synchronized object mappings from the database. -@router.get("/mappings-data", response_model=List[Dict[str, Any]]) +@router.get("/migration/mappings-data", response_model=List[Dict[str, Any]]) async def get_resource_mappings( skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=1000), @@ -147,4 +142,62 @@ async def get_resource_mappings( return result # [/DEF:get_resource_mappings:Function] +# [DEF:trigger_sync_now:Function] +# @PURPOSE: Triggers an immediate ID synchronization for all environments. +# @PRE: At least one environment must be configured. +# @POST: Environment rows are ensured in DB; sync_environment is called for each. +@router.post("/migration/sync-now", response_model=Dict[str, Any]) +async def trigger_sync_now( + config_manager=Depends(get_config_manager), + db: Session = Depends(get_db), + _ = Depends(has_permission("plugin:migration", "EXECUTE")) +): + with belief_scope("trigger_sync_now"): + from ...core.logger import logger + from ...models.mapping import Environment as EnvironmentModel + + config = config_manager.get_config() + environments = config.environments + + if not environments: + raise HTTPException(status_code=400, detail="No environments configured") + + # Ensure each environment exists in DB (upsert) to satisfy FK constraints + for env in environments: + existing = db.query(EnvironmentModel).filter_by(id=env.id).first() + if not existing: + db_env = EnvironmentModel( + id=env.id, + name=env.name, + url=env.url, + credentials_id=env.id, # Use env.id as credentials reference + ) + db.add(db_env) + logger.info(f"[trigger_sync_now][Action] Created environment row for {env.id}") + else: + existing.name = env.name + existing.url = env.url + db.commit() + + service = IdMappingService(db) + results = {"synced": [], "failed": []} + + for env in environments: + try: + client = SupersetClient(env) + service.sync_environment(env.id, client) + results["synced"].append(env.id) + logger.info(f"[trigger_sync_now][Action] Synced environment {env.id}") + except Exception as e: + results["failed"].append({"env_id": env.id, "error": str(e)}) + logger.error(f"[trigger_sync_now][Error] Failed to sync {env.id}: {e}") + + return { + "status": "completed", + "synced_count": len(results["synced"]), + "failed_count": len(results["failed"]), + "details": results + } +# [/DEF:trigger_sync_now:Function] + # [/DEF:backend.src.api.routes.migration:Module] \ No newline at end of file diff --git a/backend/src/core/config_models.py b/backend/src/core/config_models.py index cb44ea1..5f98231 100755 --- a/backend/src/core/config_models.py +++ b/backend/src/core/config_models.py @@ -3,17 +3,17 @@ # @SEMANTICS: config, models, pydantic # @PURPOSE: Defines the data models for application configuration using Pydantic. # @LAYER: Core -# @RELATION: READS_FROM -> app_configurations (database) +# @RELATION: READS_FROM -> app_configurations (database) # @RELATION: USED_BY -> ConfigManager -from pydantic import BaseModel, Field -from typing import List, Optional -from ..models.storage import StorageConfig -from ..services.llm_prompt_templates import ( - DEFAULT_LLM_ASSISTANT_SETTINGS, - DEFAULT_LLM_PROMPTS, - DEFAULT_LLM_PROVIDER_BINDINGS, -) +from pydantic import BaseModel, Field +from typing import List, Optional +from ..models.storage import StorageConfig +from ..services.llm_prompt_templates import ( + DEFAULT_LLM_ASSISTANT_SETTINGS, + DEFAULT_LLM_PROMPTS, + DEFAULT_LLM_PROVIDER_BINDINGS, +) # [DEF:Schedule:DataClass] # @PURPOSE: Represents a backup schedule configuration. @@ -38,10 +38,10 @@ class Environment(BaseModel): # [DEF:LoggingConfig:DataClass] # @PURPOSE: Defines the configuration for the application's logging system. -class LoggingConfig(BaseModel): - level: str = "INFO" - task_log_level: str = "INFO" # Minimum level for task-specific logs (DEBUG, INFO, WARNING, ERROR) - file_path: Optional[str] = None +class LoggingConfig(BaseModel): + level: str = "INFO" + task_log_level: str = "INFO" # Minimum level for task-specific logs (DEBUG, INFO, WARNING, ERROR) + file_path: Optional[str] = None max_bytes: int = 10 * 1024 * 1024 backup_count: int = 5 enable_belief_state: bool = True @@ -49,25 +49,28 @@ class LoggingConfig(BaseModel): # [DEF:GlobalSettings:DataClass] # @PURPOSE: Represents global application settings. -class GlobalSettings(BaseModel): +class GlobalSettings(BaseModel): storage: StorageConfig = Field(default_factory=StorageConfig) default_environment_id: Optional[str] = None logging: LoggingConfig = Field(default_factory=LoggingConfig) connections: List[dict] = [] - llm: dict = Field( - default_factory=lambda: { - "providers": [], - "default_provider": "", - "prompts": dict(DEFAULT_LLM_PROMPTS), - "provider_bindings": dict(DEFAULT_LLM_PROVIDER_BINDINGS), - **dict(DEFAULT_LLM_ASSISTANT_SETTINGS), - } - ) + llm: dict = Field( + default_factory=lambda: { + "providers": [], + "default_provider": "", + "prompts": dict(DEFAULT_LLM_PROMPTS), + "provider_bindings": dict(DEFAULT_LLM_PROVIDER_BINDINGS), + **dict(DEFAULT_LLM_ASSISTANT_SETTINGS), + } + ) # Task retention settings task_retention_days: int = 30 task_retention_limit: int = 100 pagination_limit: int = 10 + + # Migration sync settings + migration_sync_cron: str = "0 2 * * *" # [/DEF:GlobalSettings:DataClass] # [DEF:AppConfig:DataClass] diff --git a/backend/src/core/superset_client.py b/backend/src/core/superset_client.py index 8f96ba5..afd1f83 100644 --- a/backend/src/core/superset_client.py +++ b/backend/src/core/superset_client.py @@ -829,6 +829,34 @@ class SupersetClient: raise SupersetAPIError(f"Архив {zip_path} не содержит 'metadata.yaml'") # [/DEF:_validate_import_file:Function] + # [DEF:get_all_resources:Function] + # @PURPOSE: Fetches all resources of a given type with id, uuid, and name columns. + # @PARAM: resource_type (str) - One of "chart", "dataset", "dashboard". + # @PRE: Client is authenticated. resource_type is valid. + # @POST: Returns a list of resource dicts with at minimum id, uuid, and name fields. + # @RETURN: List[Dict] + def get_all_resources(self, resource_type: str) -> List[Dict]: + with belief_scope("SupersetClient.get_all_resources", f"type={resource_type}"): + column_map = { + "chart": {"endpoint": "/chart/", "columns": ["id", "uuid", "slice_name"]}, + "dataset": {"endpoint": "/dataset/", "columns": ["id", "uuid", "table_name"]}, + "dashboard": {"endpoint": "/dashboard/", "columns": ["id", "uuid", "slug", "dashboard_title"]}, + } + config = column_map.get(resource_type) + if not config: + app_logger.warning("[get_all_resources][Warning] Unknown resource type: %s", resource_type) + return [] + + query = {"columns": config["columns"]} + validated = self._validate_query_params(query) + data = self._fetch_all_pages( + endpoint=config["endpoint"], + pagination_options={"base_query": validated, "results_field": "result"}, + ) + app_logger.info("[get_all_resources][Exit] Fetched %d %s resources.", len(data), resource_type) + return data + # [/DEF:get_all_resources:Function] + # [/SECTION] # [/DEF:SupersetClient:Class] diff --git a/backend/src/core/task_manager/manager.py b/backend/src/core/task_manager/manager.py index 2704f9e..fa98daa 100644 --- a/backend/src/core/task_manager/manager.py +++ b/backend/src/core/task_manager/manager.py @@ -75,10 +75,9 @@ class TaskManager: # @POST: Logs are batch-written to database every LOG_FLUSH_INTERVAL seconds. def _flusher_loop(self): """Background thread that flushes log buffer to database.""" - with belief_scope("_flusher_loop"): - while not self._flusher_stop_event.is_set(): - self._flush_logs() - self._flusher_stop_event.wait(self.LOG_FLUSH_INTERVAL) + while not self._flusher_stop_event.is_set(): + self._flush_logs() + self._flusher_stop_event.wait(self.LOG_FLUSH_INTERVAL) # [/DEF:_flusher_loop:Function] # [DEF:_flush_logs:Function] @@ -87,24 +86,24 @@ class TaskManager: # @POST: All buffered logs are written to task_logs table. def _flush_logs(self): """Flush all buffered logs to the database.""" - with belief_scope("_flush_logs"): + with self._log_buffer_lock: + task_ids = list(self._log_buffer.keys()) + + for task_id in task_ids: with self._log_buffer_lock: - task_ids = list(self._log_buffer.keys()) + logs = self._log_buffer.pop(task_id, []) - for task_id in task_ids: - with self._log_buffer_lock: - logs = self._log_buffer.pop(task_id, []) - - if logs: - try: - self.log_persistence_service.add_logs(task_id, logs) - except Exception as e: - logger.error(f"Failed to flush logs for task {task_id}: {e}") - # Re-add logs to buffer on failure - with self._log_buffer_lock: - if task_id not in self._log_buffer: - self._log_buffer[task_id] = [] - self._log_buffer[task_id].extend(logs) + if logs: + try: + self.log_persistence_service.add_logs(task_id, logs) + logger.debug(f"Flushed {len(logs)} logs for task {task_id}") + except Exception as e: + logger.error(f"Failed to flush logs for task {task_id}: {e}") + # Re-add logs to buffer on failure + with self._log_buffer_lock: + if task_id not in self._log_buffer: + self._log_buffer[task_id] = [] + self._log_buffer[task_id].extend(logs) # [/DEF:_flush_logs:Function] # [DEF:_flush_task_logs:Function] diff --git a/frontend/src/routes/settings/+page.svelte b/frontend/src/routes/settings/+page.svelte index 59b10d1..db43583 100644 --- a/frontend/src/routes/settings/+page.svelte +++ b/frontend/src/routes/settings/+page.svelte @@ -150,6 +150,7 @@ let displayMappings = []; let isSavingMigration = false; let isLoadingMigration = false; + let isSyncing = false; async function loadMigrationSettings() { isLoadingMigration = true; @@ -183,6 +184,23 @@ } } + async function triggerSyncNow() { + isSyncing = true; + try { + const result = await api.postApi("/migration/sync-now", {}); + addToast( + `Synced ${result.synced_count} environment(s)${result.failed_count > 0 ? `, ${result.failed_count} failed` : ""}`, + result.failed_count > 0 ? "warning" : "success", + ); + await loadMigrationSettings(); + } catch (err) { + console.error("[SettingsPage][Migration] Sync failed:", err); + addToast(err.message || "Sync failed", "error"); + } finally { + isSyncing = false; + } + } + // Handle global settings save (Logging, Storage) async function handleSave() { console.log("[SettingsPage][Action] Saving settings"); @@ -1015,6 +1033,41 @@ > {isSavingMigration ? "Saving..." : "Save"} +