mappings+migrate

This commit is contained in:
2025-12-27 10:16:41 +03:00
parent 3d75a21127
commit 6962a78112
19 changed files with 925 additions and 143 deletions

View File

@@ -15,6 +15,8 @@ import shutil
import tempfile
from pathlib import Path
from typing import Dict
from .logger import logger, belief_scope
import yaml
# [/SECTION]
# [DEF:MigrationEngine:Class]
@@ -26,37 +28,51 @@ class MigrationEngine:
# @PARAM: zip_path (str) - Path to the source ZIP file.
# @PARAM: output_path (str) - Path where the transformed ZIP will be saved.
# @PARAM: db_mapping (Dict[str, str]) - Mapping of source UUID to target UUID.
# @PARAM: strip_databases (bool) - Whether to remove the databases directory from the archive.
# @RETURN: bool - True if successful.
def transform_zip(self, zip_path: str, output_path: str, db_mapping: Dict[str, str]) -> bool:
def transform_zip(self, zip_path: str, output_path: str, db_mapping: Dict[str, str], strip_databases: bool = True) -> bool:
"""
Transform a Superset export ZIP by replacing database UUIDs.
"""
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
with belief_scope("MigrationEngine.transform_zip"):
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
try:
# 1. Extract
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(temp_dir)
try:
# 1. Extract
logger.info(f"[MigrationEngine.transform_zip][Action] Extracting ZIP: {zip_path}")
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(temp_dir)
# 2. Transform YAMLs
# Datasets are usually in datasets/*.yaml
dataset_files = list(temp_dir.glob("**/datasets/*.yaml"))
for ds_file in dataset_files:
self._transform_yaml(ds_file, db_mapping)
# 2. Transform YAMLs
# Datasets are usually in datasets/*.yaml
dataset_files = list(temp_dir.glob("**/datasets/**/*.yaml")) + list(temp_dir.glob("**/datasets/*.yaml"))
dataset_files = list(set(dataset_files))
logger.info(f"[MigrationEngine.transform_zip][State] Found {len(dataset_files)} dataset files.")
for ds_file in dataset_files:
logger.info(f"[MigrationEngine.transform_zip][Action] Transforming dataset: {ds_file}")
self._transform_yaml(ds_file, db_mapping)
# 3. Re-package
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, dirs, files in os.walk(temp_dir):
for file in files:
file_path = Path(root) / file
arcname = file_path.relative_to(temp_dir)
zf.write(file_path, arcname)
return True
except Exception as e:
print(f"Error transforming ZIP: {e}")
return False
# 3. Re-package
logger.info(f"[MigrationEngine.transform_zip][Action] Re-packaging ZIP to: {output_path} (strip_databases={strip_databases})")
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, dirs, files in os.walk(temp_dir):
rel_root = Path(root).relative_to(temp_dir)
if strip_databases and "databases" in rel_root.parts:
logger.info(f"[MigrationEngine.transform_zip][Action] Skipping file in databases directory: {rel_root}")
continue
for file in files:
file_path = Path(root) / file
arcname = file_path.relative_to(temp_dir)
zf.write(file_path, arcname)
return True
except Exception as e:
logger.error(f"[MigrationEngine.transform_zip][Coherence:Failed] Error transforming ZIP: {e}")
return False
# [DEF:MigrationEngine._transform_yaml:Function]
# @PURPOSE: Replaces database_uuid in a single YAML file.

View File

@@ -47,12 +47,17 @@ class PluginLoader:
Loads a single Python module and extracts PluginBase subclasses.
"""
# Try to determine the correct package prefix based on how the app is running
if "backend.src" in __name__:
# For standalone execution, we need to handle the import differently
if __name__ == "__main__" or "test" in __name__:
# When running as standalone or in tests, use relative import
package_name = f"plugins.{module_name}"
elif "backend.src" in __name__:
package_prefix = "backend.src.plugins"
package_name = f"{package_prefix}.{module_name}"
else:
package_prefix = "src.plugins"
package_name = f"{package_prefix}.{module_name}"
package_name = f"{package_prefix}.{module_name}"
# print(f"DEBUG: Loading plugin {module_name} as {package_name}")
spec = importlib.util.spec_from_file_location(package_name, file_path)
if spec is None or spec.loader is None:
@@ -106,9 +111,11 @@ class PluginLoader:
# validate(instance={}, schema=schema)
self._plugins[plugin_id] = plugin_instance
self._plugin_configs[plugin_id] = plugin_config
print(f"Plugin '{plugin_instance.name}' (ID: {plugin_id}) loaded successfully.") # Replace with proper logging
from ..core.logger import logger
logger.info(f"Plugin '{plugin_instance.name}' (ID: {plugin_id}) loaded successfully.")
except Exception as e:
print(f"Error validating plugin '{plugin_instance.name}' (ID: {plugin_id}): {e}") # Replace with proper logging
from ..core.logger import logger
logger.error(f"Error validating plugin '{plugin_instance.name}' (ID: {plugin_id}): {e}")
def get_plugin(self, plugin_id: str) -> Optional[PluginBase]:

View File

@@ -52,6 +52,32 @@ class SupersetClient(BaseSupersetClient):
return databases[0] if databases else None
# [/DEF:SupersetClient.get_database_by_uuid]
# [DEF:SupersetClient.get_dashboards_summary:Function]
# @PURPOSE: Fetches dashboard metadata optimized for the grid.
# @POST: Returns a list of dashboard dictionaries.
# @RETURN: List[Dict]
def get_dashboards_summary(self) -> List[Dict]:
"""
Fetches dashboard metadata optimized for the grid.
Returns a list of dictionaries mapped to DashboardMetadata fields.
"""
query = {
"columns": ["id", "dashboard_title", "changed_on_utc", "published"]
}
_, dashboards = self.get_dashboards(query=query)
# Map fields to DashboardMetadata schema
result = []
for dash in dashboards:
result.append({
"id": dash.get("id"),
"title": dash.get("dashboard_title"),
"last_modified": dash.get("changed_on_utc"),
"status": "published" if dash.get("published") else "draft"
})
return result
# [/DEF:SupersetClient.get_dashboards_summary]
# [/DEF:SupersetClient]
# [/DEF:backend.src.core.superset_client]

View File

@@ -64,7 +64,10 @@ class TaskManager:
self.tasks: Dict[str, Task] = {}
self.subscribers: Dict[str, List[asyncio.Queue]] = {}
self.executor = ThreadPoolExecutor(max_workers=5) # For CPU-bound plugin execution
self.loop = asyncio.get_event_loop()
try:
self.loop = asyncio.get_running_loop()
except RuntimeError:
self.loop = asyncio.get_event_loop()
self.task_futures: Dict[str, asyncio.Future] = {}
# [/DEF]
@@ -72,17 +75,25 @@ class TaskManager:
"""
Creates and queues a new task for execution.
"""
from ..core.logger import logger
logger.info(f"TaskManager: Creating task for plugin '{plugin_id}' with params: {params}")
if not self.plugin_loader.has_plugin(plugin_id):
logger.error(f"TaskManager: Plugin with ID '{plugin_id}' not found.")
raise ValueError(f"Plugin with ID '{plugin_id}' not found.")
plugin = self.plugin_loader.get_plugin(plugin_id)
logger.info(f"TaskManager: Found plugin '{plugin.name}' for task creation")
# Validate params against plugin schema (this will be done at a higher level, e.g., API route)
# For now, a basic check
if not isinstance(params, dict):
logger.error("TaskManager: Task parameters must be a dictionary.")
raise ValueError("Task parameters must be a dictionary.")
task = Task(plugin_id=plugin_id, params=params, user_id=user_id)
self.tasks[task.id] = task
logger.info(f"TaskManager: Task {task.id} created and scheduled for execution")
self.loop.create_task(self._run_task(task.id)) # Schedule task for execution
return task
@@ -90,9 +101,11 @@ class TaskManager:
"""
Internal method to execute a task.
"""
from ..core.logger import logger
task = self.tasks[task_id]
plugin = self.plugin_loader.get_plugin(task.plugin_id)
logger.info(f"TaskManager: Starting execution of task {task_id} for plugin '{plugin.name}'")
task.status = TaskStatus.RUNNING
task.started_at = datetime.utcnow()
self._add_log(task_id, "INFO", f"Task started for plugin '{plugin.name}'")
@@ -103,17 +116,27 @@ class TaskManager:
# If the plugin's execute method is already async, this can be simplified.
# Pass task_id to plugin so it can signal pause
params = {**task.params, "_task_id": task_id}
await self.loop.run_in_executor(
self.executor,
lambda: asyncio.run(plugin.execute(params)) if asyncio.iscoroutinefunction(plugin.execute) else plugin.execute(params)
)
logger.info(f"TaskManager: Executing plugin '{plugin.name}' with params: {params}")
if asyncio.iscoroutinefunction(plugin.execute):
logger.info(f"TaskManager: Executing async plugin '{plugin.name}'")
await plugin.execute(params)
else:
logger.info(f"TaskManager: Executing sync plugin '{plugin.name}' in executor")
await self.loop.run_in_executor(
self.executor,
plugin.execute,
params
)
logger.info(f"TaskManager: Task {task_id} completed successfully for plugin '{plugin.name}'")
task.status = TaskStatus.SUCCESS
self._add_log(task_id, "INFO", f"Task completed successfully for plugin '{plugin.name}'")
except Exception as e:
logger.error(f"TaskManager: Task {task_id} failed for plugin '{plugin.name}': {e}")
task.status = TaskStatus.FAILED
self._add_log(task_id, "ERROR", f"Task failed: {e}", {"error_type": type(e).__name__})
finally:
task.finished_at = datetime.utcnow()
logger.info(f"TaskManager: Task {task_id} execution finished with status: {task.status}")
# In a real system, you might notify clients via WebSocket here
async def resolve_task(self, task_id: str, resolution_params: Dict[str, Any]):