dry run migration

2026-02-27 20:48:18 +03:00
parent 149d230426
commit 8fa951fc93
16 changed files with 12141 additions and 2051 deletions

.ai/MODULE_MAP.md (new file, 1602 lines added; diff suppressed because it is too large)

.ai/PROJECT_MAP.md (new file, 4523 lines added; diff suppressed because it is too large)

View File

@@ -3,76 +3,28 @@
# @SEMANTICS: Finance, ACID, Transfer, Ledger
# @PURPOSE: Core banking transaction processor with ACID guarantees.
# @LAYER: Domain (Core)
# @RELATION: DEPENDS_ON -> [DEF:Infra:PostgresDB]
# @RELATION: DEPENDS_ON -> [DEF:Infra:AuditLog]
#
# @INVARIANT: Total system balance must remain constant (Double-Entry Bookkeeping).
# @INVARIANT: Negative transfers are strictly forbidden.
# @INVARIANT: No partial commit may occur under failure (ACID Atomicity).
# @TEST_CONTRACT: TransferInput ->
# {
# required_fields: {
# sender_id: str,
# receiver_id: str,
# amount: Decimal
# },
# invariants: [
# "amount > 0",
# "sender_id != receiver_id"
# ],
# constraints: [
# "sender must exist",
# "receiver must exist"
# ]
# }
# --- Test Specifications (The "What" and "Why", not the "Data") ---
# @TEST_CONTRACT: Input -> TransferInputDTO, Output -> TransferResultDTO
# @TEST_CONTRACT: TransferResult ->
# {
# required_fields: {
# tx_id: str,
# status: str,
# new_balance: Decimal
# },
# invariants: [
# "status == COMPLETED implies balance mutation occurred"
# ]
# }
# Happy Path
# @TEST_SCENARIO: sufficient_funds -> Returns COMPLETED, balances updated.
# @TEST_FIXTURE: sufficient_funds -> file:./__tests__/fixtures/transfers.json#happy_path
# @TEST_FIXTURE: sufficient_funds ->
# {
# sender_balance: 500.00,
# receiver_balance: 100.00,
# amount: 100.00
# }
# Edge Cases (CRITICAL)
# @TEST_SCENARIO: insufficient_funds -> Throws BusinessRuleViolation("INSUFFICIENT_FUNDS").
# @TEST_SCENARIO: negative_amount -> Throws BusinessRuleViolation("Transfer amount must be positive.").
# @TEST_SCENARIO: self_transfer -> Throws BusinessRuleViolation("Cannot transfer to self.").
# @TEST_SCENARIO: audit_failure -> Throws RuntimeError("TRANSACTION_ABORTED").
# @TEST_SCENARIO: concurrency_conflict -> Throws DBTransactionError.
# @TEST_EDGE: insufficient_funds ->
# {
# sender_balance: 50.00,
# receiver_balance: 100.00,
# amount: 100.00
# }
#
# @TEST_EDGE: negative_amount ->
# {
# sender_balance: 500.00,
# receiver_balance: 100.00,
# amount: -10.00
# }
#
# @TEST_EDGE: self_transfer ->
# {
# sender_id: "acc_A",
# receiver_id: "acc_A",
# amount: 10.00
# }
# @TEST_EDGE: audit_failure -> raises Exception
# @TEST_EDGE: concurrency_conflict -> special: concurrent_execution
# @TEST_INVARIANT: total_balance_constant -> verifies: [sufficient_funds, concurrency_conflict]
# @TEST_INVARIANT: no_partial_commit -> verifies: [audit_failure]
# @TEST_INVARIANT: negative_transfer_forbidden -> verifies: [negative_amount]
# Linking Tests to Invariants
# @TEST_INVARIANT: total_balance_constant -> VERIFIED_BY: [sufficient_funds, concurrency_conflict]
# @TEST_INVARIANT: negative_transfer_forbidden -> VERIFIED_BY: [negative_amount]
from decimal import Decimal
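
A minimal sketch of how the total_balance_constant invariant above might be asserted in a test; processor, sender and receiver are hypothetical fixtures, not names taken from this module.

from decimal import Decimal

def test_total_balance_constant(processor, sender, receiver):
    # Hypothetical fixtures: any two funded accounts plus the transaction processor under test.
    total_before = sender.balance + receiver.balance
    processor.transfer(sender_id=sender.id, receiver_id=receiver.id, amount=Decimal("100.00"))
    total_after = sender.balance + receiver.balance
    # Double-entry bookkeeping: the system-wide balance must stay constant across a transfer.
    assert total_after == total_before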

View File

@@ -407,4 +407,104 @@ async def test_execute_migration_invalid_env_raises_400(_mock_env):
assert exc.value.status_code == 400
@pytest.mark.asyncio
async def test_dry_run_migration_returns_diff_and_risk(db_session):
# @TEST_EDGE: missing_target_datasource -> validates high risk item generation
# @TEST_EDGE: breaking_reference -> validates high risk on missing dataset link
from src.api.routes.migration import dry_run_migration
from src.models.dashboard import DashboardSelection
env_source = MagicMock()
env_source.id = "src"
env_source.name = "Source"
env_source.url = "http://source"
env_source.username = "admin"
env_source.password = "admin"
env_source.verify_ssl = False
env_source.timeout = 30
env_target = MagicMock()
env_target.id = "tgt"
env_target.name = "Target"
env_target.url = "http://target"
env_target.username = "admin"
env_target.password = "admin"
env_target.verify_ssl = False
env_target.timeout = 30
cm = _make_sync_config_manager([env_source, env_target])
selection = DashboardSelection(
selected_ids=[42],
source_env_id="src",
target_env_id="tgt",
replace_db_config=False,
fix_cross_filters=True,
)
with patch("src.api.routes.migration.SupersetClient") as MockClient, \
patch("src.api.routes.migration.MigrationDryRunService") as MockService:
source_client = MagicMock()
target_client = MagicMock()
MockClient.side_effect = [source_client, target_client]
service_instance = MagicMock()
service_payload = {
"generated_at": "2026-02-27T00:00:00+00:00",
"selection": selection.model_dump(),
"selected_dashboard_titles": ["Sales"],
"diff": {
"dashboards": {"create": [], "update": [{"uuid": "dash-1"}], "delete": []},
"charts": {"create": [{"uuid": "chart-1"}], "update": [], "delete": []},
"datasets": {"create": [{"uuid": "dataset-1"}], "update": [], "delete": []},
},
"summary": {
"dashboards": {"create": 0, "update": 1, "delete": 0},
"charts": {"create": 1, "update": 0, "delete": 0},
"datasets": {"create": 1, "update": 0, "delete": 0},
"selected_dashboards": 1,
},
"risk": {
"score": 75,
"level": "high",
"items": [
{"code": "missing_datasource"},
{"code": "breaking_reference"},
],
},
}
service_instance.run.return_value = service_payload
MockService.return_value = service_instance
result = await dry_run_migration(selection=selection, config_manager=cm, db=db_session, _=None)
assert result["summary"]["dashboards"]["update"] == 1
assert result["summary"]["charts"]["create"] == 1
assert result["summary"]["datasets"]["create"] == 1
assert result["risk"]["score"] > 0
assert any(item["code"] == "missing_datasource" for item in result["risk"]["items"])
assert any(item["code"] == "breaking_reference" for item in result["risk"]["items"])
@pytest.mark.asyncio
async def test_dry_run_migration_rejects_same_environment(db_session):
from src.api.routes.migration import dry_run_migration
from src.models.dashboard import DashboardSelection
env = MagicMock()
env.id = "same"
env.name = "Same"
env.url = "http://same"
env.username = "admin"
env.password = "admin"
env.verify_ssl = False
env.timeout = 30
cm = _make_sync_config_manager([env])
selection = DashboardSelection(selected_ids=[1], source_env_id="same", target_env_id="same")
with pytest.raises(HTTPException) as exc:
await dry_run_migration(selection=selection, config_manager=cm, db=db_session, _=None)
assert exc.value.status_code == 400
# [/DEF:backend.src.api.routes.__tests__.test_migration_routes:Module]

View File

@@ -14,6 +14,7 @@ from ...core.database import get_db
from ...models.dashboard import DashboardMetadata, DashboardSelection
from ...core.superset_client import SupersetClient
from ...core.logger import belief_scope
from ...core.migration.dry_run_orchestrator import MigrationDryRunService
from ...core.mapping_service import IdMappingService
from ...models.mapping import ResourceMapping
@@ -83,6 +84,44 @@ async def execute_migration(
raise HTTPException(status_code=500, detail=f"Failed to create migration task: {str(e)}")
# [/DEF:execute_migration:Function]
# [DEF:dry_run_migration:Function]
# @PURPOSE: Build pre-flight diff and risk summary without applying migration.
# @PRE: Selection and environments are valid.
# @POST: Returns deterministic JSON diff and risk scoring.
@router.post("/migration/dry-run", response_model=Dict[str, Any])
async def dry_run_migration(
selection: DashboardSelection,
config_manager=Depends(get_config_manager),
db: Session = Depends(get_db),
_ = Depends(has_permission("plugin:migration", "EXECUTE"))
):
with belief_scope("dry_run_migration"):
environments = config_manager.get_environments()
env_map = {env.id: env for env in environments}
source_env = env_map.get(selection.source_env_id)
target_env = env_map.get(selection.target_env_id)
if not source_env or not target_env:
raise HTTPException(status_code=400, detail="Invalid source or target environment")
if selection.source_env_id == selection.target_env_id:
raise HTTPException(status_code=400, detail="Source and target environments must be different")
if not selection.selected_ids:
raise HTTPException(status_code=400, detail="No dashboards selected for dry run")
service = MigrationDryRunService()
source_client = SupersetClient(source_env)
target_client = SupersetClient(target_env)
try:
return service.run(
selection=selection,
source_client=source_client,
target_client=target_client,
db=db,
)
except ValueError as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc
# [/DEF:dry_run_migration:Function]
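
A minimal client-side sketch of calling the new endpoint; the /api prefix, port and bearer token are assumptions about deployment, not taken from this diff.

import requests

payload = {
    "selected_ids": [42],
    "source_env_id": "src",
    "target_env_id": "tgt",
    "replace_db_config": False,
    "fix_cross_filters": True,
}
# Base URL and auth header are illustrative; adjust them to the actual deployment.
response = requests.post(
    "http://localhost:8000/api/migration/dry-run",
    json=payload,
    headers={"Authorization": "Bearer <token>"},
)
response.raise_for_status()
report = response.json()
print(report["summary"], report["risk"]["level"])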
# [DEF:get_migration_settings:Function]
# @PURPOSE: Get current migration Cron string explicitly.
@router.get("/migration/settings", response_model=Dict[str, str])

View File

@@ -0,0 +1,12 @@
# [DEF:backend.src.core.migration.__init__:Module]
# @TIER: TRIVIAL
# @SEMANTICS: migration, package, exports
# @PURPOSE: Namespace package for migration pre-flight orchestration components.
# @LAYER: Core
from .dry_run_orchestrator import MigrationDryRunService
from .archive_parser import MigrationArchiveParser
__all__ = ["MigrationDryRunService", "MigrationArchiveParser"]
# [/DEF:backend.src.core.migration.__init__:Module]

View File

@@ -0,0 +1,139 @@
# [DEF:backend.src.core.migration.archive_parser:Module]
# @TIER: STANDARD
# @SEMANTICS: migration, zip, parser, yaml, metadata
# @PURPOSE: Parse Superset export ZIP archives into normalized object catalogs for diffing.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> backend.src.core.logger
# @INVARIANT: Parsing is read-only and never mutates archive files.
import json
import tempfile
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from ..logger import logger, belief_scope
# [DEF:MigrationArchiveParser:Class]
# @PURPOSE: Extract normalized dashboards/charts/datasets metadata from ZIP archives.
class MigrationArchiveParser:
# [DEF:extract_objects_from_zip:Function]
# @PURPOSE: Extract object catalogs from Superset archive.
# @PRE: zip_path points to a valid readable ZIP.
# @POST: Returns object lists grouped by resource type.
# @RETURN: Dict[str, List[Dict[str, Any]]]
def extract_objects_from_zip(self, zip_path: str) -> Dict[str, List[Dict[str, Any]]]:
with belief_scope("MigrationArchiveParser.extract_objects_from_zip"):
result: Dict[str, List[Dict[str, Any]]] = {
"dashboards": [],
"charts": [],
"datasets": [],
}
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
with zipfile.ZipFile(zip_path, "r") as zip_file:
zip_file.extractall(temp_dir)
result["dashboards"] = self._collect_yaml_objects(temp_dir, "dashboards")
result["charts"] = self._collect_yaml_objects(temp_dir, "charts")
result["datasets"] = self._collect_yaml_objects(temp_dir, "datasets")
return result
# [/DEF:extract_objects_from_zip:Function]
# [DEF:_collect_yaml_objects:Function]
# @PURPOSE: Read and normalize YAML manifests for one object type.
# @PRE: object_type is one of dashboards/charts/datasets.
# @POST: Returns only valid normalized objects.
def _collect_yaml_objects(self, root_dir: Path, object_type: str) -> List[Dict[str, Any]]:
with belief_scope("MigrationArchiveParser._collect_yaml_objects"):
files = list(root_dir.glob(f"**/{object_type}/**/*.yaml")) + list(root_dir.glob(f"**/{object_type}/*.yaml"))
objects: List[Dict[str, Any]] = []
for file_path in set(files):
try:
with open(file_path, "r") as file_obj:
payload = yaml.safe_load(file_obj) or {}
normalized = self._normalize_object_payload(payload, object_type)
if normalized:
objects.append(normalized)
except Exception as exc:
logger.reflect(
"[MigrationArchiveParser._collect_yaml_objects][REFLECT] skip_invalid_yaml path=%s error=%s",
file_path,
exc,
)
return objects
# [/DEF:_collect_yaml_objects:Function]
# [DEF:_normalize_object_payload:Function]
# @PURPOSE: Convert raw YAML payload to stable diff signature shape.
# @PRE: payload is parsed YAML mapping.
# @POST: Returns normalized descriptor with `uuid`, `title`, and `signature`.
def _normalize_object_payload(self, payload: Dict[str, Any], object_type: str) -> Optional[Dict[str, Any]]:
with belief_scope("MigrationArchiveParser._normalize_object_payload"):
if not isinstance(payload, dict):
return None
uuid = payload.get("uuid")
if not uuid:
return None
if object_type == "dashboards":
title = payload.get("dashboard_title") or payload.get("title")
signature = {
"title": title,
"slug": payload.get("slug"),
"position_json": payload.get("position_json"),
"json_metadata": payload.get("json_metadata"),
"description": payload.get("description"),
"owners": payload.get("owners"),
}
return {
"uuid": str(uuid),
"title": title or f"Dashboard {uuid}",
"signature": json.dumps(signature, sort_keys=True, default=str),
"owners": payload.get("owners") or [],
}
if object_type == "charts":
title = payload.get("slice_name") or payload.get("name")
signature = {
"title": title,
"viz_type": payload.get("viz_type"),
"params": payload.get("params"),
"query_context": payload.get("query_context"),
"datasource_uuid": payload.get("datasource_uuid"),
"dataset_uuid": payload.get("dataset_uuid"),
}
return {
"uuid": str(uuid),
"title": title or f"Chart {uuid}",
"signature": json.dumps(signature, sort_keys=True, default=str),
"dataset_uuid": payload.get("datasource_uuid") or payload.get("dataset_uuid"),
}
if object_type == "datasets":
title = payload.get("table_name") or payload.get("name")
signature = {
"title": title,
"schema": payload.get("schema"),
"database_uuid": payload.get("database_uuid"),
"sql": payload.get("sql"),
"columns": payload.get("columns"),
"metrics": payload.get("metrics"),
}
return {
"uuid": str(uuid),
"title": title or f"Dataset {uuid}",
"signature": json.dumps(signature, sort_keys=True, default=str),
"database_uuid": payload.get("database_uuid"),
}
return None
# [/DEF:_normalize_object_payload:Function]
# [/DEF:MigrationArchiveParser:Class]
# [/DEF:backend.src.core.migration.archive_parser:Module]
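
A brief usage sketch of the parser; export.zip is a placeholder path for a Superset export archive, and the printed fields mirror the normalized descriptors built by _normalize_object_payload.

from src.core.migration.archive_parser import MigrationArchiveParser

parser = MigrationArchiveParser()
# "export.zip" is a placeholder for a transformed Superset export on disk.
catalog = parser.extract_objects_from_zip("export.zip")
for dataset in catalog["datasets"]:
    # Each normalized entry carries a uuid, a display title and a sorted-JSON signature used for diffing.
    print(dataset["uuid"], dataset["title"], dataset["database_uuid"])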

View File

@@ -0,0 +1,235 @@
# [DEF:backend.src.core.migration.dry_run_orchestrator:Module]
# @TIER: STANDARD
# @SEMANTICS: migration, dry_run, diff, risk, superset
# @PURPOSE: Compute pre-flight migration diff and risk scoring without apply.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> backend.src.core.superset_client
# @RELATION: DEPENDS_ON -> backend.src.core.migration_engine
# @RELATION: DEPENDS_ON -> backend.src.core.migration.archive_parser
# @RELATION: DEPENDS_ON -> backend.src.core.migration.risk_assessor
# @INVARIANT: Dry run is informative only and must not mutate target environment.
from datetime import datetime, timezone
import json
from typing import Any, Dict, List
from sqlalchemy.orm import Session
from ...models.dashboard import DashboardSelection
from ...models.mapping import DatabaseMapping
from ..logger import logger, belief_scope
from .archive_parser import MigrationArchiveParser
from .risk_assessor import build_risks, score_risks
from ..migration_engine import MigrationEngine
from ..superset_client import SupersetClient
from ..utils.fileio import create_temp_file
# [DEF:MigrationDryRunService:Class]
# @PURPOSE: Build deterministic diff/risk payload for migration pre-flight.
class MigrationDryRunService:
# [DEF:__init__:Function]
# @PURPOSE: Wire parser dependency for archive object extraction.
# @PRE: parser can be omitted to use default implementation.
# @POST: Service is ready to calculate dry-run payload.
def __init__(self, parser: MigrationArchiveParser | None = None):
self.parser = parser or MigrationArchiveParser()
# [/DEF:__init__:Function]
# [DEF:run:Function]
# @PURPOSE: Execute full dry-run computation for selected dashboards.
# @PRE: source/target clients are authenticated and selection validated by caller.
# @POST: Returns JSON-serializable pre-flight payload with summary, diff and risk.
# @SIDE_EFFECT: Reads source export archives and target metadata via network.
def run(
self,
selection: DashboardSelection,
source_client: SupersetClient,
target_client: SupersetClient,
db: Session,
) -> Dict[str, Any]:
with belief_scope("MigrationDryRunService.run"):
logger.explore("[MigrationDryRunService.run][EXPLORE] starting dry-run pipeline")
engine = MigrationEngine()
db_mapping = self._load_db_mapping(db, selection) if selection.replace_db_config else {}
transformed = {"dashboards": {}, "charts": {}, "datasets": {}}
dashboards_preview = source_client.get_dashboards_summary()
selected_preview = {
item["id"]: item
for item in dashboards_preview
if item.get("id") in selection.selected_ids
}
for dashboard_id in selection.selected_ids:
exported_content, _ = source_client.export_dashboard(int(dashboard_id))
with create_temp_file(content=exported_content, suffix=".zip") as source_zip:
with create_temp_file(suffix=".zip") as transformed_zip:
success = engine.transform_zip(
str(source_zip),
str(transformed_zip),
db_mapping,
strip_databases=False,
target_env_id=selection.target_env_id,
fix_cross_filters=selection.fix_cross_filters,
)
if not success:
raise ValueError(f"Failed to transform export archive for dashboard {dashboard_id}")
extracted = self.parser.extract_objects_from_zip(str(transformed_zip))
self._accumulate_objects(transformed, extracted)
source_objects = {key: list(value.values()) for key, value in transformed.items()}
target_objects = self._build_target_signatures(target_client)
diff = {
"dashboards": self._build_object_diff(source_objects["dashboards"], target_objects["dashboards"]),
"charts": self._build_object_diff(source_objects["charts"], target_objects["charts"]),
"datasets": self._build_object_diff(source_objects["datasets"], target_objects["datasets"]),
}
risk = self._build_risks(source_objects, target_objects, diff, target_client)
summary = {
"dashboards": {action: len(diff["dashboards"][action]) for action in ("create", "update", "delete")},
"charts": {action: len(diff["charts"][action]) for action in ("create", "update", "delete")},
"datasets": {action: len(diff["datasets"][action]) for action in ("create", "update", "delete")},
"selected_dashboards": len(selection.selected_ids),
}
selected_titles = [
selected_preview[dash_id]["title"]
for dash_id in selection.selected_ids
if dash_id in selected_preview
]
logger.reason("[MigrationDryRunService.run][REASON] dry-run payload assembled")
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"selection": selection.model_dump(),
"selected_dashboard_titles": selected_titles,
"diff": diff,
"summary": summary,
"risk": score_risks(risk),
}
# [/DEF:run:Function]
# [DEF:_load_db_mapping:Function]
# @PURPOSE: Resolve UUID mapping for optional DB config replacement.
def _load_db_mapping(self, db: Session, selection: DashboardSelection) -> Dict[str, str]:
rows = db.query(DatabaseMapping).filter(
DatabaseMapping.source_env_id == selection.source_env_id,
DatabaseMapping.target_env_id == selection.target_env_id,
).all()
return {row.source_db_uuid: row.target_db_uuid for row in rows}
# [/DEF:_load_db_mapping:Function]
# [DEF:_accumulate_objects:Function]
# @PURPOSE: Merge extracted resources by UUID to avoid duplicates.
def _accumulate_objects(self, target: Dict[str, Dict[str, Dict[str, Any]]], source: Dict[str, List[Dict[str, Any]]]) -> None:
for object_type in ("dashboards", "charts", "datasets"):
for item in source.get(object_type, []):
uuid = item.get("uuid")
if uuid:
target[object_type][str(uuid)] = item
# [/DEF:_accumulate_objects:Function]
# [DEF:_index_by_uuid:Function]
# @PURPOSE: Build UUID-index map for normalized resources.
def _index_by_uuid(self, objects: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
indexed: Dict[str, Dict[str, Any]] = {}
for obj in objects:
uuid = obj.get("uuid")
if uuid:
indexed[str(uuid)] = obj
return indexed
# [/DEF:_index_by_uuid:Function]
# [DEF:_build_object_diff:Function]
# @PURPOSE: Compute create/update/delete buckets by UUID+signature.
def _build_object_diff(self, source_objects: List[Dict[str, Any]], target_objects: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
target_index = self._index_by_uuid(target_objects)
created: List[Dict[str, Any]] = []
updated: List[Dict[str, Any]] = []
deleted: List[Dict[str, Any]] = []
for source_obj in source_objects:
source_uuid = str(source_obj.get("uuid"))
target_obj = target_index.get(source_uuid)
if not target_obj:
created.append({"uuid": source_uuid, "title": source_obj.get("title")})
continue
if source_obj.get("signature") != target_obj.get("signature"):
updated.append({
"uuid": source_uuid,
"title": source_obj.get("title"),
"target_title": target_obj.get("title"),
})
return {"create": created, "update": updated, "delete": deleted}
# [/DEF:_build_object_diff:Function]
# [DEF:_build_target_signatures:Function]
# @PURPOSE: Pull target metadata and normalize it into comparable signatures.
def _build_target_signatures(self, client: SupersetClient) -> Dict[str, List[Dict[str, Any]]]:
_, dashboards = client.get_dashboards(query={
"columns": ["uuid", "dashboard_title", "slug", "position_json", "json_metadata", "description", "owners"],
})
_, datasets = client.get_datasets(query={
"columns": ["uuid", "table_name", "schema", "database_uuid", "sql", "columns", "metrics"],
})
_, charts = client.get_charts(query={
"columns": ["uuid", "slice_name", "viz_type", "params", "query_context", "datasource_uuid", "dataset_uuid"],
})
return {
"dashboards": [{
"uuid": str(item.get("uuid")),
"title": item.get("dashboard_title"),
"owners": item.get("owners") or [],
"signature": json.dumps({
"title": item.get("dashboard_title"),
"slug": item.get("slug"),
"position_json": item.get("position_json"),
"json_metadata": item.get("json_metadata"),
"description": item.get("description"),
"owners": item.get("owners"),
}, sort_keys=True, default=str),
} for item in dashboards if item.get("uuid")],
"datasets": [{
"uuid": str(item.get("uuid")),
"title": item.get("table_name"),
"database_uuid": item.get("database_uuid"),
"signature": json.dumps({
"title": item.get("table_name"),
"schema": item.get("schema"),
"database_uuid": item.get("database_uuid"),
"sql": item.get("sql"),
"columns": item.get("columns"),
"metrics": item.get("metrics"),
}, sort_keys=True, default=str),
} for item in datasets if item.get("uuid")],
"charts": [{
"uuid": str(item.get("uuid")),
"title": item.get("slice_name") or item.get("name"),
"dataset_uuid": item.get("datasource_uuid") or item.get("dataset_uuid"),
"signature": json.dumps({
"title": item.get("slice_name") or item.get("name"),
"viz_type": item.get("viz_type"),
"params": item.get("params"),
"query_context": item.get("query_context"),
"datasource_uuid": item.get("datasource_uuid"),
"dataset_uuid": item.get("dataset_uuid"),
}, sort_keys=True, default=str),
} for item in charts if item.get("uuid")],
}
# [/DEF:_build_target_signatures:Function]
# [DEF:_build_risks:Function]
# @PURPOSE: Build risk items for missing datasource, broken refs, overwrite, owner mismatch.
def _build_risks(
self,
source_objects: Dict[str, List[Dict[str, Any]]],
target_objects: Dict[str, List[Dict[str, Any]]],
diff: Dict[str, Dict[str, List[Dict[str, Any]]]],
target_client: SupersetClient,
) -> List[Dict[str, Any]]:
return build_risks(source_objects, target_objects, diff, target_client)
# [/DEF:_build_risks:Function]
# [/DEF:MigrationDryRunService:Class]
# [/DEF:backend.src.core.migration.dry_run_orchestrator:Module]
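
A small worked example of the bucketing rule in _build_object_diff: an object absent from the target lands in create, an object with the same UUID but a different signature lands in update, and the delete bucket is intentionally always empty. The sketch calls the helper directly with hand-written descriptors.

from src.core.migration.dry_run_orchestrator import MigrationDryRunService

source = [
    {"uuid": "dash-1", "title": "Sales New", "signature": '{"title": "Sales New"}'},
    {"uuid": "chart-1", "title": "Chart A", "signature": '{"title": "Chart A"}'},
]
target = [
    {"uuid": "dash-1", "title": "Sales Old", "signature": '{"title": "Sales Old"}'},
]
diff = MigrationDryRunService()._build_object_diff(source, target)
# dash-1 exists on both sides with different signatures -> update; chart-1 is missing on target -> create.
assert [item["uuid"] for item in diff["update"]] == ["dash-1"]
assert [item["uuid"] for item in diff["create"]] == ["chart-1"]
assert diff["delete"] == []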

View File

@@ -0,0 +1,119 @@
# [DEF:backend.src.core.migration.risk_assessor:Module]
# @TIER: STANDARD
# @SEMANTICS: migration, dry_run, risk, scoring
# @PURPOSE: Risk evaluation helpers for migration pre-flight reporting.
# @LAYER: Core
# @RELATION: USED_BY -> backend.src.core.migration.dry_run_orchestrator
from typing import Any, Dict, List
from ..superset_client import SupersetClient
# [DEF:index_by_uuid:Function]
# @PURPOSE: Build UUID-index from normalized objects.
def index_by_uuid(objects: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
indexed: Dict[str, Dict[str, Any]] = {}
for obj in objects:
uuid = obj.get("uuid")
if uuid:
indexed[str(uuid)] = obj
return indexed
# [/DEF:index_by_uuid:Function]
# [DEF:extract_owner_identifiers:Function]
# @PURPOSE: Normalize owner payloads for stable comparison.
def extract_owner_identifiers(owners: Any) -> List[str]:
if not isinstance(owners, list):
return []
ids: List[str] = []
for owner in owners:
if isinstance(owner, dict):
if owner.get("username"):
ids.append(str(owner["username"]))
elif owner.get("id") is not None:
ids.append(str(owner["id"]))
elif owner is not None:
ids.append(str(owner))
return sorted(set(ids))
# [/DEF:extract_owner_identifiers:Function]
# [DEF:build_risks:Function]
# @PURPOSE: Build risk list from computed diffs and target catalog state.
def build_risks(
source_objects: Dict[str, List[Dict[str, Any]]],
target_objects: Dict[str, List[Dict[str, Any]]],
diff: Dict[str, Dict[str, List[Dict[str, Any]]]],
target_client: SupersetClient,
) -> List[Dict[str, Any]]:
risks: List[Dict[str, Any]] = []
for object_type in ("dashboards", "charts", "datasets"):
for item in diff[object_type]["update"]:
risks.append({
"code": "overwrite_existing",
"severity": "medium",
"object_type": object_type[:-1],
"object_uuid": item["uuid"],
"message": f"Object will be updated in target: {item.get('title') or item['uuid']}",
})
target_dataset_uuids = set(index_by_uuid(target_objects["datasets"]).keys())
_, target_databases = target_client.get_databases(query={"columns": ["uuid"]})
target_database_uuids = {str(item.get("uuid")) for item in target_databases if item.get("uuid")}
for dataset in source_objects["datasets"]:
db_uuid = dataset.get("database_uuid")
if db_uuid and str(db_uuid) not in target_database_uuids:
risks.append({
"code": "missing_datasource",
"severity": "high",
"object_type": "dataset",
"object_uuid": dataset.get("uuid"),
"message": f"Target datasource is missing for dataset {dataset.get('title') or dataset.get('uuid')}",
})
for chart in source_objects["charts"]:
ds_uuid = chart.get("dataset_uuid")
if ds_uuid and str(ds_uuid) not in target_dataset_uuids:
risks.append({
"code": "breaking_reference",
"severity": "high",
"object_type": "chart",
"object_uuid": chart.get("uuid"),
"message": f"Chart references dataset not found on target: {ds_uuid}",
})
source_dash = index_by_uuid(source_objects["dashboards"])
target_dash = index_by_uuid(target_objects["dashboards"])
for item in diff["dashboards"]["update"]:
source_obj = source_dash.get(item["uuid"])
target_obj = target_dash.get(item["uuid"])
if not source_obj or not target_obj:
continue
source_owners = extract_owner_identifiers(source_obj.get("owners"))
target_owners = extract_owner_identifiers(target_obj.get("owners"))
if source_owners and target_owners and source_owners != target_owners:
risks.append({
"code": "owner_mismatch",
"severity": "low",
"object_type": "dashboard",
"object_uuid": item["uuid"],
"message": f"Owner mismatch for dashboard {item.get('title') or item['uuid']}",
})
return risks
# [/DEF:build_risks:Function]
# [DEF:score_risks:Function]
# @PURPOSE: Aggregate risk list into score and level.
def score_risks(risk_items: List[Dict[str, Any]]) -> Dict[str, Any]:
weights = {"high": 25, "medium": 10, "low": 5}
score = min(100, sum(weights.get(item.get("severity", "low"), 5) for item in risk_items))
level = "low" if score < 25 else "medium" if score < 60 else "high"
return {"score": score, "level": level, "items": risk_items}
# [/DEF:score_risks:Function]
# [/DEF:backend.src.core.migration.risk_assessor:Module]
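
A quick worked example of the scoring above: two high items and one medium item sum to 25 + 25 + 10 = 60, which reaches the 60 threshold and therefore reports level "high".

from src.core.migration.risk_assessor import score_risks

items = [
    {"code": "missing_datasource", "severity": "high"},
    {"code": "breaking_reference", "severity": "high"},
    {"code": "overwrite_existing", "severity": "medium"},
]
result = score_risks(items)
# Scores below 25 map to "low", below 60 to "medium", anything else to "high".
assert result["score"] == 60
assert result["level"] == "high"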

View File

@@ -336,6 +336,25 @@ class SupersetClient:
}
# [/DEF:get_dashboard_detail:Function]
# [DEF:get_charts:Function]
# @PURPOSE: Fetches all charts with pagination support.
# @PARAM: query (Optional[Dict]) - Optional query params/columns/filters.
# @PRE: Client is authenticated.
# @POST: Returns total count and charts list.
# @RETURN: Tuple[int, List[Dict]]
def get_charts(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_charts"):
validated_query = self._validate_query_params(query or {})
if "columns" not in validated_query:
validated_query["columns"] = ["id", "uuid", "slice_name", "viz_type"]
paginated_data = self._fetch_all_pages(
endpoint="/chart/",
pagination_options={"base_query": validated_query, "results_field": "result"},
)
return len(paginated_data), paginated_data
# [/DEF:get_charts:Function]
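
A short usage sketch of the new helper; env stands in for whatever environment configuration object SupersetClient is constructed with elsewhere in this codebase.

from src.core.superset_client import SupersetClient

def list_chart_titles(env):
    # env is a placeholder for the environment configuration object used by the migration routes.
    client = SupersetClient(env)
    total, charts = client.get_charts(query={"columns": ["uuid", "slice_name", "viz_type"]})
    # Pagination is handled inside the client; total equals the length of the aggregated list.
    return total, [chart.get("slice_name") for chart in charts]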
# [DEF:_extract_chart_ids_from_layout:Function]
# @PURPOSE: Traverses dashboard layout metadata and extracts chart IDs from common keys.
# @PRE: payload can be dict/list/scalar.

View File

@@ -0,0 +1,62 @@
# [DEF:backend.tests.core.migration.test_archive_parser:Module]
#
# @TIER: STANDARD
# @PURPOSE: Unit tests for MigrationArchiveParser ZIP extraction contract.
# @LAYER: Domain
# @RELATION: VERIFIES -> backend.src.core.migration.archive_parser
#
import os
import sys
import tempfile
import zipfile
from pathlib import Path
import yaml
backend_dir = str(Path(__file__).parent.parent.parent.parent.resolve())
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
from src.core.migration.archive_parser import MigrationArchiveParser
def test_extract_objects_from_zip_collects_all_types():
parser = MigrationArchiveParser()
with tempfile.TemporaryDirectory() as td:
td_path = Path(td)
zip_path = td_path / "objects.zip"
src_dir = td_path / "src"
(src_dir / "dashboards").mkdir(parents=True)
(src_dir / "charts").mkdir(parents=True)
(src_dir / "datasets").mkdir(parents=True)
with open(src_dir / "dashboards" / "dash.yaml", "w") as file_obj:
yaml.dump({"uuid": "dash-u1", "dashboard_title": "D1", "json_metadata": "{}"}, file_obj)
with open(src_dir / "charts" / "chart.yaml", "w") as file_obj:
yaml.dump({"uuid": "chart-u1", "slice_name": "C1", "viz_type": "bar"}, file_obj)
with open(src_dir / "datasets" / "dataset.yaml", "w") as file_obj:
yaml.dump({"uuid": "ds-u1", "table_name": "orders", "database_uuid": "db-u1"}, file_obj)
with zipfile.ZipFile(zip_path, "w") as zip_obj:
for root, _, files in os.walk(src_dir):
for file_name in files:
file_path = Path(root) / file_name
zip_obj.write(file_path, file_path.relative_to(src_dir))
extracted = parser.extract_objects_from_zip(str(zip_path))
if len(extracted["dashboards"]) != 1:
raise AssertionError("dashboards extraction size mismatch")
if extracted["dashboards"][0]["uuid"] != "dash-u1":
raise AssertionError("dashboard uuid mismatch")
if len(extracted["charts"]) != 1:
raise AssertionError("charts extraction size mismatch")
if extracted["charts"][0]["uuid"] != "chart-u1":
raise AssertionError("chart uuid mismatch")
if len(extracted["datasets"]) != 1:
raise AssertionError("datasets extraction size mismatch")
if extracted["datasets"][0]["uuid"] != "ds-u1":
raise AssertionError("dataset uuid mismatch")
# [/DEF:backend.tests.core.migration.test_archive_parser:Module]

View File

@@ -0,0 +1,110 @@
# [DEF:backend.tests.core.migration.test_dry_run_orchestrator:Module]
#
# @TIER: STANDARD
# @PURPOSE: Unit tests for MigrationDryRunService diff and risk computation contracts.
# @LAYER: Domain
# @RELATION: VERIFIES -> backend.src.core.migration.dry_run_orchestrator
#
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
backend_dir = str(Path(__file__).parent.parent.parent.parent.resolve())
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
from src.core.migration.dry_run_orchestrator import MigrationDryRunService
from src.models.dashboard import DashboardSelection
from src.models.mapping import Base
def _load_fixture() -> dict:
fixture_path = Path(__file__).parents[2] / "fixtures" / "migration_dry_run_fixture.json"
return json.loads(fixture_path.read_text())
def _make_session():
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
return Session()
def test_migration_dry_run_service_builds_diff_and_risk():
# @TEST_CONTRACT: dry_run_result_contract -> {
# required_fields: {diff: object, summary: object, risk: object},
# invariants: ["risk.score >= 0", "summary.selected_dashboards == len(selection.selected_ids)"]
# }
# @TEST_FIXTURE: migration_dry_run_fixture -> backend/tests/fixtures/migration_dry_run_fixture.json
# @TEST_EDGE: missing_target_datasource -> fixture.transformed_zip_objects.datasets[0].database_uuid
# @TEST_EDGE: breaking_reference -> fixture.transformed_zip_objects.charts[0].dataset_uuid
fixture = _load_fixture()
db = _make_session()
selection = DashboardSelection(
selected_ids=[42],
source_env_id="src",
target_env_id="tgt",
replace_db_config=False,
fix_cross_filters=True,
)
source_client = MagicMock()
source_client.get_dashboards_summary.return_value = fixture["source_dashboard_summary"]
source_client.export_dashboard.return_value = (b"PK\x03\x04", "source.zip")
target_client = MagicMock()
target_client.get_dashboards.return_value = (
len(fixture["target"]["dashboards"]),
fixture["target"]["dashboards"],
)
target_client.get_datasets.return_value = (
len(fixture["target"]["datasets"]),
fixture["target"]["datasets"],
)
target_client.get_charts.return_value = (
len(fixture["target"]["charts"]),
fixture["target"]["charts"],
)
target_client.get_databases.return_value = (
len(fixture["target"]["databases"]),
fixture["target"]["databases"],
)
parser = MagicMock()
parser.extract_objects_from_zip.return_value = fixture["transformed_zip_objects"]
service = MigrationDryRunService(parser=parser)
with patch("src.core.migration.dry_run_orchestrator.MigrationEngine") as EngineMock:
engine = MagicMock()
engine.transform_zip.return_value = True
EngineMock.return_value = engine
result = service.run(selection, source_client, target_client, db)
if "summary" not in result:
raise AssertionError("summary is missing in dry-run payload")
if result["summary"]["selected_dashboards"] != 1:
raise AssertionError("selected_dashboards summary mismatch")
if result["summary"]["dashboards"]["update"] != 1:
raise AssertionError("dashboard update count mismatch")
if result["summary"]["charts"]["create"] != 1:
raise AssertionError("chart create count mismatch")
if result["summary"]["datasets"]["create"] != 1:
raise AssertionError("dataset create count mismatch")
risk_codes = {item["code"] for item in result["risk"]["items"]}
if "missing_datasource" not in risk_codes:
raise AssertionError("missing_datasource risk is not detected")
if "breaking_reference" not in risk_codes:
raise AssertionError("breaking_reference risk is not detected")
# [/DEF:backend.tests.core.migration.test_dry_run_orchestrator:Module]

View File

@@ -0,0 +1,58 @@
{
"source_dashboard_summary": [
{
"id": 42,
"title": "Sales"
}
],
"target": {
"dashboards": [
{
"uuid": "dash-1",
"dashboard_title": "Sales Old",
"slug": "sales-old",
"position_json": "{}",
"json_metadata": "{}",
"description": "",
"owners": [
{
"username": "owner-a"
}
]
}
],
"datasets": [],
"charts": [],
"databases": []
},
"transformed_zip_objects": {
"dashboards": [
{
"uuid": "dash-1",
"title": "Sales New",
"signature": "{\"title\":\"Sales New\"}",
"owners": [
{
"username": "owner-b"
}
]
}
],
"charts": [
{
"uuid": "chart-1",
"title": "Chart A",
"signature": "{\"title\":\"Chart A\"}",
"dataset_uuid": "dataset-404"
}
],
"datasets": [
{
"uuid": "dataset-1",
"title": "orders",
"signature": "{\"title\":\"orders\"}",
"database_uuid": "db-missing"
}
]
}
}

View File

@@ -24,6 +24,7 @@
import type {
DashboardMetadata,
DashboardSelection,
MigrationDryRunResult,
} from "../../types/dashboard";
import { t } from "$lib/i18n";
import { Button, Card, PageHeader } from "$lib/ui";
@@ -44,6 +45,8 @@
let mappings: any[] = [];
let suggestions: any[] = [];
let fetchingDbs = false;
let dryRunLoading = false;
let dryRunResult: MigrationDryRunResult | null = null;
// UI State for Modals
let showLogViewer = false;
@@ -253,6 +256,7 @@
error = "";
try {
dryRunResult = null;
const selection: DashboardSelection = {
selected_ids: selectedDashboardIds,
source_env_id: sourceEnvId,
@@ -296,6 +300,57 @@
}
}
// [/DEF:startMigration:Function]
// [DEF:startDryRun:Function]
/**
* @purpose Builds pre-flight diff and risk summary without applying migration.
* @pre source/target environments and selected dashboards are valid.
* @post dryRunResult is populated with backend response.
* @UX_STATE: Idle -> Dry Run button is enabled when selection is valid.
* @UX_STATE: Loading -> Dry Run button shows "Dry Run..." and stays disabled.
* @UX_STATE: Error -> error banner is displayed and dryRunResult resets to null.
* @UX_FEEDBACK: User sees summary cards + risk block + JSON details after success.
* @UX_RECOVERY: User can adjust selection and press Dry Run again.
*/
async function startDryRun() {
if (!sourceEnvId || !targetEnvId) {
error =
$t.migration?.select_both_envs ||
"Please select both source and target environments.";
return;
}
if (sourceEnvId === targetEnvId) {
error =
$t.migration?.different_envs ||
"Source and target environments must be different.";
return;
}
if (selectedDashboardIds.length === 0) {
error =
$t.migration?.select_dashboards ||
"Please select at least one dashboard to migrate.";
return;
}
error = "";
dryRunLoading = true;
try {
const selection: DashboardSelection = {
selected_ids: selectedDashboardIds,
source_env_id: sourceEnvId,
target_env_id: targetEnvId,
replace_db_config: replaceDb,
fix_cross_filters: fixCrossFilters,
};
dryRunResult = await api.postApi("/migration/dry-run", selection);
} catch (e) {
error = e.message;
dryRunResult = null;
} finally {
dryRunLoading = false;
}
}
// [/DEF:startDryRun:Function]
</script>
<!-- [SECTION: TEMPLATE] -->
@@ -417,6 +472,19 @@
</div>
{/if}
<div class="flex items-center gap-3">
<Button
variant="secondary"
on:click={startDryRun}
disabled={!sourceEnvId ||
!targetEnvId ||
sourceEnvId === targetEnvId ||
selectedDashboardIds.length === 0 ||
dryRunLoading}
>
{dryRunLoading ? "Dry Run..." : "Dry Run"}
</Button>
<Button
on:click={startMigration}
disabled={!sourceEnvId ||
@@ -424,8 +492,50 @@
sourceEnvId === targetEnvId ||
selectedDashboardIds.length === 0}
>
{$t.migration?.start || "Apply"}
</Button>
</div>
{#if dryRunResult}
<div class="mt-6 rounded-md border border-slate-200 bg-slate-50 p-4 space-y-3">
<h3 class="text-base font-semibold">Pre-flight Diff</h3>
<div class="grid grid-cols-1 md:grid-cols-3 gap-3 text-sm">
<div class="rounded border border-slate-200 bg-white p-3">
<div class="font-medium mb-1">Dashboards</div>
<div>create: {dryRunResult.summary.dashboards.create}</div>
<div>update: {dryRunResult.summary.dashboards.update}</div>
<div>delete: {dryRunResult.summary.dashboards.delete}</div>
</div>
<div class="rounded border border-slate-200 bg-white p-3">
<div class="font-medium mb-1">Charts</div>
<div>create: {dryRunResult.summary.charts.create}</div>
<div>update: {dryRunResult.summary.charts.update}</div>
<div>delete: {dryRunResult.summary.charts.delete}</div>
</div>
<div class="rounded border border-slate-200 bg-white p-3">
<div class="font-medium mb-1">Datasets</div>
<div>create: {dryRunResult.summary.datasets.create}</div>
<div>update: {dryRunResult.summary.datasets.update}</div>
<div>delete: {dryRunResult.summary.datasets.delete}</div>
</div>
</div>
<div class="rounded border border-slate-200 bg-white p-3 text-sm">
<div class="font-medium mb-1">Risk</div>
<div>
score: {dryRunResult.risk.score}, level: {dryRunResult.risk.level}
</div>
<div class="mt-1">
issues: {dryRunResult.risk.items.length}
</div>
</div>
<details class="rounded border border-slate-200 bg-white p-3">
<summary class="cursor-pointer font-medium">Diff JSON</summary>
<pre class="mt-2 max-h-72 overflow-auto text-xs">{JSON.stringify(dryRunResult, null, 2)}</pre>
</details>
</div>
{/if}
{/if}
</div>

View File

@@ -16,5 +16,48 @@ export interface DashboardSelection {
source_env_id: string;
target_env_id: string;
replace_db_config?: boolean;
fix_cross_filters?: boolean;
}
export interface DiffObjectRef {
uuid: string;
title?: string;
target_title?: string;
}
export interface DiffBucket {
create: DiffObjectRef[];
update: DiffObjectRef[];
delete: DiffObjectRef[];
}
export interface DryRunRiskItem {
code: string;
severity: "low" | "medium" | "high";
object_type: string;
object_uuid: string;
message: string;
}
export interface MigrationDryRunResult {
generated_at: string;
selection: DashboardSelection;
selected_dashboard_titles: string[];
diff: {
dashboards: DiffBucket;
charts: DiffBucket;
datasets: DiffBucket;
};
summary: {
dashboards: Record<"create" | "update" | "delete", number>;
charts: Record<"create" | "update" | "delete", number>;
datasets: Record<"create" | "update" | "delete", number>;
selected_dashboards: number;
};
risk: {
score: number;
level: "low" | "medium" | "high";
items: DryRunRiskItem[];
};
}
// [/DEF:DashboardTypes:Module]

File diff suppressed because it is too large