# [DEF:MigrationPlugin:Module]
|
|
# @SEMANTICS: migration, superset, automation, dashboard, plugin
|
|
# @PURPOSE: A plugin that provides functionality to migrate Superset dashboards between environments.
|
|
# @LAYER: App
|
|
# @RELATION: IMPLEMENTS -> PluginBase
|
|
# @RELATION: DEPENDS_ON -> superset_tool.client
|
|
# @RELATION: DEPENDS_ON -> superset_tool.utils
|
|
|
|
from typing import Dict, Any, List
|
|
from pathlib import Path
|
|
import zipfile
|
|
import re
|
|
|
|
from ..core.plugin_base import PluginBase
|
|
from ..core.logger import belief_scope
|
|
from ..core.superset_client import SupersetClient
|
|
from ..core.utils.fileio import create_temp_file, update_yamls, create_dashboard_export
|
|
from ..dependencies import get_config_manager
|
|
from ..core.migration_engine import MigrationEngine
|
|
from ..core.database import SessionLocal
|
|
from ..models.mapping import DatabaseMapping, Environment
|
|
|
|
# [DEF:MigrationPlugin:Class]
# @PURPOSE: Implementation of the migration plugin logic.
class MigrationPlugin(PluginBase):
    """A plugin to migrate Superset dashboards between environments."""

    @property
    # [DEF:id:Function]
    # @PURPOSE: Returns the unique identifier for the migration plugin.
    # @PRE: None.
    # @POST: Returns "superset-migration".
    # @RETURN: str - "superset-migration"
    def id(self) -> str:
        with belief_scope("id"):
            plugin_id = "superset-migration"
            return plugin_id
    # [/DEF:id:Function]
|
|
|
|
@property
|
|
# [DEF:name:Function]
|
|
# @PURPOSE: Returns the human-readable name of the migration plugin.
|
|
# @PRE: None.
|
|
# @POST: Returns the plugin name.
|
|
# @RETURN: str - Plugin name.
|
|
def name(self) -> str:
|
|
with belief_scope("name"):
|
|
return "Superset Dashboard Migration"
|
|
# [/DEF:name:Function]
|
|
|
|
@property
|
|
# [DEF:description:Function]
|
|
# @PURPOSE: Returns a description of the migration plugin.
|
|
# @PRE: None.
|
|
# @POST: Returns the plugin description.
|
|
# @RETURN: str - Plugin description.
|
|
def description(self) -> str:
|
|
with belief_scope("description"):
|
|
return "Migrates dashboards between Superset environments."
|
|
# [/DEF:description:Function]
|
|
|
|
@property
|
|
# [DEF:version:Function]
|
|
# @PURPOSE: Returns the version of the migration plugin.
|
|
# @PRE: None.
|
|
# @POST: Returns "1.0.0".
|
|
# @RETURN: str - "1.0.0"
|
|
def version(self) -> str:
|
|
with belief_scope("version"):
|
|
return "1.0.0"
|
|
# [/DEF:version:Function]
|
|
|
|
@property
|
|
# [DEF:ui_route:Function]
|
|
# @PURPOSE: Returns the frontend route for the migration plugin.
|
|
# @RETURN: str - "/migration"
|
|
def ui_route(self) -> str:
|
|
with belief_scope("ui_route"):
|
|
return "/migration"
|
|
# [/DEF:ui_route:Function]
|
|
|
|
# [DEF:get_schema:Function]
|
|
# @PURPOSE: Returns the JSON schema for migration plugin parameters.
|
|
# @PRE: Config manager is available.
|
|
# @POST: Returns a valid JSON schema dictionary.
|
|
# @RETURN: Dict[str, Any] - JSON schema.
|
|
def get_schema(self) -> Dict[str, Any]:
|
|
with belief_scope("get_schema"):
|
|
config_manager = get_config_manager()
|
|
envs = [e.name for e in config_manager.get_environments()]
|
|
|
|
return {
|
|
"type": "object",
|
|
"properties": {
|
|
"from_env": {
|
|
"type": "string",
|
|
"title": "Source Environment",
|
|
"description": "The environment to migrate from.",
|
|
"enum": envs if envs else ["dev", "prod"],
|
|
},
|
|
"to_env": {
|
|
"type": "string",
|
|
"title": "Target Environment",
|
|
"description": "The environment to migrate to.",
|
|
"enum": envs if envs else ["dev", "prod"],
|
|
},
|
|
"dashboard_regex": {
|
|
"type": "string",
|
|
"title": "Dashboard Regex",
|
|
"description": "A regular expression to filter dashboards to migrate.",
|
|
},
|
|
"replace_db_config": {
|
|
"type": "boolean",
|
|
"title": "Replace DB Config",
|
|
"description": "Whether to replace the database configuration.",
|
|
"default": False,
|
|
},
|
|
"from_db_id": {
|
|
"type": "integer",
|
|
"title": "Source DB ID",
|
|
"description": "The ID of the source database to replace (if replacing).",
|
|
},
|
|
"to_db_id": {
|
|
"type": "integer",
|
|
"title": "Target DB ID",
|
|
"description": "The ID of the target database to replace with (if replacing).",
|
|
},
|
|
},
|
|
"required": ["from_env", "to_env", "dashboard_regex"],
|
|
}
|
|
# [/DEF:get_schema:Function]
|
|
|
|
# [DEF:execute:Function]
|
|
# @PURPOSE: Executes the dashboard migration logic.
|
|
# @PARAM: params (Dict[str, Any]) - Migration parameters.
|
|
# @PRE: Source and target environments must be configured.
|
|
# @POST: Selected dashboards are migrated.
|
|
async def execute(self, params: Dict[str, Any]):
|
|
with belief_scope("MigrationPlugin.execute"):
|
|
source_env_id = params.get("source_env_id")
|
|
target_env_id = params.get("target_env_id")
|
|
selected_ids = params.get("selected_ids")
|
|
|
|
# Legacy support or alternative params
|
|
from_env_name = params.get("from_env")
|
|
to_env_name = params.get("to_env")
|
|
dashboard_regex = params.get("dashboard_regex")
|
|
|
|
replace_db_config = params.get("replace_db_config", False)
|
|
from_db_id = params.get("from_db_id")
|
|
to_db_id = params.get("to_db_id")
|
|
|
|
# [DEF:MigrationPlugin.execute:Action]
|
|
# @PURPOSE: Execute the migration logic with proper task logging.
|
|
task_id = params.get("_task_id")
|
|
from ..dependencies import get_task_manager
|
|
tm = get_task_manager()
|
|
|
|
class TaskLoggerProxy:
|
|
# [DEF:__init__:Function]
|
|
# @PURPOSE: Initializes the proxy logger.
|
|
# @PRE: None.
|
|
# @POST: Instance is initialized.
|
|
def __init__(self):
|
|
with belief_scope("__init__"):
|
|
# Initialize parent with dummy values since we override methods
|
|
pass
|
|
# [/DEF:__init__:Function]
|
|
|
|
# [DEF:debug:Function]
|
|
# @PURPOSE: Logs a debug message to the task manager.
|
|
# @PRE: msg is a string.
|
|
# @POST: Log is added to task manager if task_id exists.
|
|
def debug(self, msg, *args, extra=None, **kwargs):
|
|
with belief_scope("debug"):
|
|
if task_id: tm._add_log(task_id, "DEBUG", msg, extra or {})
|
|
# [/DEF:debug:Function]
|
|
|
|
# [DEF:info:Function]
|
|
# @PURPOSE: Logs an info message to the task manager.
|
|
# @PRE: msg is a string.
|
|
# @POST: Log is added to task manager if task_id exists.
|
|
def info(self, msg, *args, extra=None, **kwargs):
|
|
with belief_scope("info"):
|
|
if task_id: tm._add_log(task_id, "INFO", msg, extra or {})
|
|
# [/DEF:info:Function]
|
|
|
|
# [DEF:warning:Function]
|
|
# @PURPOSE: Logs a warning message to the task manager.
|
|
# @PRE: msg is a string.
|
|
# @POST: Log is added to task manager if task_id exists.
|
|
def warning(self, msg, *args, extra=None, **kwargs):
|
|
with belief_scope("warning"):
|
|
if task_id: tm._add_log(task_id, "WARNING", msg, extra or {})
|
|
# [/DEF:warning:Function]
|
|
|
|
# [DEF:error:Function]
|
|
# @PURPOSE: Logs an error message to the task manager.
|
|
# @PRE: msg is a string.
|
|
# @POST: Log is added to task manager if task_id exists.
|
|
def error(self, msg, *args, extra=None, **kwargs):
|
|
with belief_scope("error"):
|
|
if task_id: tm._add_log(task_id, "ERROR", msg, extra or {})
|
|
# [/DEF:error:Function]
|
|
|
|
# [DEF:critical:Function]
|
|
# @PURPOSE: Logs a critical message to the task manager.
|
|
# @PRE: msg is a string.
|
|
# @POST: Log is added to task manager if task_id exists.
|
|
def critical(self, msg, *args, extra=None, **kwargs):
|
|
with belief_scope("critical"):
|
|
if task_id: tm._add_log(task_id, "ERROR", msg, extra or {})
|
|
# [/DEF:critical:Function]
|
|
|
|
# [DEF:exception:Function]
|
|
# @PURPOSE: Logs an exception message to the task manager.
|
|
# @PRE: msg is a string.
|
|
# @POST: Log is added to task manager if task_id exists.
|
|
def exception(self, msg, *args, **kwargs):
|
|
with belief_scope("exception"):
|
|
if task_id: tm._add_log(task_id, "ERROR", msg, {"exception": True})
|
|
# [/DEF:exception:Function]
|
|
|
|
logger = TaskLoggerProxy()
|
|
logger.info(f"[MigrationPlugin][Entry] Starting migration task.")
|
|
logger.info(f"[MigrationPlugin][Action] Params: {params}")
|
|
|
|
try:
|
|
with belief_scope("execute"):
|
|
config_manager = get_config_manager()
|
|
environments = config_manager.get_environments()
|
|
|
|
# Resolve environments
|
|
src_env = None
|
|
tgt_env = None
|
|
|
|
if source_env_id:
|
|
src_env = next((e for e in environments if e.id == source_env_id), None)
|
|
elif from_env_name:
|
|
src_env = next((e for e in environments if e.name == from_env_name), None)
|
|
|
|
if target_env_id:
|
|
tgt_env = next((e for e in environments if e.id == target_env_id), None)
|
|
elif to_env_name:
|
|
tgt_env = next((e for e in environments if e.name == to_env_name), None)
|
|
|
|
if not src_env or not tgt_env:
|
|
raise ValueError(f"Could not resolve source or target environment. Source: {source_env_id or from_env_name}, Target: {target_env_id or to_env_name}")
|
|
|
|
from_env_name = src_env.name
|
|
to_env_name = tgt_env.name
|
|
|
|
logger.info(f"[MigrationPlugin][State] Resolved environments: {from_env_name} -> {to_env_name}")
|
|
|
|
from_c = SupersetClient(src_env)
|
|
to_c = SupersetClient(tgt_env)
|
|
|
|
if not from_c or not to_c:
|
|
raise ValueError(f"Clients not initialized for environments: {from_env_name}, {to_env_name}")
|
|
|
|
_, all_dashboards = from_c.get_dashboards()
|
|
|
|
dashboards_to_migrate = []
|
|
if selected_ids:
|
|
dashboards_to_migrate = [d for d in all_dashboards if d["id"] in selected_ids]
|
|
elif dashboard_regex:
|
|
regex_str = str(dashboard_regex)
|
|
dashboards_to_migrate = [
|
|
d for d in all_dashboards if re.search(regex_str, d["dashboard_title"], re.IGNORECASE)
|
|
]
|
|
else:
|
|
logger.warning("[MigrationPlugin][State] No selection criteria provided (selected_ids or dashboard_regex).")
|
|
return
|
|
|
|
if not dashboards_to_migrate:
|
|
logger.warning("[MigrationPlugin][State] No dashboards found matching criteria.")
|
|
return
|
|
|
|
# Fetch mappings from database
|
|
db_mapping = {}
|
|
if replace_db_config:
|
|
db = SessionLocal()
|
|
try:
|
|
# Find environment IDs by name
|
|
src_env = db.query(Environment).filter(Environment.name == from_env_name).first()
|
|
tgt_env = db.query(Environment).filter(Environment.name == to_env_name).first()
|
|
|
|
if src_env and tgt_env:
|
|
mappings = db.query(DatabaseMapping).filter(
|
|
DatabaseMapping.source_env_id == src_env.id,
|
|
DatabaseMapping.target_env_id == tgt_env.id
|
|
).all()
|
|
db_mapping = {m.source_db_uuid: m.target_db_uuid for m in mappings}
|
|
logger.info(f"[MigrationPlugin][State] Loaded {len(db_mapping)} database mappings.")
|
|
finally:
|
|
db.close()
|
|
|
|
engine = MigrationEngine()
|
|
|
|
for dash in dashboards_to_migrate:
|
|
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
|
|
|
|
try:
|
|
exported_content, _ = from_c.export_dashboard(dash_id)
|
|
with create_temp_file(content=exported_content, dry_run=True, suffix=".zip", logger=logger) as tmp_zip_path:
|
|
# Always transform to strip databases to avoid password errors
|
|
with create_temp_file(suffix=".zip", dry_run=True, logger=logger) as tmp_new_zip:
|
|
success = engine.transform_zip(str(tmp_zip_path), str(tmp_new_zip), db_mapping, strip_databases=False)
|
|
|
|
if not success and replace_db_config:
|
|
# Signal missing mapping and wait (only if we care about mappings)
|
|
if task_id:
|
|
logger.info(f"[MigrationPlugin][Action] Pausing for missing mapping in task {task_id}")
|
|
# In a real scenario, we'd pass the missing DB info to the frontend
|
|
# For this task, we'll just simulate the wait
|
|
await tm.wait_for_resolution(task_id)
|
|
# After resolution, retry transformation with updated mappings
|
|
# (Mappings would be updated in task.params by resolve_task)
|
|
db = SessionLocal()
|
|
try:
|
|
src_env = db.query(Environment).filter(Environment.name == from_env_name).first()
|
|
tgt_env = db.query(Environment).filter(Environment.name == to_env_name).first()
|
|
mappings = db.query(DatabaseMapping).filter(
|
|
DatabaseMapping.source_env_id == src_env.id,
|
|
DatabaseMapping.target_env_id == tgt_env.id
|
|
).all()
|
|
db_mapping = {m.source_db_uuid: m.target_db_uuid for m in mappings}
|
|
finally:
|
|
db.close()
|
|
success = engine.transform_zip(str(tmp_zip_path), str(tmp_new_zip), db_mapping, strip_databases=False)
|
|
|
|
if success:
|
|
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
|
|
else:
|
|
logger.error(f"[MigrationPlugin][Failure] Failed to transform ZIP for dashboard {title}")
|
|
|
|
logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported.")
|
|
except Exception as exc:
|
|
# Check for password error
|
|
error_msg = str(exc)
|
|
# The error message from Superset is often a JSON string inside a string.
|
|
# We need to robustly detect the password requirement.
|
|
# Typical error: "Error importing dashboard: databases/PostgreSQL.yaml: {'_schema': ['Must provide a password for the database']}"
|
|
|
|
if "Must provide a password for the database" in error_msg:
|
|
# Extract database name
|
|
# Try to find "databases/DBNAME.yaml" pattern
|
|
import re
|
|
db_name = "unknown"
|
|
match = re.search(r"databases/([^.]+)\.yaml", error_msg)
|
|
if match:
|
|
db_name = match.group(1)
|
|
else:
|
|
# Fallback: try to find 'database 'NAME'' pattern
|
|
match_alt = re.search(r"database '([^']+)'", error_msg)
|
|
if match_alt:
|
|
db_name = match_alt.group(1)
|
|
|
|
logger.warning(f"[MigrationPlugin][Action] Detected missing password for database: {db_name}")
|
|
|
|
if task_id:
|
|
input_request = {
|
|
"type": "database_password",
|
|
"databases": [db_name],
|
|
"error_message": error_msg
|
|
}
|
|
tm.await_input(task_id, input_request)
|
|
|
|
# Wait for user input
|
|
await tm.wait_for_input(task_id)
|
|
|
|
# Resume with passwords
|
|
task = tm.get_task(task_id)
|
|
passwords = task.params.get("passwords", {})
|
|
|
|
# Retry import with password
|
|
if passwords:
|
|
logger.info(f"[MigrationPlugin][Action] Retrying import for {title} with provided passwords.")
|
|
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords)
|
|
logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported after password injection.")
|
|
# Clear passwords from params after use for security
|
|
if "passwords" in task.params:
|
|
del task.params["passwords"]
|
|
continue
|
|
|
|
logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)
|
|
|
|
logger.info("[MigrationPlugin][Exit] Migration finished.")
|
|
except Exception as e:
|
|
logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
|
|
raise e
|
|
# [/DEF:MigrationPlugin.execute:Action]
|
|
# [/DEF:execute:Function]
|
|
# [/DEF:MigrationPlugin:Class]
|
|
# [/DEF:MigrationPlugin:Module] |