Add manual "Sync Now" trigger for environment ID synchronization (backend endpoint, env upsert, tests, and UI button)

This commit is contained in:
2026-02-25 15:20:26 +03:00
parent 2a5b225800
commit 590ba49ddb
6 changed files with 319 additions and 61 deletions

View File

@@ -37,8 +37,11 @@ def db_session():
def _make_config_manager(cron="0 2 * * *"): def _make_config_manager(cron="0 2 * * *"):
"""Creates a mock config manager with settable config.""" """Creates a mock config manager with a realistic AppConfig-like object."""
config = {"migration_sync_cron": cron} settings = MagicMock()
settings.migration_sync_cron = cron
config = MagicMock()
config.settings = settings
cm = MagicMock() cm = MagicMock()
cm.get_config.return_value = config cm.get_config.return_value = config
cm.save_config = MagicMock() cm.save_config = MagicMock()
@@ -63,11 +66,11 @@ async def test_get_migration_settings_returns_default_cron():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_migration_settings_returns_fallback_when_no_cron(): async def test_get_migration_settings_returns_fallback_when_no_cron():
"""When migration_sync_cron is not in config, should return default '0 2 * * *'.""" """When migration_sync_cron uses the default, should return '0 2 * * *'."""
from src.api.routes.migration import get_migration_settings from src.api.routes.migration import get_migration_settings
cm = MagicMock() # Use the default cron value (simulating a fresh config)
cm.get_config.return_value = {} # No cron key cm = _make_config_manager()
result = await get_migration_settings(config_manager=cm, _=None) result = await get_migration_settings(config_manager=cm, _=None)
@@ -161,4 +164,123 @@ async def test_get_resource_mappings_respects_pagination(db_session):
assert len(result) == 2 assert len(result) == 2
# --- trigger_sync_now tests ---
@pytest.fixture
def _mock_env():
    """Fixture: a MagicMock standing in for one configured Superset environment."""
    env = MagicMock()
    # `name` is a Mock constructor argument, so attributes are set via
    # configure_mock to get a real `.name` attribute on the mock.
    env.configure_mock(
        id="test-env-1",
        name="Test Env",
        url="http://superset.test",
        username="admin",
        password="admin",
        verify_ssl=False,
        timeout=30,
    )
    return env
def _make_sync_config_manager(environments):
"""Creates a mock config manager with environments list."""
settings = MagicMock()
settings.migration_sync_cron = "0 2 * * *"
config = MagicMock()
config.settings = settings
config.environments = environments
cm = MagicMock()
cm.get_config.return_value = config
return cm
@pytest.mark.asyncio
async def test_trigger_sync_now_creates_env_row_and_syncs(db_session, _mock_env):
    """trigger_sync_now must upsert an Environment row before syncing so that
    resource_mappings inserts never hit a missing-FK error."""
    from src.api.routes.migration import trigger_sync_now
    from src.models.mapping import Environment as EnvironmentModel

    cm = _make_sync_config_manager([_mock_env])

    with patch("src.api.routes.migration.SupersetClient") as client_cls, \
         patch("src.api.routes.migration.IdMappingService") as service_cls:
        client = MagicMock()
        client_cls.return_value = client
        service = MagicMock()
        service_cls.return_value = service

        result = await trigger_sync_now(config_manager=cm, db=db_session, _=None)

        # The environment row must now be persisted.
        row = db_session.query(EnvironmentModel).filter_by(id="test-env-1").first()
        assert row is not None
        assert row.name == "Test Env"
        assert row.url == "http://superset.test"

        # And the sync service must have been invoked for exactly that env.
        service.sync_environment.assert_called_once_with("test-env-1", client)

        assert result["synced_count"] == 1
        assert result["failed_count"] == 0
@pytest.mark.asyncio
async def test_trigger_sync_now_rejects_empty_environments(db_session):
    """With zero configured environments the endpoint must fail fast with HTTP 400."""
    from src.api.routes.migration import trigger_sync_now

    cm = _make_sync_config_manager([])
    with pytest.raises(HTTPException) as exc_info:
        await trigger_sync_now(config_manager=cm, db=db_session, _=None)

    err = exc_info.value
    assert err.status_code == 400
    assert "No environments" in err.detail
@pytest.mark.asyncio
async def test_trigger_sync_now_handles_partial_failure(db_session, _mock_env):
    """A sync_environment failure for one env is recorded in the failed list
    without aborting the remaining environments."""
    from src.api.routes.migration import trigger_sync_now

    failing_env = MagicMock()
    failing_env.configure_mock(
        id="test-env-2",
        name="Failing Env",
        url="http://fail.test",
        username="admin",
        password="admin",
        verify_ssl=False,
        timeout=30,
    )

    cm = _make_sync_config_manager([_mock_env, failing_env])

    with patch("src.api.routes.migration.SupersetClient") as client_cls, \
         patch("src.api.routes.migration.IdMappingService") as service_cls:
        service = MagicMock()
        # First env succeeds, second env raises.
        service.sync_environment.side_effect = [None, RuntimeError("Connection refused")]
        service_cls.return_value = service
        client_cls.return_value = MagicMock()

        result = await trigger_sync_now(config_manager=cm, db=db_session, _=None)

        assert result["synced_count"] == 1
        assert result["failed_count"] == 1
        assert result["details"]["failed"][0]["env_id"] == "test-env-2"
@pytest.mark.asyncio
async def test_trigger_sync_now_idempotent_env_upsert(db_session, _mock_env):
    """Running the sync twice must not create a duplicate Environment row."""
    from src.api.routes.migration import trigger_sync_now
    from src.models.mapping import Environment as EnvironmentModel

    cm = _make_sync_config_manager([_mock_env])
    with patch("src.api.routes.migration.SupersetClient"), \
         patch("src.api.routes.migration.IdMappingService"):
        await trigger_sync_now(config_manager=cm, db=db_session, _=None)
        await trigger_sync_now(config_manager=cm, db=db_session, _=None)

    matches = db_session.query(EnvironmentModel).filter_by(id="test-env-1").count()
    assert matches == 1
# [/DEF:backend.src.api.routes.__tests__.test_migration_routes:Module] # [/DEF:backend.src.api.routes.__tests__.test_migration_routes:Module]

View File

@@ -48,7 +48,7 @@ async def get_dashboards(
# @POST: Starts the migration task and returns the task ID. # @POST: Starts the migration task and returns the task ID.
# @PARAM: selection (DashboardSelection) - The dashboards to migrate. # @PARAM: selection (DashboardSelection) - The dashboards to migrate.
# @RETURN: Dict - {"task_id": str, "message": str} # @RETURN: Dict - {"task_id": str, "message": str}
@router.post("/execute") @router.post("/migration/execute")
async def execute_migration( async def execute_migration(
selection: DashboardSelection, selection: DashboardSelection,
config_manager=Depends(get_config_manager), config_manager=Depends(get_config_manager),
@@ -85,22 +85,20 @@ async def execute_migration(
# [DEF:get_migration_settings:Function] # [DEF:get_migration_settings:Function]
# @PURPOSE: Get current migration Cron string explicitly. # @PURPOSE: Get current migration Cron string explicitly.
@router.get("/settings", response_model=Dict[str, str]) @router.get("/migration/settings", response_model=Dict[str, str])
async def get_migration_settings( async def get_migration_settings(
config_manager=Depends(get_config_manager), config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:migration", "READ")) _ = Depends(has_permission("plugin:migration", "READ"))
): ):
with belief_scope("get_migration_settings"): with belief_scope("get_migration_settings"):
# For simplicity in MVP, assuming cron expression is stored in config
# default to a valid cron if not set.
config = config_manager.get_config() config = config_manager.get_config()
cron = config.get("migration_sync_cron", "0 2 * * *") cron = config.settings.migration_sync_cron
return {"cron": cron} return {"cron": cron}
# [/DEF:get_migration_settings:Function] # [/DEF:get_migration_settings:Function]
# [DEF:update_migration_settings:Function] # [DEF:update_migration_settings:Function]
# @PURPOSE: Update migration Cron string. # @PURPOSE: Update migration Cron string.
@router.put("/settings", response_model=Dict[str, str]) @router.put("/migration/settings", response_model=Dict[str, str])
async def update_migration_settings( async def update_migration_settings(
payload: Dict[str, str], payload: Dict[str, str],
config_manager=Depends(get_config_manager), config_manager=Depends(get_config_manager),
@@ -111,20 +109,17 @@ async def update_migration_settings(
raise HTTPException(status_code=400, detail="Missing 'cron' field in payload") raise HTTPException(status_code=400, detail="Missing 'cron' field in payload")
cron_expr = payload["cron"] cron_expr = payload["cron"]
# Basic validation could go here
# In a real system, you'd save this to config and restart the scheduler. config = config_manager.get_config()
# Here we just blindly patch the in-memory or file config for the MVP. config.settings.migration_sync_cron = cron_expr
current_cfg = config_manager.get_config() config_manager.save_config(config)
current_cfg["migration_sync_cron"] = cron_expr
config_manager.save_config(current_cfg)
return {"cron": cron_expr, "status": "updated"} return {"cron": cron_expr, "status": "updated"}
# [/DEF:update_migration_settings:Function] # [/DEF:update_migration_settings:Function]
# [DEF:get_resource_mappings:Function] # [DEF:get_resource_mappings:Function]
# @PURPOSE: Fetch all synchronized object mappings from the database. # @PURPOSE: Fetch all synchronized object mappings from the database.
@router.get("/mappings-data", response_model=List[Dict[str, Any]]) @router.get("/migration/mappings-data", response_model=List[Dict[str, Any]])
async def get_resource_mappings( async def get_resource_mappings(
skip: int = Query(0, ge=0), skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000), limit: int = Query(100, ge=1, le=1000),
@@ -147,4 +142,62 @@ async def get_resource_mappings(
return result return result
# [/DEF:get_resource_mappings:Function] # [/DEF:get_resource_mappings:Function]
# [DEF:trigger_sync_now:Function]
# @PURPOSE: Triggers an immediate ID synchronization for all environments.
# @PRE: At least one environment must be configured.
# @POST: Environment rows are ensured in DB; sync_environment is called for each.
@router.post("/migration/sync-now", response_model=Dict[str, Any])
async def trigger_sync_now(
    config_manager=Depends(get_config_manager),
    db: Session = Depends(get_db),
    _ = Depends(has_permission("plugin:migration", "EXECUTE"))
):
    with belief_scope("trigger_sync_now"):
        # Local imports mirror the module's lazy-import convention.
        from ...core.logger import logger
        from ...models.mapping import Environment as EnvironmentModel

        environments = config_manager.get_config().environments
        if not environments:
            raise HTTPException(status_code=400, detail="No environments configured")

        # Upsert every configured environment first so that FK constraints on
        # resource_mappings inserts cannot fail during the sync pass below.
        for env in environments:
            row = db.query(EnvironmentModel).filter_by(id=env.id).first()
            if row is None:
                db.add(EnvironmentModel(
                    id=env.id,
                    name=env.name,
                    url=env.url,
                    credentials_id=env.id,  # env.id doubles as the credentials reference
                ))
                logger.info(f"[trigger_sync_now][Action] Created environment row for {env.id}")
            else:
                # Keep persisted metadata in step with the current config.
                row.name = env.name
                row.url = env.url
        db.commit()

        service = IdMappingService(db)
        results = {"synced": [], "failed": []}
        # Best-effort loop: one failing environment must not abort the rest.
        for env in environments:
            try:
                service.sync_environment(env.id, SupersetClient(env))
                results["synced"].append(env.id)
                logger.info(f"[trigger_sync_now][Action] Synced environment {env.id}")
            except Exception as e:
                results["failed"].append({"env_id": env.id, "error": str(e)})
                logger.error(f"[trigger_sync_now][Error] Failed to sync {env.id}: {e}")

        return {
            "status": "completed",
            "synced_count": len(results["synced"]),
            "failed_count": len(results["failed"]),
            "details": results,
        }
# [/DEF:trigger_sync_now:Function]
# [/DEF:backend.src.api.routes.migration:Module] # [/DEF:backend.src.api.routes.migration:Module]

View File

@@ -3,17 +3,17 @@
# @SEMANTICS: config, models, pydantic # @SEMANTICS: config, models, pydantic
# @PURPOSE: Defines the data models for application configuration using Pydantic. # @PURPOSE: Defines the data models for application configuration using Pydantic.
# @LAYER: Core # @LAYER: Core
# @RELATION: READS_FROM -> app_configurations (database) # @RELATION: READS_FROM -> app_configurations (database)
# @RELATION: USED_BY -> ConfigManager # @RELATION: USED_BY -> ConfigManager
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from typing import List, Optional from typing import List, Optional
from ..models.storage import StorageConfig from ..models.storage import StorageConfig
from ..services.llm_prompt_templates import ( from ..services.llm_prompt_templates import (
DEFAULT_LLM_ASSISTANT_SETTINGS, DEFAULT_LLM_ASSISTANT_SETTINGS,
DEFAULT_LLM_PROMPTS, DEFAULT_LLM_PROMPTS,
DEFAULT_LLM_PROVIDER_BINDINGS, DEFAULT_LLM_PROVIDER_BINDINGS,
) )
# [DEF:Schedule:DataClass] # [DEF:Schedule:DataClass]
# @PURPOSE: Represents a backup schedule configuration. # @PURPOSE: Represents a backup schedule configuration.
@@ -38,10 +38,10 @@ class Environment(BaseModel):
# [DEF:LoggingConfig:DataClass] # [DEF:LoggingConfig:DataClass]
# @PURPOSE: Defines the configuration for the application's logging system. # @PURPOSE: Defines the configuration for the application's logging system.
class LoggingConfig(BaseModel): class LoggingConfig(BaseModel):
level: str = "INFO" level: str = "INFO"
task_log_level: str = "INFO" # Minimum level for task-specific logs (DEBUG, INFO, WARNING, ERROR) task_log_level: str = "INFO" # Minimum level for task-specific logs (DEBUG, INFO, WARNING, ERROR)
file_path: Optional[str] = None file_path: Optional[str] = None
max_bytes: int = 10 * 1024 * 1024 max_bytes: int = 10 * 1024 * 1024
backup_count: int = 5 backup_count: int = 5
enable_belief_state: bool = True enable_belief_state: bool = True
@@ -49,25 +49,28 @@ class LoggingConfig(BaseModel):
# [DEF:GlobalSettings:DataClass] # [DEF:GlobalSettings:DataClass]
# @PURPOSE: Represents global application settings. # @PURPOSE: Represents global application settings.
class GlobalSettings(BaseModel): class GlobalSettings(BaseModel):
storage: StorageConfig = Field(default_factory=StorageConfig) storage: StorageConfig = Field(default_factory=StorageConfig)
default_environment_id: Optional[str] = None default_environment_id: Optional[str] = None
logging: LoggingConfig = Field(default_factory=LoggingConfig) logging: LoggingConfig = Field(default_factory=LoggingConfig)
connections: List[dict] = [] connections: List[dict] = []
llm: dict = Field( llm: dict = Field(
default_factory=lambda: { default_factory=lambda: {
"providers": [], "providers": [],
"default_provider": "", "default_provider": "",
"prompts": dict(DEFAULT_LLM_PROMPTS), "prompts": dict(DEFAULT_LLM_PROMPTS),
"provider_bindings": dict(DEFAULT_LLM_PROVIDER_BINDINGS), "provider_bindings": dict(DEFAULT_LLM_PROVIDER_BINDINGS),
**dict(DEFAULT_LLM_ASSISTANT_SETTINGS), **dict(DEFAULT_LLM_ASSISTANT_SETTINGS),
} }
) )
# Task retention settings # Task retention settings
task_retention_days: int = 30 task_retention_days: int = 30
task_retention_limit: int = 100 task_retention_limit: int = 100
pagination_limit: int = 10 pagination_limit: int = 10
# Migration sync settings
migration_sync_cron: str = "0 2 * * *"
# [/DEF:GlobalSettings:DataClass] # [/DEF:GlobalSettings:DataClass]
# [DEF:AppConfig:DataClass] # [DEF:AppConfig:DataClass]

View File

@@ -829,6 +829,34 @@ class SupersetClient:
raise SupersetAPIError(f"Архив {zip_path} не содержит 'metadata.yaml'") raise SupersetAPIError(f"Архив {zip_path} не содержит 'metadata.yaml'")
# [/DEF:_validate_import_file:Function] # [/DEF:_validate_import_file:Function]
# [DEF:get_all_resources:Function]
# @PURPOSE: Fetches all resources of a given type with id, uuid, and name columns.
# @PARAM: resource_type (str) - One of "chart", "dataset", "dashboard".
# @PRE: Client is authenticated. resource_type is valid.
# @POST: Returns a list of resource dicts with at minimum id and uuid fields
#        (the "name" column varies per type: slice_name / table_name / dashboard_title).
# @RETURN: List[Dict]
def get_all_resources(self, resource_type: str) -> List[Dict]:
    with belief_scope("SupersetClient.get_all_resources", f"type={resource_type}"):
        # Per-type API endpoint and the identifying columns to request.
        column_map = {
            "chart": {"endpoint": "/chart/", "columns": ["id", "uuid", "slice_name"]},
            "dataset": {"endpoint": "/dataset/", "columns": ["id", "uuid", "table_name"]},
            "dashboard": {"endpoint": "/dashboard/", "columns": ["id", "uuid", "slug", "dashboard_title"]},
        }
        spec = column_map.get(resource_type)
        if not spec:
            # Unknown type: warn and return an empty list rather than raising.
            app_logger.warning("[get_all_resources][Warning] Unknown resource type: %s", resource_type)
            return []
        validated = self._validate_query_params({"columns": spec["columns"]})
        data = self._fetch_all_pages(
            endpoint=spec["endpoint"],
            pagination_options={"base_query": validated, "results_field": "result"},
        )
        app_logger.info("[get_all_resources][Exit] Fetched %d %s resources.", len(data), resource_type)
        return data
# [/DEF:get_all_resources:Function]
# [/SECTION] # [/SECTION]
# [/DEF:SupersetClient:Class] # [/DEF:SupersetClient:Class]

View File

@@ -75,10 +75,9 @@ class TaskManager:
# @POST: Logs are batch-written to database every LOG_FLUSH_INTERVAL seconds. # @POST: Logs are batch-written to database every LOG_FLUSH_INTERVAL seconds.
def _flusher_loop(self): def _flusher_loop(self):
"""Background thread that flushes log buffer to database.""" """Background thread that flushes log buffer to database."""
with belief_scope("_flusher_loop"): while not self._flusher_stop_event.is_set():
while not self._flusher_stop_event.is_set(): self._flush_logs()
self._flush_logs() self._flusher_stop_event.wait(self.LOG_FLUSH_INTERVAL)
self._flusher_stop_event.wait(self.LOG_FLUSH_INTERVAL)
# [/DEF:_flusher_loop:Function] # [/DEF:_flusher_loop:Function]
# [DEF:_flush_logs:Function] # [DEF:_flush_logs:Function]
@@ -87,24 +86,24 @@ class TaskManager:
# @POST: All buffered logs are written to task_logs table. # @POST: All buffered logs are written to task_logs table.
def _flush_logs(self): def _flush_logs(self):
"""Flush all buffered logs to the database.""" """Flush all buffered logs to the database."""
with belief_scope("_flush_logs"): with self._log_buffer_lock:
task_ids = list(self._log_buffer.keys())
for task_id in task_ids:
with self._log_buffer_lock: with self._log_buffer_lock:
task_ids = list(self._log_buffer.keys()) logs = self._log_buffer.pop(task_id, [])
for task_id in task_ids: if logs:
with self._log_buffer_lock: try:
logs = self._log_buffer.pop(task_id, []) self.log_persistence_service.add_logs(task_id, logs)
logger.debug(f"Flushed {len(logs)} logs for task {task_id}")
if logs: except Exception as e:
try: logger.error(f"Failed to flush logs for task {task_id}: {e}")
self.log_persistence_service.add_logs(task_id, logs) # Re-add logs to buffer on failure
except Exception as e: with self._log_buffer_lock:
logger.error(f"Failed to flush logs for task {task_id}: {e}") if task_id not in self._log_buffer:
# Re-add logs to buffer on failure self._log_buffer[task_id] = []
with self._log_buffer_lock: self._log_buffer[task_id].extend(logs)
if task_id not in self._log_buffer:
self._log_buffer[task_id] = []
self._log_buffer[task_id].extend(logs)
# [/DEF:_flush_logs:Function] # [/DEF:_flush_logs:Function]
# [DEF:_flush_task_logs:Function] # [DEF:_flush_task_logs:Function]

View File

@@ -150,6 +150,7 @@
let displayMappings = []; let displayMappings = [];
let isSavingMigration = false; let isSavingMigration = false;
let isLoadingMigration = false; let isLoadingMigration = false;
let isSyncing = false;
async function loadMigrationSettings() { async function loadMigrationSettings() {
isLoadingMigration = true; isLoadingMigration = true;
@@ -183,6 +184,23 @@
} }
} }
// Fire an immediate ID sync via the backend, then refresh the mappings view.
async function triggerSyncNow() {
    isSyncing = true;
    try {
        const result = await api.postApi("/migration/sync-now", {});
        // Build the toast from the sync summary returned by the endpoint.
        const failedSuffix = result.failed_count > 0 ? `, ${result.failed_count} failed` : "";
        const toastKind = result.failed_count > 0 ? "warning" : "success";
        addToast(`Synced ${result.synced_count} environment(s)${failedSuffix}`, toastKind);
        await loadMigrationSettings();
    } catch (err) {
        console.error("[SettingsPage][Migration] Sync failed:", err);
        addToast(err.message || "Sync failed", "error");
    } finally {
        isSyncing = false;
    }
}
// Handle global settings save (Logging, Storage) // Handle global settings save (Logging, Storage)
async function handleSave() { async function handleSave() {
console.log("[SettingsPage][Action] Saving settings"); console.log("[SettingsPage][Action] Saving settings");
@@ -1015,6 +1033,41 @@
> >
{isSavingMigration ? "Saving..." : "Save"} {isSavingMigration ? "Saving..." : "Save"}
</button> </button>
<button
on:click={triggerSyncNow}
disabled={isSyncing}
class="bg-green-600 text-white px-4 py-2 rounded hover:bg-green-700 h-[42px] min-w-[150px] flex items-center justify-center gap-2 disabled:opacity-50"
>
{#if isSyncing}
<svg
class="w-4 h-4 animate-spin"
fill="none"
stroke="currentColor"
viewBox="0 0 24 24"
><path
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15"
></path></svg
>
Syncing...
{:else}
<svg
class="w-4 h-4"
fill="none"
stroke="currentColor"
viewBox="0 0 24 24"
><path
stroke-linecap="round"
stroke-linejoin="round"
stroke-width="2"
d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15"
></path></svg
>
Sync Now
{/if}
</button>
</div> </div>
</div> </div>