# [DEF:backend/src/plugins/llm_analysis/plugin.py:Module]
# @TIER: STANDARD
# @SEMANTICS: plugin, llm, analysis, documentation
# @PURPOSE: Implements DashboardValidationPlugin and DocumentationPlugin.
# @LAYER: Domain
# @RELATION: INHERITS -> backend.src.core.plugin_base.PluginBase
# @RELATION: CALLS -> backend.src.plugins.llm_analysis.service.ScreenshotService
# @RELATION: CALLS -> backend.src.plugins.llm_analysis.service.LLMClient
# @RELATION: CALLS -> backend.src.services.llm_provider.LLMProviderService
# @RELATION: USES -> TaskContext
# @INVARIANT: All LLM interactions must be executed as asynchronous tasks.
from typing import Dict, Any, Optional
import os
import json
from datetime import datetime, timedelta

from ...core.plugin_base import PluginBase
from ...core.logger import belief_scope, logger
from ...core.database import SessionLocal
from ...services.llm_provider import LLMProviderService
from ...core.superset_client import SupersetClient
from .service import ScreenshotService, LLMClient
from .models import LLMProviderType, ValidationStatus, ValidationResult, DetectedIssue
from ...models.llm import ValidationRecord
from ...core.task_manager.context import TaskContext
from ...services.llm_prompt_templates import (
    DEFAULT_LLM_PROMPTS,
    is_multimodal_model,
    normalize_llm_settings,
    render_prompt,
)


# [DEF:_is_masked_or_invalid_api_key:Function]
# @PURPOSE: Guards against placeholder or malformed API keys in runtime.
# @PRE: value may be None.
# @POST: Returns True when value cannot be used for authenticated provider calls.
def _is_masked_or_invalid_api_key(value: Optional[str]) -> bool:
    """Return True when *value* cannot be used for authenticated provider calls.

    Rejects None/empty values, UI-masked placeholders (any all-asterisk
    string, regardless of length), the "EMPTY_OR_NONE" logging placeholder,
    and implausibly short tokens.
    """
    key = (value or "").strip()
    if not key:
        return True
    # FIX: the previous check only matched the literal 8-char "********";
    # an asterisk mask of a different length (e.g. 20 chars, as some UIs
    # echo back one "*" per key character) passed the len(key) < 16 test
    # below and was treated as a valid key. Any all-asterisk string is a mask.
    if key == "EMPTY_OR_NONE" or set(key) == {"*"}:
        return True
    # Most provider tokens are significantly longer; short values are almost always placeholders.
    return len(key) < 16
# [/DEF:_is_masked_or_invalid_api_key:Function]


# [DEF:_json_safe_value:Function]
# @PURPOSE: Recursively normalize payload values for JSON serialization.
# @PRE: value may be nested dict/list with datetime values.
# @POST: datetime values are converted to ISO strings.
def _json_safe_value(value: Any):
    """Recursively convert *value* into a JSON-serializable structure.

    datetime instances become ISO-8601 strings; dicts and lists are walked
    recursively; any other value is returned untouched.
    """
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, dict):
        return {key: _json_safe_value(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_json_safe_value(item) for item in value]
    return value
# [/DEF:_json_safe_value:Function]


# [DEF:DashboardValidationPlugin:Class]
# @PURPOSE: Plugin for automated dashboard health analysis using LLMs.
# @RELATION: IMPLEMENTS -> backend.src.core.plugin_base.PluginBase
class DashboardValidationPlugin(PluginBase):
    """Runs automated dashboard health analysis with a multimodal LLM."""

    @property
    def id(self) -> str:
        # Stable identifier used by the plugin registry.
        return "llm_dashboard_validation"

    @property
    def name(self) -> str:
        return "Dashboard LLM Validation"

    @property
    def description(self) -> str:
        return "Automated dashboard health analysis using multimodal LLMs."

    @property
    def version(self) -> str:
        return "1.0.0"

    def get_schema(self) -> Dict[str, Any]:
        """Return the JSON schema describing this plugin's parameters."""
        # All three parameters are required string fields; build the schema
        # from one table so properties and the required list stay in sync.
        field_titles = {
            "dashboard_id": "Dashboard ID",
            "environment_id": "Environment ID",
            "provider_id": "LLM Provider ID",
        }
        return {
            "type": "object",
            "properties": {
                key: {"type": "string", "title": title}
                for key, title in field_titles.items()
            },
            "required": list(field_titles),
        }

    # [DEF:DashboardValidationPlugin.execute:Function]
    # @PURPOSE: Executes the dashboard validation task with TaskContext support.
    # @PARAM: params (Dict[str, Any]) - Validation parameters.
    # @PARAM: context (Optional[TaskContext]) - Task context for logging with source attribution.
    # @PRE: params contains dashboard_id, environment_id, and provider_id.
    # @POST: Returns a dictionary with validation results and persists them to the database.
    # @SIDE_EFFECT: Captures a screenshot, calls LLM API, and writes to the database.
    async def execute(self, params: Dict[str, Any], context: Optional[TaskContext] = None) -> Dict[str, Any]:
        """Run one full dashboard validation cycle.

        Resolves the environment and LLM provider, captures a dashboard
        screenshot, fetches recent activity logs from the target Superset
        instance, asks the LLM to analyze screenshot + logs, persists a
        ValidationRecord, and returns the enriched result payload.

        Raises ValueError when the environment or provider is missing, the
        model is not multimodal, or the API key is a masked placeholder.
        """
        with belief_scope("execute", f"plugin_id={self.id}"):
            # NOTE(review): utcnow() produces naive timestamps, so the
            # isoformat() strings emitted below carry no timezone marker.
            validation_started_at = datetime.utcnow()
            # Use TaskContext logger if available, otherwise fall back to app logger
            log = context.logger if context else logger
            # Create sub-loggers for different components (source attribution in task logs)
            llm_log = log.with_source("llm") if context else log
            screenshot_log = log.with_source("screenshot") if context else log
            superset_log = log.with_source("superset_api") if context else log
            log.info(f"Executing {self.name} with params: {params}")
            dashboard_id_raw = params.get("dashboard_id")
            # Normalize to str (callers may pass an int id); None stays None.
            dashboard_id = str(dashboard_id_raw) if dashboard_id_raw is not None else None
            env_id = params.get("environment_id")
            provider_id = params.get("provider_id")
            db = SessionLocal()
            try:
                # 1. Get Environment
                # Local import to avoid a circular dependency at module load time.
                from ...dependencies import get_config_manager
                config_mgr = get_config_manager()
                env = config_mgr.get_environment(env_id)
                if not env:
                    log.error(f"Environment {env_id} not found")
                    raise ValueError(f"Environment {env_id} not found")
                # 2. Get LLM Provider
                llm_service = LLMProviderService(db)
                db_provider = llm_service.get_provider(provider_id)
                if not db_provider:
                    log.error(f"LLM Provider {provider_id} not found")
                    raise ValueError(f"LLM Provider {provider_id} not found")
                llm_log.debug("Retrieved provider config:")
                llm_log.debug(f" Provider ID: {db_provider.id}")
                llm_log.debug(f" Provider Name: {db_provider.name}")
                llm_log.debug(f" Provider Type: {db_provider.provider_type}")
                llm_log.debug(f" Base URL: {db_provider.base_url}")
                llm_log.debug(f" Default Model: {db_provider.default_model}")
                llm_log.debug(f" Is Active: {db_provider.is_active}")
                # Screenshot analysis needs image input support; fail fast otherwise.
                if not is_multimodal_model(db_provider.default_model, db_provider.provider_type):
                    raise ValueError(
                        "Dashboard validation requires a multimodal model (image input support)."
                    )
                api_key = llm_service.get_decrypted_api_key(provider_id)
                # Only a prefix is logged, never the full secret.
                llm_log.debug(f"API Key decrypted (first 8 chars): {api_key[:8] if api_key and len(api_key) > 8 else 'EMPTY_OR_NONE'}...")
                # Check if API key was successfully decrypted
                if _is_masked_or_invalid_api_key(api_key):
                    raise ValueError(
                        f"Invalid API key for provider {provider_id}. "
                        "Please open LLM provider settings and save a real API key (not masked placeholder)."
                    )
                # 3. Capture Screenshot
                screenshot_service = ScreenshotService(env)
                storage_root = config_mgr.get_config().settings.storage.root_path
                screenshots_dir = os.path.join(storage_root, "screenshots")
                os.makedirs(screenshots_dir, exist_ok=True)
                # Timestamped filename keeps successive captures of the same dashboard distinct.
                filename = f"{dashboard_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
                screenshot_path = os.path.join(screenshots_dir, filename)
                screenshot_started_at = datetime.utcnow()
                screenshot_log.info(f"Capturing screenshot for dashboard {dashboard_id}")
                await screenshot_service.capture_dashboard(dashboard_id, screenshot_path)
                screenshot_log.debug(f"Screenshot saved to: {screenshot_path}")
                screenshot_finished_at = datetime.utcnow()
                # 4. Fetch Logs (from Environment /api/v1/log/)
                # Best effort: failures are recorded into `logs` and do not abort validation.
                logs = []
                logs_fetch_started_at = datetime.utcnow()
                try:
                    client = SupersetClient(env)
                    # Calculate time window (last 24 hours)
                    start_time = (datetime.now() - timedelta(hours=24)).isoformat()
                    # Construct filter for logs
                    # Note: We filter by dashboard_id matching the object
                    query_params = {
                        "filters": [
                            {"col": "dashboard_id", "opr": "eq", "value": dashboard_id},
                            {"col": "dttm", "opr": "gt", "value": start_time}
                        ],
                        "order_column": "dttm",
                        "order_direction": "desc",
                        "page": 0,
                        "page_size": 100
                    }
                    superset_log.debug(f"Fetching logs for dashboard {dashboard_id}")
                    response = client.network.request(
                        method="GET",
                        endpoint="/log/",
                        params={"q": json.dumps(query_params)}
                    )
                    if isinstance(response, dict) and "result" in response:
                        for item in response["result"]:
                            action = item.get("action", "unknown")
                            dttm = item.get("dttm", "")
                            details = item.get("json", "")
                            logs.append(f"[{dttm}] {action}: {details}")
                    if not logs:
                        # Give the LLM an explicit marker instead of an empty list.
                        logs = ["No recent logs found for this dashboard."]
                        superset_log.debug("No recent logs found for this dashboard")
                except Exception as e:
                    # Deliberate broad catch: log fetching is auxiliary input for the LLM.
                    superset_log.warning(f"Failed to fetch logs from environment: {e}")
                    logs = [f"Error fetching remote logs: {str(e)}"]
                logs_fetch_finished_at = datetime.utcnow()
                # 5. Analyze with LLM
                llm_client = LLMClient(
                    provider_type=LLMProviderType(db_provider.provider_type),
                    api_key=api_key,
                    base_url=db_provider.base_url,
                    default_model=db_provider.default_model
                )
                llm_log.info(f"Analyzing dashboard {dashboard_id} with LLM")
                # Prompt template: user-configured override, falling back to the default.
                llm_settings = normalize_llm_settings(config_mgr.get_config().settings.llm)
                dashboard_prompt = llm_settings["prompts"].get(
                    "dashboard_validation_prompt",
                    DEFAULT_LLM_PROMPTS["dashboard_validation_prompt"],
                )
                llm_call_started_at = datetime.utcnow()
                # NOTE(review): analyze_dashboard is expected to return a dict with
                # "status", "summary" and "issues" keys — confirm against LLMClient.
                analysis = await llm_client.analyze_dashboard(
                    screenshot_path,
                    logs,
                    prompt_template=dashboard_prompt,
                )
                llm_call_finished_at = datetime.utcnow()
                # Log analysis summary to task logs for better visibility
                llm_log.info(f"[ANALYSIS_SUMMARY] Status: {analysis['status']}")
                llm_log.info(f"[ANALYSIS_SUMMARY] Summary: {analysis['summary']}")
                if analysis.get("issues"):
                    for i, issue in enumerate(analysis["issues"]):
                        llm_log.info(f"[ANALYSIS_ISSUE][{i+1}] {issue.get('severity')}: {issue.get('message')} (Location: {issue.get('location', 'N/A')})")
                # 6. Persist Result
                validation_result = ValidationResult(
                    dashboard_id=dashboard_id,
                    status=ValidationStatus(analysis["status"]),
                    summary=analysis["summary"],
                    issues=[DetectedIssue(**issue) for issue in analysis["issues"]],
                    screenshot_path=screenshot_path,
                    # NOTE(review): str(analysis) stores the Python repr, not JSON;
                    # the DB record below stores the JSON payload instead.
                    raw_response=str(analysis)
                )
                validation_finished_at = datetime.utcnow()
                # Enrich the serializable payload with inputs, provenance and timings.
                result_payload = _json_safe_value(validation_result.dict())
                result_payload["screenshot_paths"] = [screenshot_path]
                result_payload["logs_sent_to_llm"] = logs
                result_payload["logs_sent_count"] = len(logs)
                result_payload["prompt_template"] = dashboard_prompt
                result_payload["provider"] = {
                    "id": db_provider.id,
                    "name": db_provider.name,
                    "type": db_provider.provider_type,
                    "base_url": db_provider.base_url,
                    "model": db_provider.default_model,
                }
                result_payload["environment_id"] = env_id
                # Per-phase wall-clock timings in milliseconds.
                result_payload["timings"] = {
                    "validation_started_at": validation_started_at.isoformat(),
                    "validation_finished_at": validation_finished_at.isoformat(),
                    "validation_duration_ms": int((validation_finished_at - validation_started_at).total_seconds() * 1000),
                    "screenshot_started_at": screenshot_started_at.isoformat(),
                    "screenshot_finished_at": screenshot_finished_at.isoformat(),
                    "screenshot_duration_ms": int((screenshot_finished_at - screenshot_started_at).total_seconds() * 1000),
                    "logs_fetch_started_at": logs_fetch_started_at.isoformat(),
                    "logs_fetch_finished_at": logs_fetch_finished_at.isoformat(),
                    "logs_fetch_duration_ms": int((logs_fetch_finished_at - logs_fetch_started_at).total_seconds() * 1000),
                    "llm_call_started_at": llm_call_started_at.isoformat(),
                    "llm_call_finished_at": llm_call_finished_at.isoformat(),
                    "llm_call_duration_ms": int((llm_call_finished_at - llm_call_started_at).total_seconds() * 1000),
                }
                db_record = ValidationRecord(
                    dashboard_id=validation_result.dashboard_id,
                    status=validation_result.status.value,
                    summary=validation_result.summary,
                    issues=[issue.dict() for issue in validation_result.issues],
                    screenshot_path=validation_result.screenshot_path,
                    raw_response=json.dumps(result_payload, ensure_ascii=False)
                )
                db.add(db_record)
                # NOTE(review): no explicit rollback on failure — an exception
                # skips commit and the session is simply closed in `finally`.
                db.commit()
                # 7. Notification on failure (US1 / FR-015)
                if validation_result.status == ValidationStatus.FAIL:
                    log.warning(f"Dashboard {dashboard_id} validation FAILED. Summary: {validation_result.summary}")
                    # Placeholder for Email/Pulse notification dispatch
                    # In a real implementation, we would call a NotificationService here
                    # with a payload containing the summary and a link to the report.
                # Final log to ensure all analysis is visible in task logs
                log.info(f"Validation completed for dashboard {dashboard_id}. Status: {validation_result.status.value}")
                return result_payload
            finally:
                db.close()
    # [/DEF:DashboardValidationPlugin.execute:Function]
# [/DEF:DashboardValidationPlugin:Class]


# [DEF:DocumentationPlugin:Class]
# @PURPOSE: Plugin for automated dataset documentation using LLMs.
# @RELATION: IMPLEMENTS -> backend.src.core.plugin_base.PluginBase
class DocumentationPlugin(PluginBase):
    """Generates dataset/column documentation with an LLM and writes it back to Superset."""

    @property
    def id(self) -> str:
        return "llm_documentation"

    @property
    def name(self) -> str:
        return "Dataset LLM Documentation"

    @property
    def description(self) -> str:
        return "Automated dataset and column documentation using LLMs."

    @property
    def version(self) -> str:
        return "1.0.0"

    def get_schema(self) -> Dict[str, Any]:
        """Return the JSON schema describing this plugin's parameters."""
        return {
            "type": "object",
            "properties": {
                "dataset_id": {"type": "string", "title": "Dataset ID"},
                "environment_id": {"type": "string", "title": "Environment ID"},
                "provider_id": {"type": "string", "title": "LLM Provider ID"}
            },
            "required": ["dataset_id", "environment_id", "provider_id"]
        }

    # [DEF:DocumentationPlugin.execute:Function]
    # @PURPOSE: Executes the dataset documentation task with TaskContext support.
    # @PARAM: params (Dict[str, Any]) - Documentation parameters.
    # @PARAM: context (Optional[TaskContext]) - Task context for logging with source attribution.
    # @PRE: params contains dataset_id, environment_id, and provider_id.
    # @POST: Returns generated documentation and updates the dataset in Superset.
    # @SIDE_EFFECT: Calls LLM API and updates dataset metadata in Superset.
    async def execute(self, params: Dict[str, Any], context: Optional[TaskContext] = None) -> Dict[str, Any]:
        """Generate and apply LLM documentation for one dataset.

        Resolves the environment and LLM provider, fetches the dataset
        metadata from Superset, asks the LLM for a dataset description and
        per-column descriptions, and writes them back via the Superset API.
        Returns the raw documentation result produced by the LLM.

        Raises ValueError when the environment or provider is missing, or
        the API key is a masked placeholder.
        """
        with belief_scope("execute", f"plugin_id={self.id}"):
            # Use TaskContext logger if available, otherwise fall back to app logger
            log = context.logger if context else logger
            # Create sub-loggers for different components (source attribution in task logs)
            llm_log = log.with_source("llm") if context else log
            superset_log = log.with_source("superset_api") if context else log
            log.info(f"Executing {self.name} with params: {params}")
            dataset_id = params.get("dataset_id")
            env_id = params.get("environment_id")
            provider_id = params.get("provider_id")
            db = SessionLocal()
            try:
                # 1. Get Environment
                # Local import to avoid a circular dependency at module load time.
                from ...dependencies import get_config_manager
                config_mgr = get_config_manager()
                env = config_mgr.get_environment(env_id)
                if not env:
                    log.error(f"Environment {env_id} not found")
                    raise ValueError(f"Environment {env_id} not found")
                # 2. Get LLM Provider
                llm_service = LLMProviderService(db)
                db_provider = llm_service.get_provider(provider_id)
                if not db_provider:
                    log.error(f"LLM Provider {provider_id} not found")
                    raise ValueError(f"LLM Provider {provider_id} not found")
                llm_log.debug("Retrieved provider config:")
                llm_log.debug(f" Provider ID: {db_provider.id}")
                llm_log.debug(f" Provider Name: {db_provider.name}")
                llm_log.debug(f" Provider Type: {db_provider.provider_type}")
                llm_log.debug(f" Base URL: {db_provider.base_url}")
                llm_log.debug(f" Default Model: {db_provider.default_model}")
                api_key = llm_service.get_decrypted_api_key(provider_id)
                # Only a prefix is logged, never the full secret.
                llm_log.debug(f"API Key decrypted (first 8 chars): {api_key[:8] if api_key and len(api_key) > 8 else 'EMPTY_OR_NONE'}...")
                # Check if API key was successfully decrypted
                if _is_masked_or_invalid_api_key(api_key):
                    raise ValueError(
                        f"Invalid API key for provider {provider_id}. "
                        "Please open LLM provider settings and save a real API key (not masked placeholder)."
                    )
                # 3. Fetch Metadata (US2 / T024)
                from ...core.superset_client import SupersetClient
                client = SupersetClient(env)
                superset_log.debug(f"Fetching dataset {dataset_id}")
                dataset = client.get_dataset(int(dataset_id))
                # Extract columns and existing descriptions
                columns_data = []
                for col in dataset.get("columns", []):
                    columns_data.append({
                        "name": col.get("column_name"),
                        "type": col.get("type"),
                        "description": col.get("description")
                    })
                superset_log.debug(f"Extracted {len(columns_data)} columns from dataset")
                # 4. Construct Prompt & Analyze (US2 / T025)
                llm_client = LLMClient(
                    provider_type=LLMProviderType(db_provider.provider_type),
                    api_key=api_key,
                    base_url=db_provider.base_url,
                    default_model=db_provider.default_model
                )
                # Prompt template: user-configured override, falling back to the default.
                llm_settings = normalize_llm_settings(config_mgr.get_config().settings.llm)
                documentation_prompt = llm_settings["prompts"].get(
                    "documentation_prompt",
                    DEFAULT_LLM_PROMPTS["documentation_prompt"],
                )
                prompt = render_prompt(
                    documentation_prompt,
                    {
                        "dataset_name": dataset.get("table_name") or "",
                        "columns_json": json.dumps(columns_data, ensure_ascii=False),
                    },
                )
                # Using a generic chat completion for text-only US2
                llm_log.info(f"Generating documentation for dataset {dataset_id}")
                # NOTE(review): doc_result is expected to be a dict with
                # "dataset_description" and "column_descriptions" keys —
                # confirm against LLMClient.get_json_completion.
                doc_result = await llm_client.get_json_completion([{"role": "user", "content": prompt}])
                # 5. Update Metadata (US2 / T026)
                update_payload = {
                    "description": doc_result["dataset_description"],
                    "columns": []
                }
                # Map generated descriptions back to column IDs
                # (columns the LLM did not name, or names it invented, are skipped).
                for col_doc in doc_result["column_descriptions"]:
                    for col in dataset.get("columns", []):
                        if col.get("column_name") == col_doc["name"]:
                            update_payload["columns"].append({
                                "id": col.get("id"),
                                "description": col_doc["description"]
                            })
                superset_log.info(f"Updating dataset {dataset_id} with generated documentation")
                client.update_dataset(int(dataset_id), update_payload)
                log.info(f"Documentation completed for dataset {dataset_id}")
                return doc_result
            finally:
                db.close()
    # [/DEF:DocumentationPlugin.execute:Function]
# [/DEF:DocumentationPlugin:Class]
# [/DEF:backend/src/plugins/llm_analysis/plugin.py:Module]