# [DEF:backend.src.plugins.llm_analysis.service:Module] # @TIER: STANDARD # @SEMANTICS: service, llm, screenshot, playwright, openai # @PURPOSE: Services for LLM interaction and dashboard screenshots. # @LAYER: Domain # @RELATION: DEPENDS_ON -> playwright # @RELATION: DEPENDS_ON -> openai # @RELATION: DEPENDS_ON -> tenacity import asyncio from typing import List, Optional, Dict, Any from playwright.async_api import async_playwright from openai import AsyncOpenAI, RateLimitError from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type from .models import LLMProviderType, ValidationResult, ValidationStatus, DetectedIssue from ...core.logger import belief_scope, logger from ...core.config_models import Environment # [DEF:ScreenshotService:Class] # @PURPOSE: Handles capturing screenshots of Superset dashboards. class ScreenshotService: # @PRE: env is a valid Environment object. def __init__(self, env: Environment): self.env = env # [DEF:capture_dashboard:Function] # @PURPOSE: Captures a screenshot of a dashboard using Playwright. # @PARAM: dashboard_id (str) - ID of the dashboard. # @PARAM: output_path (str) - Path to save the screenshot. # @RETURN: bool - True if successful. async def capture_dashboard(self, dashboard_id: str, output_path: str) -> bool: with belief_scope("capture_dashboard", f"dashboard_id={dashboard_id}"): logger.info(f"Capturing screenshot for dashboard {dashboard_id}") async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context(viewport={'width': 1280, 'height': 720}) page = await context.new_page() # 1. Authenticate via API to get tokens from ...core.superset_client import SupersetClient client = SupersetClient(self.env) try: tokens = client.authenticate() access_token = tokens.get("access_token") # Set JWT in localStorage if possible, or use as cookie # Superset UI uses session cookies, but we can try to set the Authorization header # or inject the token into the session. # For now, we'll use the token to set a cookie if we can determine the name, # but the most reliable way for Playwright is often still the UI login # UNLESS we use the API to set a session cookie. logger.info("API Authentication successful") except Exception as e: logger.warning(f"API Authentication failed: {e}. Falling back to UI login.") # 2. Navigate to dashboard dashboard_url = f"{self.env.url}/superset/dashboard/{dashboard_id}/" logger.info(f"Navigating to {dashboard_url}") # We still go to the URL first await page.goto(dashboard_url) await page.wait_for_load_state("networkidle") # 3. Check if we are redirected to login if "/login" in page.url: logger.info(f"Redirected to login: {page.url}. Filling credentials from Environment.") # More exhaustive list of selectors for various Superset versions/themes selectors = { "username": ['input[name="username"]', 'input#username', 'input[placeholder*="Username"]'], "password": ['input[name="password"]', 'input#password', 'input[placeholder*="Password"]'], "submit": ['button[type="submit"]', 'button#submit', '.btn-primary'] } try: # Find and fill username u_selector = None for s in selectors["username"]: if await page.locator(s).count() > 0: u_selector = s break if not u_selector: raise RuntimeError("Could not find username input field") await page.fill(u_selector, self.env.username) # Find and fill password p_selector = None for s in selectors["password"]: if await page.locator(s).count() > 0: p_selector = s break if not p_selector: raise RuntimeError("Could not find password input field") await page.fill(p_selector, self.env.password) # Click submit s_selector = selectors["submit"][0] for s in selectors["submit"]: if await page.locator(s).count() > 0: s_selector = s break await page.click(s_selector) await page.wait_for_load_state("networkidle") # Re-verify we are at the dashboard if "/login" in page.url: # Check for error messages on page error_msg = await page.locator(".alert-danger, .error-message").text_content() if await page.locator(".alert-danger, .error-message").count() > 0 else "Unknown error" raise RuntimeError(f"Login failed after submission: {error_msg}") if "/superset/dashboard" not in page.url: logger.info(f"Redirecting back to dashboard after login: {dashboard_url}") await page.goto(dashboard_url) await page.wait_for_load_state("networkidle") except Exception as e: page_title = await page.title() logger.error(f"UI Login failed. Page title: {page_title}, URL: {page.url}, Error: {str(e)}") debug_path = output_path.replace(".png", "_debug_failed_login.png") await page.screenshot(path=debug_path) raise RuntimeError(f"Login failed: {str(e)}. Debug screenshot saved to {debug_path}") # Wait a bit more for charts to render await asyncio.sleep(5) await page.screenshot(path=output_path, full_page=True) await browser.close() logger.info(f"Screenshot saved to {output_path}") return True # [/DEF:ScreenshotService:Class] # [DEF:LLMClient:Class] # @PURPOSE: Wrapper for LLM provider APIs. class LLMClient: def __init__(self, provider_type: LLMProviderType, api_key: str, base_url: str, default_model: str): self.provider_type = provider_type self.api_key = api_key self.base_url = base_url self.default_model = default_model self.client = AsyncOpenAI(api_key=api_key, base_url=base_url) # [DEF:analyze_dashboard:Function] # @PURPOSE: Sends dashboard data to LLM for analysis. @retry( stop=stop_after_attempt(5), wait=wait_exponential(multiplier=2, min=5, max=60), retry=retry_if_exception_type((Exception, RateLimitError)) ) async def analyze_dashboard(self, screenshot_path: str, logs: List[str]) -> Dict[str, Any]: with belief_scope("analyze_dashboard"): import base64 with open(screenshot_path, "rb") as image_file: base64_image = base64.b64encode(image_file.read()).decode('utf-8') log_text = "\n".join(logs) prompt = f""" Analyze the attached dashboard screenshot and the following execution logs for health and visual issues. Logs: {log_text} Provide the analysis in JSON format with the following structure: {{ "status": "PASS" | "WARN" | "FAIL", "summary": "Short summary of findings", "issues": [ {{ "severity": "WARN" | "FAIL", "message": "Description of the issue", "location": "Optional location info (e.g. chart name)" }} ] }} """ logger.debug(f"[analyze_dashboard] Calling LLM with model: {self.default_model}") try: response = await self.client.chat.completions.create( model=self.default_model, messages=[ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{base64_image}" } } ] } ], response_format={"type": "json_object"} ) logger.debug(f"[analyze_dashboard] LLM Response: {response}") except RateLimitError as e: logger.warning(f"[analyze_dashboard] Rate limit hit: {str(e)}") raise # tenacity will handle retry except Exception as e: logger.error(f"[analyze_dashboard] LLM call failed: {str(e)}") raise if not response or not hasattr(response, 'choices') or not response.choices: error_info = getattr(response, 'error', 'No choices in response') logger.error(f"[analyze_dashboard] Invalid LLM response. Error info: {error_info}") return { "status": "FAIL", "summary": f"Failed to get response from LLM: {error_info}", "issues": [{"severity": "FAIL", "message": "LLM provider returned empty or invalid response"}] } import json result = json.loads(response.choices[0].message.content) return result # [/DEF:analyze_dashboard:Function] # [/DEF:LLMClient:Class] # [/DEF:backend.src.plugins.llm_analysis.service:Module]