{ "verdict": "APPROVED", "rejection_reason": "NONE", "audit_details": { "target_invoked": true, "pre_conditions_tested": true, "post_conditions_tested": true, "test_data_used": true }, "feedback": "The test suite robustly verifies the

MigrationEngine
 contracts. It avoids Tautologies by cleanly substituting IdMappingService without mocking the engine itself. Cross-filter parsing asserts against hard-coded, predefined validation dictionaries (no Logic Mirroring). It successfully addresses @PRE negative cases (e.g. invalid zip paths, missing YAMLs) and rigorously validates @POST file transformations (e.g. in-place UUID substitutions and archive reconstruction)." }
This commit is contained in:
2026-02-25 17:47:55 +03:00
parent 590ba49ddb
commit 99f19ac305
20 changed files with 1211 additions and 308 deletions

View File

@@ -7,7 +7,7 @@
# @RELATION: DEPENDS_ON -> backend.src.models.dashboard
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from ...dependencies import get_config_manager, get_task_manager, has_permission
from ...core.database import get_db
@@ -118,28 +118,49 @@ async def update_migration_settings(
# [/DEF:update_migration_settings:Function]
# [DEF:get_resource_mappings:Function]
# @PURPOSE: Fetch all synchronized object mappings from the database.
@router.get("/migration/mappings-data", response_model=List[Dict[str, Any]])
# @PURPOSE: Fetch synchronized object mappings with search, filtering, and pagination.
@router.get("/migration/mappings-data", response_model=Dict[str, Any])
async def get_resource_mappings(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
limit: int = Query(50, ge=1, le=500),
search: Optional[str] = Query(None, description="Search by resource name or UUID"),
env_id: Optional[str] = Query(None, description="Filter by environment ID"),
resource_type: Optional[str] = Query(None, description="Filter by resource type"),
db: Session = Depends(get_db),
_ = Depends(has_permission("plugin:migration", "READ"))
):
with belief_scope("get_resource_mappings"):
mappings = db.query(ResourceMapping).offset(skip).limit(limit).all()
result = []
query = db.query(ResourceMapping)
if env_id:
query = query.filter(ResourceMapping.environment_id == env_id)
if resource_type:
query = query.filter(ResourceMapping.resource_type == resource_type.upper())
if search:
search_term = f"%{search}%"
query = query.filter(
(ResourceMapping.resource_name.ilike(search_term)) |
(ResourceMapping.uuid.ilike(search_term))
)
total = query.count()
mappings = query.order_by(ResourceMapping.resource_type, ResourceMapping.resource_name).offset(skip).limit(limit).all()
items = []
for m in mappings:
result.append({
items.append({
"id": m.id,
"environment_id": m.environment_id,
"resource_type": m.resource_type.value,
"resource_type": m.resource_type.value if m.resource_type else None,
"uuid": m.uuid,
"remote_id": m.remote_integer_id,
"resource_name": m.resource_name,
"last_synced_at": m.last_synced_at.isoformat() if m.last_synced_at else None
})
return result
return {"items": items, "total": total}
# [/DEF:get_resource_mappings:Function]
# [DEF:trigger_sync_now:Function]