# [DEF:backend.tests.core.test_migration_engine:Module]
#
# @TIER: STANDARD
# @PURPOSE: Unit tests for MigrationEngine's cross-filter patching algorithms.
#
# @LAYER: Domain
# @RELATION: VERIFIES -> backend.src.core.migration_engine
#

import json
import os
import sys
import tempfile
import zipfile
from pathlib import Path
from unittest.mock import MagicMock

import pytest
import yaml

# Ensure the backend package root is importable regardless of the
# directory pytest is invoked from.
backend_dir = str(Path(__file__).parent.parent.parent.resolve())
if backend_dir not in sys.path:
    sys.path.insert(0, backend_dir)

from src.core.migration_engine import MigrationEngine
from src.core.mapping_service import IdMappingService
from src.models.mapping import ResourceType
|
|
|
|
|
|
# --- Fixtures ---
|
|
|
|
class MockMappingService:
    """Stand-in for IdMappingService with a fixed uuid -> remote-id table.

    Simulates ``IdMappingService.get_remote_ids_batch``: only UUIDs present
    in the preloaded mapping appear in the returned dict.
    """

    def __init__(self, mappings: dict):
        # Lookup table consulted by the batch resolver below.
        self.mappings = mappings

    def get_remote_ids_batch(self, env_id, resource_type, uuids):
        """Return ``{uuid: remote_id}`` for each requested uuid that is known.

        ``env_id`` and ``resource_type`` are accepted for signature parity
        with the real service but are ignored by this mock.
        """
        return {u: self.mappings[u] for u in uuids if u in self.mappings}
|
|
|
|
|
|
def _write_dashboard_yaml(dir_path: Path, metadata: dict) -> Path:
    """Helper: writes a dashboard YAML file with json_metadata.

    The metadata dict is JSON-encoded into the ``json_metadata`` field,
    mirroring how Superset stores dashboard metadata.
    """
    target = dir_path / "dash.yaml"
    payload = {"json_metadata": json.dumps(metadata)}
    with open(target, 'w') as fh:
        yaml.dump(payload, fh)
    return target
|
|
|
|
|
|
# --- _patch_dashboard_metadata tests ---
|
|
|
|
def test_patch_dashboard_metadata_replaces_chart_ids():
    """Verifies that chartId values are replaced using the mapping service."""
    engine = MigrationEngine(MockMappingService({"uuid-chart-A": 999}))

    metadata = {
        "native_filter_configuration": [
            {"targets": [{"chartId": 42}]}
        ]
    }

    with tempfile.TemporaryDirectory() as workdir:
        dash_file = _write_dashboard_yaml(Path(workdir), metadata)

        # Source chart id 42 resolves to uuid-chart-A, which maps to 999.
        engine._patch_dashboard_metadata(dash_file, "target-env", {42: "uuid-chart-A"})

        with open(dash_file, 'r') as fh:
            patched = json.loads(yaml.safe_load(fh)["json_metadata"])
        first_target = patched["native_filter_configuration"][0]["targets"][0]
        assert first_target["chartId"] == 999
|
|
|
|
|
|
def test_patch_dashboard_metadata_replaces_dataset_ids():
    """Verifies that datasetId values are replaced using the mapping service."""
    engine = MigrationEngine(MockMappingService({"uuid-ds-B": 500}))

    metadata = {
        "native_filter_configuration": [
            {"targets": [{"datasetId": 10}]}
        ]
    }

    with tempfile.TemporaryDirectory() as workdir:
        dash_file = _write_dashboard_yaml(Path(workdir), metadata)

        # Source dataset id 10 resolves to uuid-ds-B, which maps to 500.
        engine._patch_dashboard_metadata(dash_file, "target-env", {10: "uuid-ds-B"})

        with open(dash_file, 'r') as fh:
            patched = json.loads(yaml.safe_load(fh)["json_metadata"])
        first_target = patched["native_filter_configuration"][0]["targets"][0]
        assert first_target["datasetId"] == 500
|
|
|
|
|
|
def test_patch_dashboard_metadata_skips_when_no_metadata():
    """Verifies early return when json_metadata key is absent."""
    engine = MigrationEngine(MockMappingService({}))

    with tempfile.TemporaryDirectory() as workdir:
        dash_file = Path(workdir) / "dash.yaml"
        with open(dash_file, 'w') as fh:
            yaml.dump({"title": "No metadata here"}, fh)

        engine._patch_dashboard_metadata(dash_file, "target-env", {})

        # The file must remain untouched: no json_metadata key injected.
        with open(dash_file, 'r') as fh:
            contents = yaml.safe_load(fh)
        assert "json_metadata" not in contents
|
|
|
|
|
|
def test_patch_dashboard_metadata_handles_missing_targets():
    """When some source IDs have no target mapping, patches what it can and leaves the rest."""
    engine = MigrationEngine(MockMappingService({"uuid-A": 100}))  # Only uuid-A maps

    metadata = {
        "native_filter_configuration": [
            {"targets": [{"datasetId": 1}, {"datasetId": 2}]}
        ]
    }

    with tempfile.TemporaryDirectory() as workdir:
        dash_file = _write_dashboard_yaml(Path(workdir), metadata)
        # uuid-MISSING is unknown to the mapping service and won't resolve.
        id_to_uuid = {1: "uuid-A", 2: "uuid-MISSING"}

        engine._patch_dashboard_metadata(dash_file, "target-env", id_to_uuid)

        with open(dash_file, 'r') as fh:
            patched = json.loads(yaml.safe_load(fh)["json_metadata"])
        targets = patched["native_filter_configuration"][0]["targets"]
        # ID 1 should be replaced to 100; ID 2 should remain 2
        assert targets[0]["datasetId"] == 100
        assert targets[1]["datasetId"] == 2
|
|
|
|
|
|
# --- _extract_chart_uuids_from_archive tests ---
|
|
|
|
def test_extract_chart_uuids_from_archive():
    """Verifies that chart YAML files are parsed for id->uuid mappings."""
    engine = MigrationEngine()

    with tempfile.TemporaryDirectory() as workdir:
        charts_dir = Path(workdir) / "charts"
        charts_dir.mkdir()

        # Two chart definitions, each carrying a numeric id and a uuid.
        fixtures = {
            "chart1.yaml": {"id": 42, "uuid": "uuid-42", "slice_name": "Chart One"},
            "chart2.yaml": {"id": 99, "uuid": "uuid-99", "slice_name": "Chart Two"},
        }
        for filename, doc in fixtures.items():
            with open(charts_dir / filename, 'w') as fh:
                yaml.dump(doc, fh)

        extracted = engine._extract_chart_uuids_from_archive(Path(workdir))

        assert extracted == {42: "uuid-42", 99: "uuid-99"}
|
|
|
|
|
|
# --- _transform_yaml tests ---
|
|
|
|
def test_transform_yaml_replaces_database_uuid():
    """Verifies that database_uuid in a dataset YAML is replaced."""
    engine = MigrationEngine()

    with tempfile.TemporaryDirectory() as workdir:
        dataset_file = Path(workdir) / "dataset.yaml"
        with open(dataset_file, 'w') as fh:
            yaml.dump({"database_uuid": "source-uuid-abc", "table_name": "my_table"}, fh)

        engine._transform_yaml(dataset_file, {"source-uuid-abc": "target-uuid-xyz"})

        with open(dataset_file, 'r') as fh:
            transformed = yaml.safe_load(fh)
        # The uuid is rewritten; unrelated fields survive unchanged.
        assert transformed["database_uuid"] == "target-uuid-xyz"
        assert transformed["table_name"] == "my_table"
|
|
|
|
|
|
def test_transform_yaml_ignores_unmapped_uuid():
    """Verifies no changes when UUID is not in the mapping."""
    engine = MigrationEngine()

    with tempfile.TemporaryDirectory() as workdir:
        dataset_file = Path(workdir) / "dataset.yaml"
        with open(dataset_file, 'w') as fh:
            yaml.dump({"database_uuid": "unknown-uuid", "table_name": "test"}, fh)

        # Mapping contains only an unrelated uuid, so nothing should change.
        engine._transform_yaml(dataset_file, {"other-uuid": "replacement"})

        with open(dataset_file, 'r') as fh:
            transformed = yaml.safe_load(fh)
        assert transformed["database_uuid"] == "unknown-uuid"
|
|
|
|
|
|
# --- [NEW] transform_zip E2E tests ---
|
|
|
|
def test_transform_zip_end_to_end():
    """Verifies full orchestration: extraction, transformation, patching, and re-packaging."""
    engine = MigrationEngine(MockMappingService({"char-uuid": 101, "ds-uuid": 202}))

    with tempfile.TemporaryDirectory() as workdir:
        work = Path(workdir)
        zip_path = work / "source.zip"
        output_path = work / "target.zip"

        # Build the source ZIP layout in a scratch staging directory.
        with tempfile.TemporaryDirectory() as staging:
            stage = Path(staging)

            # 1. Dataset referencing the source database uuid.
            (stage / "datasets").mkdir()
            with open(stage / "datasets" / "ds.yaml", 'w') as fh:
                yaml.dump({"database_uuid": "source-db-uuid", "table_name": "users"}, fh)

            # 2. Chart whose numeric id is remapped through its uuid.
            (stage / "charts").mkdir()
            with open(stage / "charts" / "ch.yaml", 'w') as fh:
                yaml.dump({"id": 10, "uuid": "char-uuid"}, fh)

            # 3. Dashboard whose cross-filter points at chart id 10.
            (stage / "dashboards").mkdir()
            filters = {"native_filter_configuration": [{"targets": [{"chartId": 10}]}]}
            with open(stage / "dashboards" / "db.yaml", 'w') as fh:
                yaml.dump({"json_metadata": json.dumps(filters)}, fh)

            with zipfile.ZipFile(zip_path, 'w') as zf:
                for entry in stage.rglob('*'):
                    if entry.is_file():
                        zf.write(entry, entry.relative_to(stage))

        # Execute transform
        success = engine.transform_zip(
            str(zip_path),
            str(output_path),
            {"source-db-uuid": "target-db-uuid"},
            target_env_id="test-target",
            fix_cross_filters=True,
        )

        assert success is True
        assert output_path.exists()

        # Unpack the produced archive and verify both transformations landed.
        with tempfile.TemporaryDirectory() as unpacked:
            with zipfile.ZipFile(output_path, 'r') as zf:
                zf.extractall(unpacked)
            out_root = Path(unpacked)

            # Dataset: database uuid rewritten via the explicit db mapping.
            with open(out_root / "datasets" / "ds.yaml", 'r') as fh:
                assert yaml.safe_load(fh)["database_uuid"] == "target-db-uuid"

            # Dashboard: chartId 10 resolved through char-uuid to remote id 101.
            with open(out_root / "dashboards" / "db.yaml", 'r') as fh:
                meta = json.loads(yaml.safe_load(fh)["json_metadata"])
            assert meta["native_filter_configuration"][0]["targets"][0]["chartId"] == 101
|
|
|
|
|
|
def test_transform_zip_invalid_path():
    """@PRE: Verify behavior (False) on invalid ZIP path."""
    engine = MigrationEngine()
    # A missing input archive must be reported as failure, not raised.
    assert engine.transform_zip("non_existent.zip", "output.zip", {}) is False
|
|
|
|
|
|
def test_transform_yaml_nonexistent_file():
    """@PRE: Verify behavior on non-existent YAML file."""
    engine = MigrationEngine()
    # _transform_yaml is not guarded internally, so a missing file is
    # expected to surface as FileNotFoundError rather than being swallowed.
    with pytest.raises(FileNotFoundError):
        engine._transform_yaml(Path("non_existent.yaml"), {})
|
|
|
|
|
|
# [/DEF:backend.tests.core.test_migration_engine:Module]
|