Improve dashboard LLM validation UX and report flow

This commit is contained in:
2026-02-26 17:53:41 +03:00
parent 5ec1254336
commit f4612c0737
10 changed files with 1199 additions and 30 deletions

View File

@@ -6,6 +6,7 @@
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from datetime import datetime, timezone
from fastapi.testclient import TestClient
from src.app import app
from src.api.routes.dashboards import DashboardsResponse
@@ -354,4 +355,84 @@ def test_get_database_mappings_success():
# [/DEF:test_get_database_mappings_success:Function]
# [DEF:test_get_dashboard_tasks_history_filters_success:Function]
# @TEST: GET /api/dashboards/{id}/tasks returns backup and llm tasks for dashboard
def test_get_dashboard_tasks_history_filters_success():
with patch("src.api.routes.dashboards.get_task_manager") as mock_task_mgr, \
patch("src.api.routes.dashboards.has_permission") as mock_perm:
now = datetime.now(timezone.utc)
llm_task = MagicMock()
llm_task.id = "task-llm-1"
llm_task.plugin_id = "llm_dashboard_validation"
llm_task.status = "SUCCESS"
llm_task.started_at = now
llm_task.finished_at = now
llm_task.params = {"dashboard_id": "42", "environment_id": "prod"}
llm_task.result = {"summary": "LLM validation complete"}
backup_task = MagicMock()
backup_task.id = "task-backup-1"
backup_task.plugin_id = "superset-backup"
backup_task.status = "RUNNING"
backup_task.started_at = now
backup_task.finished_at = None
backup_task.params = {"env": "prod", "dashboards": [42]}
backup_task.result = {}
other_task = MagicMock()
other_task.id = "task-other"
other_task.plugin_id = "superset-backup"
other_task.status = "SUCCESS"
other_task.started_at = now
other_task.finished_at = now
other_task.params = {"env": "prod", "dashboards": [777]}
other_task.result = {}
mock_task_mgr.return_value.get_all_tasks.return_value = [other_task, llm_task, backup_task]
mock_perm.return_value = lambda: True
response = client.get("/api/dashboards/42/tasks?env_id=prod&limit=10")
assert response.status_code == 200
data = response.json()
assert data["dashboard_id"] == 42
assert len(data["items"]) == 2
assert {item["plugin_id"] for item in data["items"]} == {"llm_dashboard_validation", "superset-backup"}
# [/DEF:test_get_dashboard_tasks_history_filters_success:Function]
# [DEF:test_get_dashboard_thumbnail_success:Function]
# @TEST: GET /api/dashboards/{id}/thumbnail proxies image bytes from Superset
def test_get_dashboard_thumbnail_success():
with patch("src.api.routes.dashboards.get_config_manager") as mock_config, \
patch("src.api.routes.dashboards.has_permission") as mock_perm, \
patch("src.api.routes.dashboards.SupersetClient") as mock_client_cls:
mock_env = MagicMock()
mock_env.id = "prod"
mock_config.return_value.get_environments.return_value = [mock_env]
mock_perm.return_value = lambda: True
mock_client = MagicMock()
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.content = b"fake-image-bytes"
mock_response.headers = {"Content-Type": "image/png"}
def _network_request(method, endpoint, **kwargs):
if method == "POST":
return {"image_url": "/api/v1/dashboard/42/screenshot/abc123/"}
return mock_response
mock_client.network.request.side_effect = _network_request
mock_client_cls.return_value = mock_client
response = client.get("/api/dashboards/42/thumbnail?env_id=prod")
assert response.status_code == 200
assert response.content == b"fake-image-bytes"
assert response.headers["content-type"].startswith("image/png")
# [/DEF:test_get_dashboard_thumbnail_success:Function]
# [/DEF:backend.src.api.routes.__tests__.test_dashboards:Module]

View File

@@ -11,12 +11,16 @@
# @INVARIANT: All dashboard responses include git_status and last_task metadata
# [SECTION: IMPORTS]
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Optional, Dict
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from fastapi.responses import JSONResponse
from typing import List, Optional, Dict, Any
import re
from urllib.parse import urlparse
from pydantic import BaseModel, Field
from ...dependencies import get_config_manager, get_task_manager, get_resource_service, get_mapping_service, has_permission
from ...core.logger import logger, belief_scope
from ...core.superset_client import SupersetClient
from ...core.utils.network import DashboardNotFoundError
# [/SECTION]
router = APIRouter(prefix="/api/dashboards", tags=["Dashboards"])
@@ -90,6 +94,24 @@ class DashboardDetailResponse(BaseModel):
dataset_count: int
# [/DEF:DashboardDetailResponse:DataClass]
# [DEF:DashboardTaskHistoryItem:DataClass]
class DashboardTaskHistoryItem(BaseModel):
id: str
plugin_id: str
status: str
validation_status: Optional[str] = None
started_at: Optional[str] = None
finished_at: Optional[str] = None
env_id: Optional[str] = None
summary: Optional[str] = None
# [/DEF:DashboardTaskHistoryItem:DataClass]
# [DEF:DashboardTaskHistoryResponse:DataClass]
class DashboardTaskHistoryResponse(BaseModel):
dashboard_id: int
items: List[DashboardTaskHistoryItem]
# [/DEF:DashboardTaskHistoryResponse:DataClass]
# [DEF:DatabaseMapping:DataClass]
class DatabaseMapping(BaseModel):
source_db: str
@@ -259,6 +281,190 @@ async def get_dashboard_detail(
raise HTTPException(status_code=503, detail=f"Failed to fetch dashboard detail: {str(e)}")
# [/DEF:get_dashboard_detail:Function]
# [DEF:_task_matches_dashboard:Function]
# @PURPOSE: Checks whether task params are tied to a specific dashboard and environment.
# @PRE: task-like object exposes plugin_id and params fields.
# @POST: Returns True only for supported task plugins tied to dashboard_id (+optional env_id).
def _task_matches_dashboard(task: Any, dashboard_id: int, env_id: Optional[str]) -> bool:
plugin_id = getattr(task, "plugin_id", None)
if plugin_id not in {"superset-backup", "llm_dashboard_validation"}:
return False
params = getattr(task, "params", {}) or {}
dashboard_id_str = str(dashboard_id)
if plugin_id == "llm_dashboard_validation":
task_dashboard_id = params.get("dashboard_id")
if str(task_dashboard_id) != dashboard_id_str:
return False
if env_id:
task_env = params.get("environment_id")
return str(task_env) == str(env_id)
return True
# superset-backup can pass dashboards as "dashboard_ids" or "dashboards"
dashboard_ids = params.get("dashboard_ids") or params.get("dashboards") or []
normalized_ids = {str(item) for item in dashboard_ids}
if dashboard_id_str not in normalized_ids:
return False
if env_id:
task_env = params.get("environment_id") or params.get("env")
return str(task_env) == str(env_id)
return True
# [/DEF:_task_matches_dashboard:Function]
# [DEF:get_dashboard_tasks_history:Function]
# @PURPOSE: Returns history of backup and LLM validation tasks for a dashboard.
# @PRE: dashboard_id is valid integer.
# @POST: Response contains sorted task history (newest first).
@router.get("/{dashboard_id:int}/tasks", response_model=DashboardTaskHistoryResponse)
async def get_dashboard_tasks_history(
dashboard_id: int,
env_id: Optional[str] = None,
limit: int = Query(20, ge=1, le=100),
task_manager=Depends(get_task_manager),
_ = Depends(has_permission("tasks", "READ"))
):
with belief_scope("get_dashboard_tasks_history", f"dashboard_id={dashboard_id}, env_id={env_id}, limit={limit}"):
matching_tasks = []
for task in task_manager.get_all_tasks():
if _task_matches_dashboard(task, dashboard_id, env_id):
matching_tasks.append(task)
def _sort_key(task_obj: Any) -> str:
return (
str(getattr(task_obj, "started_at", "") or "")
or str(getattr(task_obj, "finished_at", "") or "")
)
matching_tasks.sort(key=_sort_key, reverse=True)
selected = matching_tasks[:limit]
items = []
for task in selected:
result = getattr(task, "result", None)
summary = None
validation_status = None
if isinstance(result, dict):
raw_validation_status = result.get("status")
if raw_validation_status is not None:
validation_status = str(raw_validation_status)
summary = (
result.get("summary")
or result.get("status")
or result.get("message")
)
params = getattr(task, "params", {}) or {}
items.append(
DashboardTaskHistoryItem(
id=str(getattr(task, "id", "")),
plugin_id=str(getattr(task, "plugin_id", "")),
status=str(getattr(task, "status", "")),
validation_status=validation_status,
started_at=getattr(task, "started_at", None).isoformat() if getattr(task, "started_at", None) else None,
finished_at=getattr(task, "finished_at", None).isoformat() if getattr(task, "finished_at", None) else None,
env_id=str(params.get("environment_id") or params.get("env")) if (params.get("environment_id") or params.get("env")) else None,
summary=summary,
)
)
logger.info(f"[get_dashboard_tasks_history][Coherence:OK] Found {len(items)} tasks for dashboard {dashboard_id}")
return DashboardTaskHistoryResponse(dashboard_id=dashboard_id, items=items)
# [/DEF:get_dashboard_tasks_history:Function]
# [DEF:get_dashboard_thumbnail:Function]
# @PURPOSE: Proxies Superset dashboard thumbnail with cache support.
# @PRE: env_id must exist.
# @POST: Returns image bytes or 202 when thumbnail is being prepared by Superset.
@router.get("/{dashboard_id:int}/thumbnail")
async def get_dashboard_thumbnail(
dashboard_id: int,
env_id: str,
force: bool = Query(False),
config_manager=Depends(get_config_manager),
_ = Depends(has_permission("plugin:migration", "READ"))
):
with belief_scope("get_dashboard_thumbnail", f"dashboard_id={dashboard_id}, env_id={env_id}, force={force}"):
environments = config_manager.get_environments()
env = next((e for e in environments if e.id == env_id), None)
if not env:
logger.error(f"[get_dashboard_thumbnail][Coherence:Failed] Environment not found: {env_id}")
raise HTTPException(status_code=404, detail="Environment not found")
try:
client = SupersetClient(env)
digest = None
thumb_endpoint = None
# Preferred flow (newer Superset): ask server to cache screenshot and return digest/image_url.
try:
screenshot_payload = client.network.request(
method="POST",
endpoint=f"/dashboard/{dashboard_id}/cache_dashboard_screenshot/",
json={"force": force},
)
payload = screenshot_payload.get("result", screenshot_payload) if isinstance(screenshot_payload, dict) else {}
image_url = payload.get("image_url", "") if isinstance(payload, dict) else ""
if isinstance(image_url, str) and image_url:
matched = re.search(r"/dashboard/\d+/(?:thumbnail|screenshot)/([^/]+)/?$", image_url)
if matched:
digest = matched.group(1)
except DashboardNotFoundError:
logger.warning(
"[get_dashboard_thumbnail][Fallback] cache_dashboard_screenshot endpoint unavailable, fallback to dashboard.thumbnail_url"
)
# Fallback flow (older Superset): read thumbnail_url from dashboard payload.
if not digest:
dashboard_payload = client.network.request(
method="GET",
endpoint=f"/dashboard/{dashboard_id}",
)
dashboard_data = dashboard_payload.get("result", dashboard_payload) if isinstance(dashboard_payload, dict) else {}
thumbnail_url = dashboard_data.get("thumbnail_url", "") if isinstance(dashboard_data, dict) else ""
if isinstance(thumbnail_url, str) and thumbnail_url:
parsed = urlparse(thumbnail_url)
parsed_path = parsed.path or thumbnail_url
if parsed_path.startswith("/api/v1/"):
parsed_path = parsed_path[len("/api/v1"):]
thumb_endpoint = parsed_path
matched = re.search(r"/dashboard/\d+/(?:thumbnail|screenshot)/([^/]+)/?$", parsed_path)
if matched:
digest = matched.group(1)
if not thumb_endpoint:
thumb_endpoint = f"/dashboard/{dashboard_id}/thumbnail/{digest or 'latest'}/"
thumb_response = client.network.request(
method="GET",
endpoint=thumb_endpoint,
raw_response=True,
allow_redirects=True,
)
if thumb_response.status_code == 202:
payload_202: Dict[str, Any] = {}
try:
payload_202 = thumb_response.json()
except Exception:
payload_202 = {"message": "Thumbnail is being generated"}
return JSONResponse(status_code=202, content=payload_202)
content_type = thumb_response.headers.get("Content-Type", "image/png")
return Response(content=thumb_response.content, media_type=content_type)
except DashboardNotFoundError as e:
logger.error(f"[get_dashboard_thumbnail][Coherence:Failed] Dashboard not found for thumbnail: {e}")
raise HTTPException(status_code=404, detail="Dashboard thumbnail not found")
except HTTPException:
raise
except Exception as e:
logger.error(f"[get_dashboard_thumbnail][Coherence:Failed] Failed to fetch dashboard thumbnail: {e}")
raise HTTPException(status_code=503, detail=f"Failed to fetch dashboard thumbnail: {str(e)}")
# [/DEF:get_dashboard_thumbnail:Function]
# [DEF:MigrateRequest:DataClass]
class MigrateRequest(BaseModel):
source_env_id: str = Field(..., description="Source environment ID")

View File

@@ -5,7 +5,7 @@
# @LAYER: UI (API)
from fastapi import APIRouter, Depends, HTTPException, status
from typing import List
from typing import List, Optional
from ...core.logger import logger
from ...schemas.auth import User
from ...dependencies import get_current_user as get_current_active_user
@@ -19,6 +19,20 @@ from sqlalchemy.orm import Session
router = APIRouter(tags=["LLM"])
# [/DEF:router:Global]
# [DEF:_is_valid_runtime_api_key:Function]
# @PURPOSE: Validate decrypted runtime API key presence/shape.
# @PRE: value can be None.
# @POST: Returns True only for non-placeholder key.
def _is_valid_runtime_api_key(value: Optional[str]) -> bool:
key = (value or "").strip()
if not key:
return False
if key in {"********", "EMPTY_OR_NONE"}:
return False
return len(key) >= 16
# [/DEF:_is_valid_runtime_api_key:Function]
# [DEF:get_providers:Function]
# @PURPOSE: Retrieve all LLM provider configurations.
# @PRE: User is authenticated.
@@ -47,6 +61,37 @@ async def get_providers(
]
# [/DEF:get_providers:Function]
# [DEF:get_llm_status:Function]
# @PURPOSE: Returns whether LLM runtime is configured for dashboard validation.
# @PRE: User is authenticated.
# @POST: configured=true only when an active provider with valid decrypted key exists.
@router.get("/status")
async def get_llm_status(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
service = LLMProviderService(db)
providers = service.get_all_providers()
active_provider = next((p for p in providers if p.is_active), None)
if not active_provider:
return {"configured": False, "reason": "no_active_provider"}
api_key = service.get_decrypted_api_key(active_provider.id)
if not _is_valid_runtime_api_key(api_key):
return {"configured": False, "reason": "invalid_api_key"}
return {
"configured": True,
"reason": "ok",
"provider_id": active_provider.id,
"provider_name": active_provider.name,
"provider_type": active_provider.provider_type,
"default_model": active_provider.default_model,
}
# [/DEF:get_llm_status:Function]
# [DEF:create_provider:Function]
# @PURPOSE: Create a new LLM provider configuration.
# @PRE: User is authenticated and has admin permissions.
@@ -204,4 +249,4 @@ async def test_provider_config(
return {"success": False, "error": str(e)}
# [/DEF:test_provider_config:Function]
# [/DEF:backend/src/api/routes/llm.py]
# [/DEF:backend/src/api/routes/llm.py]

View File

@@ -144,4 +144,46 @@ async def download_file(
raise HTTPException(status_code=400, detail=str(e))
# [/DEF:download_file:Function]
# [DEF:get_file_by_path:Function]
# @PURPOSE: Retrieve a file by validated absolute/relative path under storage root.
#
# @PRE: path must resolve under configured storage root.
# @POST: Returns a FileResponse for existing files.
#
# @PARAM: path (str) - Absolute or storage-root-relative file path.
# @RETURN: FileResponse - The file content.
#
# @RELATION: CALLS -> StoragePlugin.get_storage_root
# @RELATION: CALLS -> StoragePlugin.validate_path
@router.get("/file")
async def get_file_by_path(
path: str,
plugin_loader=Depends(get_plugin_loader),
_ = Depends(has_permission("plugin:storage", "READ"))
):
with belief_scope("get_file_by_path"):
storage_plugin: StoragePlugin = plugin_loader.get_plugin("storage-manager")
if not storage_plugin:
raise HTTPException(status_code=500, detail="Storage plugin not loaded")
requested_path = (path or "").strip()
if not requested_path:
raise HTTPException(status_code=400, detail="Path is required")
try:
candidate = Path(requested_path)
if candidate.is_absolute():
abs_path = storage_plugin.validate_path(candidate)
else:
storage_root = storage_plugin.get_storage_root()
abs_path = storage_plugin.validate_path(storage_root / candidate)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not abs_path.exists() or not abs_path.is_file():
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(path=str(abs_path), filename=abs_path.name)
# [/DEF:get_file_by_path:Function]
# [/DEF:storage_routes:Module]

View File

@@ -30,6 +30,34 @@ from ...services.llm_prompt_templates import (
render_prompt,
)
# [DEF:_is_masked_or_invalid_api_key:Function]
# @PURPOSE: Guards against placeholder or malformed API keys in runtime.
# @PRE: value may be None.
# @POST: Returns True when value cannot be used for authenticated provider calls.
def _is_masked_or_invalid_api_key(value: Optional[str]) -> bool:
key = (value or "").strip()
if not key:
return True
if key in {"********", "EMPTY_OR_NONE"}:
return True
# Most provider tokens are significantly longer; short values are almost always placeholders.
return len(key) < 16
# [/DEF:_is_masked_or_invalid_api_key:Function]
# [DEF:_json_safe_value:Function]
# @PURPOSE: Recursively normalize payload values for JSON serialization.
# @PRE: value may be nested dict/list with datetime values.
# @POST: datetime values are converted to ISO strings.
def _json_safe_value(value: Any):
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, dict):
return {k: _json_safe_value(v) for k, v in value.items()}
if isinstance(value, list):
return [_json_safe_value(v) for v in value]
return value
# [/DEF:_json_safe_value:Function]
# [DEF:DashboardValidationPlugin:Class]
# @PURPOSE: Plugin for automated dashboard health analysis using LLMs.
# @RELATION: IMPLEMENTS -> backend.src.core.plugin_base.PluginBase
@@ -70,6 +98,7 @@ class DashboardValidationPlugin(PluginBase):
# @SIDE_EFFECT: Captures a screenshot, calls LLM API, and writes to the database.
async def execute(self, params: Dict[str, Any], context: Optional[TaskContext] = None):
with belief_scope("execute", f"plugin_id={self.id}"):
validation_started_at = datetime.utcnow()
# Use TaskContext logger if available, otherwise fall back to app logger
log = context.logger if context else logger
@@ -118,11 +147,10 @@ class DashboardValidationPlugin(PluginBase):
llm_log.debug(f"API Key decrypted (first 8 chars): {api_key[:8] if api_key and len(api_key) > 8 else 'EMPTY_OR_NONE'}...")
# Check if API key was successfully decrypted
if not api_key:
if _is_masked_or_invalid_api_key(api_key):
raise ValueError(
f"Failed to decrypt API key for provider {provider_id}. "
f"The provider may have been encrypted with a different encryption key. "
f"Please update the provider with a new API key through the UI."
f"Invalid API key for provider {provider_id}. "
"Please open LLM provider settings and save a real API key (not masked placeholder)."
)
# 3. Capture Screenshot
@@ -135,12 +163,15 @@ class DashboardValidationPlugin(PluginBase):
filename = f"{dashboard_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
screenshot_path = os.path.join(screenshots_dir, filename)
screenshot_started_at = datetime.utcnow()
screenshot_log.info(f"Capturing screenshot for dashboard {dashboard_id}")
await screenshot_service.capture_dashboard(dashboard_id, screenshot_path)
screenshot_log.debug(f"Screenshot saved to: {screenshot_path}")
screenshot_finished_at = datetime.utcnow()
# 4. Fetch Logs (from Environment /api/v1/log/)
logs = []
logs_fetch_started_at = datetime.utcnow()
try:
client = SupersetClient(env)
@@ -181,6 +212,7 @@ class DashboardValidationPlugin(PluginBase):
except Exception as e:
superset_log.warning(f"Failed to fetch logs from environment: {e}")
logs = [f"Error fetching remote logs: {str(e)}"]
logs_fetch_finished_at = datetime.utcnow()
# 5. Analyze with LLM
llm_client = LLMClient(
@@ -196,11 +228,13 @@ class DashboardValidationPlugin(PluginBase):
"dashboard_validation_prompt",
DEFAULT_LLM_PROMPTS["dashboard_validation_prompt"],
)
llm_call_started_at = datetime.utcnow()
analysis = await llm_client.analyze_dashboard(
screenshot_path,
logs,
prompt_template=dashboard_prompt,
)
llm_call_finished_at = datetime.utcnow()
# Log analysis summary to task logs for better visibility
llm_log.info(f"[ANALYSIS_SUMMARY] Status: {analysis['status']}")
@@ -218,6 +252,35 @@ class DashboardValidationPlugin(PluginBase):
screenshot_path=screenshot_path,
raw_response=str(analysis)
)
validation_finished_at = datetime.utcnow()
result_payload = _json_safe_value(validation_result.dict())
result_payload["screenshot_paths"] = [screenshot_path]
result_payload["logs_sent_to_llm"] = logs
result_payload["logs_sent_count"] = len(logs)
result_payload["prompt_template"] = dashboard_prompt
result_payload["provider"] = {
"id": db_provider.id,
"name": db_provider.name,
"type": db_provider.provider_type,
"base_url": db_provider.base_url,
"model": db_provider.default_model,
}
result_payload["environment_id"] = env_id
result_payload["timings"] = {
"validation_started_at": validation_started_at.isoformat(),
"validation_finished_at": validation_finished_at.isoformat(),
"validation_duration_ms": int((validation_finished_at - validation_started_at).total_seconds() * 1000),
"screenshot_started_at": screenshot_started_at.isoformat(),
"screenshot_finished_at": screenshot_finished_at.isoformat(),
"screenshot_duration_ms": int((screenshot_finished_at - screenshot_started_at).total_seconds() * 1000),
"logs_fetch_started_at": logs_fetch_started_at.isoformat(),
"logs_fetch_finished_at": logs_fetch_finished_at.isoformat(),
"logs_fetch_duration_ms": int((logs_fetch_finished_at - logs_fetch_started_at).total_seconds() * 1000),
"llm_call_started_at": llm_call_started_at.isoformat(),
"llm_call_finished_at": llm_call_finished_at.isoformat(),
"llm_call_duration_ms": int((llm_call_finished_at - llm_call_started_at).total_seconds() * 1000),
}
db_record = ValidationRecord(
dashboard_id=validation_result.dashboard_id,
@@ -225,7 +288,7 @@ class DashboardValidationPlugin(PluginBase):
summary=validation_result.summary,
issues=[issue.dict() for issue in validation_result.issues],
screenshot_path=validation_result.screenshot_path,
raw_response=validation_result.raw_response
raw_response=json.dumps(result_payload, ensure_ascii=False)
)
db.add(db_record)
db.commit()
@@ -240,7 +303,7 @@ class DashboardValidationPlugin(PluginBase):
# Final log to ensure all analysis is visible in task logs
log.info(f"Validation completed for dashboard {dashboard_id}. Status: {validation_result.status.value}")
return validation_result.dict()
return result_payload
finally:
db.close()
@@ -328,11 +391,10 @@ class DocumentationPlugin(PluginBase):
llm_log.debug(f"API Key decrypted (first 8 chars): {api_key[:8] if api_key and len(api_key) > 8 else 'EMPTY_OR_NONE'}...")
# Check if API key was successfully decrypted
if not api_key:
if _is_masked_or_invalid_api_key(api_key):
raise ValueError(
f"Failed to decrypt API key for provider {provider_id}. "
f"The provider may have been encrypted with a different encryption key. "
f"Please update the provider with a new API key through the UI."
f"Invalid API key for provider {provider_id}. "
"Please open LLM provider settings and save a real API key (not masked placeholder)."
)
# 3. Fetch Metadata (US2 / T024)