feat: restore legacy data and add typed task result views
This commit is contained in:
@@ -4,7 +4,7 @@
|
||||
# @PURPOSE: Defines the FastAPI router for task-related endpoints, allowing clients to create, list, and get the status of tasks.
|
||||
# @LAYER: UI (API)
|
||||
# @RELATION: Depends on the TaskManager. It is included by the main app.
|
||||
from typing import List, Dict, Any, Optional
|
||||
from typing import List, Dict, Any, Optional
|
||||
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
||||
from pydantic import BaseModel
|
||||
from ...core.logger import belief_scope
|
||||
@@ -13,9 +13,15 @@ from ...core.task_manager import TaskManager, Task, TaskStatus, LogEntry
|
||||
from ...core.task_manager.models import LogFilter, LogStats
|
||||
from ...dependencies import get_task_manager, has_permission, get_current_user
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
class CreateTaskRequest(BaseModel):
|
||||
router = APIRouter()
|
||||
|
||||
TASK_TYPE_PLUGIN_MAP = {
|
||||
"llm_validation": ["llm_dashboard_validation"],
|
||||
"backup": ["superset-backup"],
|
||||
"migration": ["superset-migration"],
|
||||
}
|
||||
|
||||
class CreateTaskRequest(BaseModel):
|
||||
plugin_id: str
|
||||
params: Dict[str, Any]
|
||||
|
||||
@@ -79,18 +85,36 @@ async def create_task(
|
||||
# @PRE: task_manager must be available.
|
||||
# @POST: Returns a list of tasks.
|
||||
# @RETURN: List[Task] - List of tasks.
|
||||
async def list_tasks(
|
||||
limit: int = 10,
|
||||
offset: int = 0,
|
||||
status: Optional[TaskStatus] = None,
|
||||
task_manager: TaskManager = Depends(get_task_manager),
|
||||
_ = Depends(has_permission("tasks", "READ"))
|
||||
):
|
||||
"""
|
||||
Retrieve a list of tasks with pagination and optional status filter.
|
||||
"""
|
||||
with belief_scope("list_tasks"):
|
||||
return task_manager.get_tasks(limit=limit, offset=offset, status=status)
|
||||
async def list_tasks(
|
||||
limit: int = 10,
|
||||
offset: int = 0,
|
||||
status_filter: Optional[TaskStatus] = Query(None, alias="status"),
|
||||
task_type: Optional[str] = Query(None, description="Task category: llm_validation, backup, migration"),
|
||||
plugin_id: Optional[List[str]] = Query(None, description="Filter by plugin_id (repeatable query param)"),
|
||||
completed_only: bool = Query(False, description="Return only completed tasks (SUCCESS/FAILED)"),
|
||||
task_manager: TaskManager = Depends(get_task_manager),
|
||||
_ = Depends(has_permission("tasks", "READ"))
|
||||
):
|
||||
"""
|
||||
Retrieve a list of tasks with pagination and optional status filter.
|
||||
"""
|
||||
with belief_scope("list_tasks"):
|
||||
plugin_filters = list(plugin_id) if plugin_id else []
|
||||
if task_type:
|
||||
if task_type not in TASK_TYPE_PLUGIN_MAP:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"Unsupported task_type '{task_type}'. Allowed: {', '.join(TASK_TYPE_PLUGIN_MAP.keys())}"
|
||||
)
|
||||
plugin_filters.extend(TASK_TYPE_PLUGIN_MAP[task_type])
|
||||
|
||||
return task_manager.get_tasks(
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
status=status_filter,
|
||||
plugin_ids=plugin_filters or None,
|
||||
completed_only=completed_only
|
||||
)
|
||||
# [/DEF:list_tasks:Function]
|
||||
|
||||
@router.get("/{task_id}", response_model=Task)
|
||||
@@ -276,4 +300,4 @@ async def clear_tasks(
|
||||
task_manager.clear_tasks(status)
|
||||
return
|
||||
# [/DEF:clear_tasks:Function]
|
||||
# [/DEF:TasksRouter:Module]
|
||||
# [/DEF:TasksRouter:Module]
|
||||
|
||||
@@ -8,14 +8,9 @@
|
||||
# @INVARIANT: Uses bcrypt for hashing with standard work factor.
|
||||
|
||||
# [SECTION: IMPORTS]
|
||||
from passlib.context import CryptContext
|
||||
import bcrypt
|
||||
# [/SECTION]
|
||||
|
||||
# [DEF:pwd_context:Variable]
|
||||
# @PURPOSE: Passlib CryptContext for password management.
|
||||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
# [/DEF:pwd_context:Variable]
|
||||
|
||||
# [DEF:verify_password:Function]
|
||||
# @PURPOSE: Verifies a plain password against a hashed password.
|
||||
# @PRE: plain_password is a string, hashed_password is a bcrypt hash.
|
||||
@@ -25,7 +20,15 @@ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
# @PARAM: hashed_password (str) - The stored hash.
|
||||
# @RETURN: bool - Verification result.
|
||||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||
return pwd_context.verify(plain_password, hashed_password)
|
||||
if not hashed_password:
|
||||
return False
|
||||
try:
|
||||
return bcrypt.checkpw(
|
||||
plain_password.encode("utf-8"),
|
||||
hashed_password.encode("utf-8"),
|
||||
)
|
||||
except Exception:
|
||||
return False
|
||||
# [/DEF:verify_password:Function]
|
||||
|
||||
# [DEF:get_password_hash:Function]
|
||||
@@ -36,7 +39,7 @@ def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||
# @PARAM: password (str) - The password to hash.
|
||||
# @RETURN: str - The generated hash.
|
||||
def get_password_hash(password: str) -> str:
|
||||
return pwd_context.hash(password)
|
||||
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
|
||||
# [/DEF:get_password_hash:Function]
|
||||
|
||||
# [/DEF:backend.src.core.auth.security:Module]
|
||||
# [/DEF:backend.src.core.auth.security:Module]
|
||||
|
||||
@@ -17,6 +17,7 @@ from ..models.mapping import Base
|
||||
from ..models import task as _task_models # noqa: F401
|
||||
from ..models import auth as _auth_models # noqa: F401
|
||||
from ..models import config as _config_models # noqa: F401
|
||||
from ..models import llm as _llm_models # noqa: F401
|
||||
from .logger import belief_scope
|
||||
from .auth.config import auth_config
|
||||
import os
|
||||
|
||||
@@ -312,11 +312,23 @@ class TaskManager:
|
||||
# @PARAM: offset (int) - Number of tasks to skip.
|
||||
# @PARAM: status (Optional[TaskStatus]) - Filter by task status.
|
||||
# @RETURN: List[Task] - List of tasks matching criteria.
|
||||
def get_tasks(self, limit: int = 10, offset: int = 0, status: Optional[TaskStatus] = None) -> List[Task]:
|
||||
def get_tasks(
|
||||
self,
|
||||
limit: int = 10,
|
||||
offset: int = 0,
|
||||
status: Optional[TaskStatus] = None,
|
||||
plugin_ids: Optional[List[str]] = None,
|
||||
completed_only: bool = False
|
||||
) -> List[Task]:
|
||||
with belief_scope("TaskManager.get_tasks"):
|
||||
tasks = list(self.tasks.values())
|
||||
if status:
|
||||
tasks = [t for t in tasks if t.status == status]
|
||||
if plugin_ids:
|
||||
plugin_id_set = set(plugin_ids)
|
||||
tasks = [t for t in tasks if t.plugin_id in plugin_id_set]
|
||||
if completed_only:
|
||||
tasks = [t for t in tasks if t.status in [TaskStatus.SUCCESS, TaskStatus.FAILED]]
|
||||
# Sort by start_time descending (most recent first)
|
||||
tasks.sort(key=lambda t: t.started_at or datetime.min, reverse=True)
|
||||
return tasks[offset:offset + limit]
|
||||
@@ -568,4 +580,4 @@ class TaskManager:
|
||||
# [/DEF:clear_tasks:Function]
|
||||
|
||||
# [/DEF:TaskManager:Class]
|
||||
# [/DEF:TaskManagerModule:Module]
|
||||
# [/DEF:TaskManagerModule:Module]
|
||||
|
||||
@@ -154,10 +154,10 @@ class BackupPlugin(PluginBase):
|
||||
|
||||
log.info(f"Starting backup for environment: {env}")
|
||||
|
||||
try:
|
||||
config_manager = get_config_manager()
|
||||
if not config_manager.has_environments():
|
||||
raise ValueError("No Superset environments configured. Please add an environment in Settings.")
|
||||
try:
|
||||
config_manager = get_config_manager()
|
||||
if not config_manager.has_environments():
|
||||
raise ValueError("No Superset environments configured. Please add an environment in Settings.")
|
||||
|
||||
env_config = config_manager.get_environment(env)
|
||||
if not env_config:
|
||||
@@ -180,16 +180,27 @@ class BackupPlugin(PluginBase):
|
||||
superset_log.info("No dashboard filter applied - backing up all dashboards")
|
||||
dashboard_meta = all_dashboard_meta
|
||||
|
||||
if dashboard_count == 0:
|
||||
log.info("No dashboards to back up")
|
||||
return
|
||||
|
||||
total = len(dashboard_meta)
|
||||
for idx, db in enumerate(dashboard_meta, 1):
|
||||
dashboard_id = db.get('id')
|
||||
dashboard_title = db.get('dashboard_title', 'Unknown Dashboard')
|
||||
if not dashboard_id:
|
||||
continue
|
||||
if dashboard_count == 0:
|
||||
log.info("No dashboards to back up")
|
||||
return {
|
||||
"status": "NO_DASHBOARDS",
|
||||
"environment": env,
|
||||
"backup_root": str(backup_path / env.upper()),
|
||||
"total_dashboards": 0,
|
||||
"backed_up_dashboards": 0,
|
||||
"failed_dashboards": 0,
|
||||
"dashboards": [],
|
||||
"failures": []
|
||||
}
|
||||
|
||||
total = len(dashboard_meta)
|
||||
backed_up_dashboards = []
|
||||
failed_dashboards = []
|
||||
for idx, db in enumerate(dashboard_meta, 1):
|
||||
dashboard_id = db.get('id')
|
||||
dashboard_title = db.get('dashboard_title', 'Unknown Dashboard')
|
||||
if not dashboard_id:
|
||||
continue
|
||||
|
||||
# Report progress
|
||||
progress_pct = (idx / total) * 100
|
||||
@@ -210,21 +221,41 @@ class BackupPlugin(PluginBase):
|
||||
unpack=False
|
||||
)
|
||||
|
||||
archive_exports(str(dashboard_dir), policy=RetentionPolicy())
|
||||
storage_log.debug(f"Archived dashboard: {dashboard_title}")
|
||||
|
||||
except (SupersetAPIError, RequestException, IOError, OSError) as db_error:
|
||||
log.error(f"Failed to export dashboard {dashboard_title} (ID: {dashboard_id}): {db_error}")
|
||||
continue
|
||||
|
||||
consolidate_archive_folders(backup_path / env.upper())
|
||||
remove_empty_directories(str(backup_path / env.upper()))
|
||||
|
||||
log.info(f"Backup completed successfully for {env}")
|
||||
|
||||
except (RequestException, IOError, KeyError) as e:
|
||||
log.error(f"Fatal error during backup for {env}: {e}")
|
||||
raise e
|
||||
archive_exports(str(dashboard_dir), policy=RetentionPolicy())
|
||||
storage_log.debug(f"Archived dashboard: {dashboard_title}")
|
||||
backed_up_dashboards.append({
|
||||
"id": dashboard_id,
|
||||
"title": dashboard_title,
|
||||
"path": str(dashboard_dir)
|
||||
})
|
||||
|
||||
except (SupersetAPIError, RequestException, IOError, OSError) as db_error:
|
||||
log.error(f"Failed to export dashboard {dashboard_title} (ID: {dashboard_id}): {db_error}")
|
||||
failed_dashboards.append({
|
||||
"id": dashboard_id,
|
||||
"title": dashboard_title,
|
||||
"error": str(db_error)
|
||||
})
|
||||
continue
|
||||
|
||||
consolidate_archive_folders(backup_path / env.upper())
|
||||
remove_empty_directories(str(backup_path / env.upper()))
|
||||
|
||||
log.info(f"Backup completed successfully for {env}")
|
||||
return {
|
||||
"status": "SUCCESS" if not failed_dashboards else "PARTIAL_SUCCESS",
|
||||
"environment": env,
|
||||
"backup_root": str(backup_path / env.upper()),
|
||||
"total_dashboards": total,
|
||||
"backed_up_dashboards": len(backed_up_dashboards),
|
||||
"failed_dashboards": len(failed_dashboards),
|
||||
"dashboards": backed_up_dashboards,
|
||||
"failures": failed_dashboards
|
||||
}
|
||||
|
||||
except (RequestException, IOError, KeyError) as e:
|
||||
log.error(f"Fatal error during backup for {env}: {e}")
|
||||
raise e
|
||||
# [/DEF:execute:Function]
|
||||
# [/DEF:BackupPlugin:Class]
|
||||
# [/DEF:BackupPlugin:Module]
|
||||
# [/DEF:BackupPlugin:Module]
|
||||
|
||||
@@ -165,11 +165,11 @@ class MigrationPlugin(PluginBase):
|
||||
superset_log = log.with_source("superset_api") if context else log
|
||||
migration_log = log.with_source("migration") if context else log
|
||||
|
||||
log.info("Starting migration task.")
|
||||
log.debug(f"Params: {params}")
|
||||
|
||||
try:
|
||||
with belief_scope("execute"):
|
||||
log.info("Starting migration task.")
|
||||
log.debug(f"Params: {params}")
|
||||
|
||||
try:
|
||||
with belief_scope("execute"):
|
||||
config_manager = get_config_manager()
|
||||
environments = config_manager.get_environments()
|
||||
|
||||
@@ -192,11 +192,20 @@ class MigrationPlugin(PluginBase):
|
||||
|
||||
from_env_name = src_env.name
|
||||
to_env_name = tgt_env.name
|
||||
|
||||
log.info(f"Resolved environments: {from_env_name} -> {to_env_name}")
|
||||
|
||||
from_c = SupersetClient(src_env)
|
||||
to_c = SupersetClient(tgt_env)
|
||||
|
||||
log.info(f"Resolved environments: {from_env_name} -> {to_env_name}")
|
||||
migration_result = {
|
||||
"status": "SUCCESS",
|
||||
"source_environment": from_env_name,
|
||||
"target_environment": to_env_name,
|
||||
"selected_dashboards": 0,
|
||||
"migrated_dashboards": [],
|
||||
"failed_dashboards": [],
|
||||
"mapping_count": 0
|
||||
}
|
||||
|
||||
from_c = SupersetClient(src_env)
|
||||
to_c = SupersetClient(tgt_env)
|
||||
|
||||
if not from_c or not to_c:
|
||||
raise ValueError(f"Clients not initialized for environments: {from_env_name}, {to_env_name}")
|
||||
@@ -204,20 +213,24 @@ class MigrationPlugin(PluginBase):
|
||||
_, all_dashboards = from_c.get_dashboards()
|
||||
|
||||
dashboards_to_migrate = []
|
||||
if selected_ids:
|
||||
dashboards_to_migrate = [d for d in all_dashboards if d["id"] in selected_ids]
|
||||
elif dashboard_regex:
|
||||
regex_str = str(dashboard_regex)
|
||||
dashboards_to_migrate = [
|
||||
if selected_ids:
|
||||
dashboards_to_migrate = [d for d in all_dashboards if d["id"] in selected_ids]
|
||||
elif dashboard_regex:
|
||||
regex_str = str(dashboard_regex)
|
||||
dashboards_to_migrate = [
|
||||
d for d in all_dashboards if re.search(regex_str, d["dashboard_title"], re.IGNORECASE)
|
||||
]
|
||||
else:
|
||||
log.warning("No selection criteria provided (selected_ids or dashboard_regex).")
|
||||
return
|
||||
|
||||
if not dashboards_to_migrate:
|
||||
log.warning("No dashboards found matching criteria.")
|
||||
return
|
||||
else:
|
||||
log.warning("No selection criteria provided (selected_ids or dashboard_regex).")
|
||||
migration_result["status"] = "NO_SELECTION"
|
||||
return migration_result
|
||||
|
||||
if not dashboards_to_migrate:
|
||||
log.warning("No dashboards found matching criteria.")
|
||||
migration_result["status"] = "NO_MATCHES"
|
||||
return migration_result
|
||||
|
||||
migration_result["selected_dashboards"] = len(dashboards_to_migrate)
|
||||
|
||||
# Get mappings from params
|
||||
db_mapping = params.get("db_mappings", {})
|
||||
@@ -238,17 +251,18 @@ class MigrationPlugin(PluginBase):
|
||||
DatabaseMapping.target_env_id == tgt_env_db.id
|
||||
).all()
|
||||
# Provided mappings override stored ones
|
||||
stored_map_dict = {m.source_db_uuid: m.target_db_uuid for m in stored_mappings}
|
||||
stored_map_dict.update(db_mapping)
|
||||
db_mapping = stored_map_dict
|
||||
log.info(f"Loaded {len(stored_mappings)} database mappings from database.")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
engine = MigrationEngine()
|
||||
|
||||
for dash in dashboards_to_migrate:
|
||||
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
|
||||
stored_map_dict = {m.source_db_uuid: m.target_db_uuid for m in stored_mappings}
|
||||
stored_map_dict.update(db_mapping)
|
||||
db_mapping = stored_map_dict
|
||||
log.info(f"Loaded {len(stored_mappings)} database mappings from database.")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
migration_result["mapping_count"] = len(db_mapping)
|
||||
engine = MigrationEngine()
|
||||
|
||||
for dash in dashboards_to_migrate:
|
||||
dash_id, dash_slug, title = dash["id"], dash.get("slug"), dash["dashboard_title"]
|
||||
|
||||
try:
|
||||
exported_content, _ = from_c.export_dashboard(dash_id)
|
||||
@@ -279,13 +293,22 @@ class MigrationPlugin(PluginBase):
|
||||
db.close()
|
||||
success = engine.transform_zip(str(tmp_zip_path), str(tmp_new_zip), db_mapping, strip_databases=False)
|
||||
|
||||
if success:
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
|
||||
else:
|
||||
migration_log.error(f"Failed to transform ZIP for dashboard {title}")
|
||||
|
||||
superset_log.info(f"Dashboard {title} imported.")
|
||||
except Exception as exc:
|
||||
if success:
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug)
|
||||
migration_result["migrated_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title
|
||||
})
|
||||
else:
|
||||
migration_log.error(f"Failed to transform ZIP for dashboard {title}")
|
||||
migration_result["failed_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title,
|
||||
"error": "Failed to transform ZIP"
|
||||
})
|
||||
|
||||
superset_log.info(f"Dashboard {title} imported.")
|
||||
except Exception as exc:
|
||||
# Check for password error
|
||||
error_msg = str(exc)
|
||||
# The error message from Superset is often a JSON string inside a string.
|
||||
@@ -324,22 +347,34 @@ class MigrationPlugin(PluginBase):
|
||||
passwords = task.params.get("passwords", {})
|
||||
|
||||
# Retry import with password
|
||||
if passwords:
|
||||
app_logger.info(f"[MigrationPlugin][Action] Retrying import for {title} with provided passwords.")
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords)
|
||||
app_logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported after password injection.")
|
||||
# Clear passwords from params after use for security
|
||||
if "passwords" in task.params:
|
||||
del task.params["passwords"]
|
||||
continue
|
||||
|
||||
app_logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)
|
||||
|
||||
app_logger.info("[MigrationPlugin][Exit] Migration finished.")
|
||||
except Exception as e:
|
||||
app_logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
|
||||
raise e
|
||||
if passwords:
|
||||
app_logger.info(f"[MigrationPlugin][Action] Retrying import for {title} with provided passwords.")
|
||||
to_c.import_dashboard(file_name=tmp_new_zip, dash_id=dash_id, dash_slug=dash_slug, passwords=passwords)
|
||||
app_logger.info(f"[MigrationPlugin][Success] Dashboard {title} imported after password injection.")
|
||||
migration_result["migrated_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title
|
||||
})
|
||||
# Clear passwords from params after use for security
|
||||
if "passwords" in task.params:
|
||||
del task.params["passwords"]
|
||||
continue
|
||||
|
||||
app_logger.error(f"[MigrationPlugin][Failure] Failed to migrate dashboard {title}: {exc}", exc_info=True)
|
||||
migration_result["failed_dashboards"].append({
|
||||
"id": dash_id,
|
||||
"title": title,
|
||||
"error": str(exc)
|
||||
})
|
||||
|
||||
app_logger.info("[MigrationPlugin][Exit] Migration finished.")
|
||||
if migration_result["failed_dashboards"]:
|
||||
migration_result["status"] = "PARTIAL_SUCCESS"
|
||||
return migration_result
|
||||
except Exception as e:
|
||||
app_logger.critical(f"[MigrationPlugin][Failure] Fatal error during migration: {e}", exc_info=True)
|
||||
raise e
|
||||
# [/DEF:MigrationPlugin.execute:Action]
|
||||
# [/DEF:execute:Function]
|
||||
# [/DEF:MigrationPlugin:Class]
|
||||
# [/DEF:MigrationPlugin:Module]
|
||||
# [/DEF:MigrationPlugin:Module]
|
||||
|
||||
Reference in New Issue
Block a user