# Files
# ss-tools/backend/src/plugins/llm_analysis/service.py
# 2026-02-07 11:26:06 +03:00
#
# 630 lines
# 35 KiB
# Python

# [DEF:backend/src/plugins/llm_analysis/service.py:Module]
# @TIER: STANDARD
# @SEMANTICS: service, llm, screenshot, playwright, openai
# @PURPOSE: Services for LLM interaction and dashboard screenshots.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> playwright
# @RELATION: DEPENDS_ON -> openai
# @RELATION: DEPENDS_ON -> tenacity
# @INVARIANT: Screenshots must be 1920px width and capture full page height.
import asyncio
import base64
import io
import json
import mimetypes
import os
import re
from typing import List, Optional, Dict, Any

from PIL import Image
from openai import AsyncOpenAI, RateLimitError, AuthenticationError as OpenAIAuthenticationError
from playwright.async_api import async_playwright
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception

from .models import LLMProviderType, ValidationResult, ValidationStatus, DetectedIssue
from ...core.logger import belief_scope, logger
from ...core.config_models import Environment
# [DEF:ScreenshotService:Class]
# @PURPOSE: Handles capturing screenshots of Superset dashboards.
class ScreenshotService:
    """Capture full-page screenshots of Superset dashboards with headless Chromium.

    The public entry point is :meth:`capture_dashboard`; every other method is a
    private step of that workflow (login, waiting for content, tab switching,
    CDP capture).
    """

    # Realistic desktop user agent to avoid 403 Forbidden from OpenResty/WAF.
    _USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
    _EXTRA_HTTP_HEADERS = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
    }
    # Candidate selectors, tried in order, for various Superset versions/themes.
    _LOGIN_SELECTORS = {
        "username": ['input[name="username"]', 'input#username', 'input[placeholder*="Username"]', 'input[type="text"]'],
        "password": ['input[name="password"]', 'input#password', 'input[placeholder*="Password"]', 'input[type="password"]'],
        "submit": ['button[type="submit"]', 'button#submit', '.btn-primary', 'input[type="submit"]'],
    }
    _TAB_SELECTORS = (
        '.ant-tabs-nav-list .ant-tabs-tab',
        '.dashboard-component-tabs .ant-tabs-tab',
        '[data-test="dashboard-component-tabs"] .ant-tabs-tab',
    )
    _MAX_TAB_DEPTH = 3            # recursion limit for nested tab switching
    _TARGET_WIDTH = 1920          # module invariant: screenshots are 1920px wide
    _STABILIZATION_DELAY_S = 15   # pause after the dashboard content waits
    _RESIZE_SETTLE_DELAY_S = 10   # pause after the viewport resize

    # True once every chart container holds a canvas, an svg or any other content.
    _JS_CHARTS_RENDERED = """() => {
        const charts = document.querySelectorAll('.chart-container, .slice_container');
        if (charts.length === 0) return true; // No charts to wait for
        return Array.from(charts).every(chart => {
            const hasCanvas = chart.querySelector('canvas') !== null;
            const hasSvg = chart.querySelector('svg') !== null;
            const hasContent = chart.innerText.trim().length > 0 || chart.children.length > 0;
            return hasCanvas || hasSvg || hasContent;
        });
    }"""

    # Scrolls to the bottom in 500px steps and back to the top.
    _JS_SCROLL_PAGE = """async () => {
        const delay = ms => new Promise(resolve => setTimeout(resolve, ms));
        for (let i = 0; i < document.body.scrollHeight; i += 500) {
            window.scrollTo(0, i);
            await delay(100);
        }
        window.scrollTo(0, 0);
        await delay(500);
    }"""

    _JS_FULL_HEIGHT = """() => {
        const body = document.body;
        const html = document.documentElement;
        const dashboardContent = document.querySelector('.dashboard-content');
        return Math.max(
            body.scrollHeight, body.offsetHeight,
            html.clientHeight, html.scrollHeight, html.offsetHeight,
            dashboardContent ? dashboardContent.scrollHeight + 100 : 0
        );
    }"""

    # ':visible' is not a valid CSS selector for querySelectorAll (it throws),
    # so visibility is derived from whether the element has layout boxes.
    _JS_CHART_STATS = """() => {
        const containers = Array.from(
            document.querySelectorAll('.chart-container, .slice_container')
        );
        const isVisible = el => el.getClientRects().length > 0;
        return {
            chartContainers: containers.length,
            canvasElements: document.querySelectorAll('canvas').length,
            svgElements: document.querySelectorAll('.chart-container svg, .slice_container svg').length,
            visibleCharts: containers.filter(isVisible).length
        };
    }"""

    _JS_CHART_ERRORS = """() => {
        const errors = [];
        document.querySelectorAll('.chart-container, .slice_container').forEach((chart, i) => {
            const errorEl = chart.querySelector('.error, .alert-danger, .ant-alert-error');
            if (errorEl) {
                errors.push({index: i, text: errorEl.innerText.substring(0, 100)});
            }
        });
        return errors;
    }"""

    # [DEF:ScreenshotService.__init__:Function]
    # @PURPOSE: Initializes the ScreenshotService with environment configuration.
    # @PRE: env is a valid Environment object.
    def __init__(self, env: Environment):
        """Store the environment; its ``url``, ``username`` and ``password`` are read during capture."""
        self.env = env
    # [/DEF:ScreenshotService.__init__:Function]

    # [DEF:ScreenshotService.capture_dashboard:Function]
    # @PURPOSE: Captures a full-page screenshot of a dashboard using Playwright and CDP.
    # @PRE: dashboard_id is a valid string, output_path is a writable path.
    # @POST: Returns True if screenshot is saved successfully.
    # @SIDE_EFFECT: Launches a browser, performs UI login, switches tabs, and writes a PNG file.
    # @UX_STATE: [Navigating] -> Loading dashboard UI
    # @UX_STATE: [TabSwitching] -> Iterating through dashboard tabs to trigger lazy loading
    # @UX_STATE: [CalculatingHeight] -> Determining dashboard dimensions
    # @UX_STATE: [Capturing] -> Executing CDP screenshot
    async def capture_dashboard(self, dashboard_id: str, output_path: str) -> bool:
        """Log in through the UI, open the dashboard and save a screenshot.

        Args:
            dashboard_id: Identifier placed into the ``/superset/dashboard/<id>/`` URL.
            output_path: Destination file for the screenshot.

        Returns:
            True once a screenshot (CDP capture or a fallback) has been written.

        Raises:
            RuntimeError: If the UI login fails.
            Exception: Navigation errors and a failure of the last-resort
                screenshot propagate unchanged.
        """
        with belief_scope("capture_dashboard", f"dashboard_id={dashboard_id}"):
            logger.info(f"Capturing screenshot for dashboard {dashboard_id}")
            base_ui_url = self._base_ui_url()
            async with async_playwright() as p:
                browser = await p.chromium.launch(
                    headless=True,
                    args=[
                        "--disable-blink-features=AutomationControlled",
                        "--disable-infobars",
                        "--no-sandbox",
                    ],
                )
                try:
                    context = await browser.new_context(
                        viewport={'width': 1280, 'height': 720},
                        user_agent=self._USER_AGENT,
                        extra_http_headers=dict(self._EXTRA_HTTP_HEADERS),
                    )
                    logger.info("Browser context created successfully")
                    page = await context.new_page()
                    # Bypass navigator.webdriver detection
                    await page.add_init_script("delete Object.getPrototypeOf(navigator).webdriver")

                    # 1. Navigate to login page and authenticate
                    await self._login(page, context, base_ui_url, output_path)

                    # 2. Navigate to dashboard
                    # @UX_STATE: [Navigating] -> Loading dashboard UI
                    dashboard_url = f"{base_ui_url}/superset/dashboard/{dashboard_id}/?standalone=true"
                    logger.info(f"[DEBUG] Navigating to dashboard: {dashboard_url}")
                    # Use networkidle to ensure all initial assets are loaded
                    response = await page.goto(dashboard_url, wait_until="networkidle", timeout=60000)
                    if response:
                        logger.info(f"[DEBUG] Dashboard navigation response status: {response.status}, URL: {response.url}")

                    await self._wait_for_dashboard_content(page)

                    # Final stabilization delay - increased for complex dashboards
                    logger.info("[DEBUG] Final stabilization delay...")
                    await asyncio.sleep(self._STABILIZATION_DELAY_S)

                    # 3. Tabs + full-page capture, with plain Playwright screenshots as fallback
                    try:
                        await self._switch_tabs(page, set())
                        await self._capture_via_cdp(page, output_path)
                    except Exception as e:
                        logger.error(f"[DEBUG] Full-page/Tab capture failed: {e}")
                        try:
                            await page.screenshot(path=output_path, full_page=True, timeout=10000)
                        except Exception as e2:
                            logger.error(f"[DEBUG] Fallback screenshot also failed: {e2}")
                            await page.screenshot(path=output_path, timeout=5000)
                finally:
                    # Close the browser on every path, including login/navigation failures.
                    await browser.close()
                return True
    # [/DEF:ScreenshotService.capture_dashboard:Function]

    # [DEF:ScreenshotService._base_ui_url:Function]
    # @PURPOSE: Derives the UI base URL from the environment URL (strips /api/v1 suffix).
    def _base_ui_url(self) -> str:
        """Return ``self.env.url`` without trailing slashes and without a ``/api/v1`` suffix."""
        base_ui_url = self.env.url.rstrip("/")
        if base_ui_url.endswith("/api/v1"):
            base_ui_url = base_ui_url[:-len("/api/v1")]
        return base_ui_url.rstrip("/")
    # [/DEF:ScreenshotService._base_ui_url:Function]

    # [DEF:ScreenshotService._derived_path:Function]
    # @PURPOSE: Builds a sibling file path by inserting a suffix before the extension.
    @staticmethod
    def _derived_path(output_path: str, suffix: str) -> str:
        """Return ``output_path`` with ``suffix`` inserted before its extension.

        A ``.png`` extension is appended when ``output_path`` has none, so the
        derived file can never be the output file itself.
        """
        root, ext = os.path.splitext(output_path)
        return f"{root}{suffix}{ext or '.png'}"
    # [/DEF:ScreenshotService._derived_path:Function]

    # [DEF:ScreenshotService._first_present_selector:Function]
    # @PURPOSE: Returns the first candidate selector that matches at least one element.
    @staticmethod
    async def _first_present_selector(page, candidates, log_counts: bool = False) -> Optional[str]:
        """Return the first selector in ``candidates`` with a non-zero match count, else None."""
        for selector in candidates:
            count = await page.locator(selector).count()
            if log_counts:
                logger.info(f"[DEBUG] Selector '{selector}': {count} elements found")
            if count > 0:
                return selector
        return None
    # [/DEF:ScreenshotService._first_present_selector:Function]

    # [DEF:ScreenshotService._login:Function]
    # @PURPOSE: Performs the UI login with the environment credentials.
    # @POST: The page has left the /login URL, otherwise RuntimeError is raised.
    # @SIDE_EFFECT: On failure, tries to save a "_debug_failed_login" screenshot next to output_path.
    async def _login(self, page, context, base_ui_url: str, output_path: str) -> None:
        """Open the login page, submit credentials and verify the redirect.

        Raises:
            RuntimeError: If a form field cannot be found, the page is still on
                ``/login`` after submitting, or any step of the form interaction fails.
        """
        login_url = f"{base_ui_url}/login/"
        logger.info(f"[DEBUG] Navigating to login page: {login_url}")
        response = await page.goto(login_url, wait_until="networkidle", timeout=60000)
        if response:
            logger.info(f"[DEBUG] Login page response status: {response.status}")
        # Wait for login form to be ready
        await page.wait_for_load_state("domcontentloaded")

        selectors = self._LOGIN_SELECTORS
        logger.info("[DEBUG] Attempting to find login form elements...")
        try:
            # Find and fill username
            u_selector = await self._first_present_selector(page, selectors["username"], log_counts=True)
            if u_selector is None:
                # Log the first input fields on the page for debugging
                all_inputs = await page.locator('input').all()
                logger.info(f"[DEBUG] Found {len(all_inputs)} input fields on page")
                for i, inp in enumerate(all_inputs[:5]):
                    inp_type = await inp.get_attribute('type')
                    inp_name = await inp.get_attribute('name')
                    inp_id = await inp.get_attribute('id')
                    logger.info(f"[DEBUG] Input {i}: type={inp_type}, name={inp_name}, id={inp_id}")
                raise RuntimeError("Could not find username input field on login page")
            logger.info(f"[DEBUG] Filling username field with selector: {u_selector}")
            await page.fill(u_selector, self.env.username)

            # Find and fill password
            p_selector = await self._first_present_selector(page, selectors["password"])
            if p_selector is None:
                raise RuntimeError("Could not find password input field on login page")
            logger.info(f"[DEBUG] Filling password field with selector: {p_selector}")
            await page.fill(p_selector, self.env.password)

            # Click submit; fall back to the first candidate when none is found
            s_selector = await self._first_present_selector(page, selectors["submit"]) or selectors["submit"][0]
            logger.info(f"[DEBUG] Clicking submit button with selector: {s_selector}")
            await page.click(s_selector)

            # Wait for navigation after login
            await page.wait_for_load_state("networkidle", timeout=30000)

            # Check if login was successful
            if "/login" in page.url:
                alerts = page.locator(".alert-danger, .error-message")
                error_msg = "Unknown error"
                if await alerts.count() > 0:
                    # .first avoids a strictness error when several alerts match
                    error_msg = ((await alerts.first.text_content()) or "").strip() or "Unknown error"
                logger.error(f"[DEBUG] Login failed. Still on login page. Error: {error_msg}")
                # Wrapped exactly once by the handler below
                raise RuntimeError(error_msg)

            logger.info(f"[DEBUG] Login successful. Current URL: {page.url}")
            page_cookies = await context.cookies()
            logger.info(f"[DEBUG] Cookies after login: {len(page_cookies)}")
            for c in page_cookies:
                # Cookie values are session credentials and are deliberately not logged
                logger.info(f"[DEBUG] Cookie: name={c['name']}, domain={c['domain']}")
        except Exception as e:
            try:
                page_title = await page.title()
            except Exception:
                page_title = "<unavailable>"
            logger.error(f"UI Login failed. Page title: {page_title}, URL: {page.url}, Error: {str(e)}")
            debug_path = self._derived_path(output_path, "_debug_failed_login")
            try:
                await page.screenshot(path=debug_path)
                debug_note = f"Debug screenshot saved to {debug_path}"
            except Exception as shot_err:
                # Never let a failing debug screenshot mask the real login error
                logger.warning(f"[DEBUG] Could not save login debug screenshot: {shot_err}")
                debug_note = "Debug screenshot could not be saved"
            raise RuntimeError(f"Login failed: {str(e)}. {debug_note}") from e
    # [/DEF:ScreenshotService._login:Function]

    # [DEF:ScreenshotService._wait_for_dashboard_content:Function]
    # @PURPOSE: Best-effort wait until the dashboard grid and its charts have rendered.
    # @POST: Never raises; failures are logged as warnings.
    async def _wait_for_dashboard_content(self, page) -> None:
        """Wait for the grid, loading indicators and chart content, then scroll the page."""
        try:
            # Wait for the dashboard grid to be present
            await page.wait_for_selector('.dashboard-component, .dashboard-header, [data-test="dashboard-grid"]', timeout=30000)
            logger.info("[DEBUG] Dashboard container loaded")

            # Wait for loading indicators (spinners/skeletons) to disappear
            try:
                await page.wait_for_selector('.loading, .ant-skeleton, .spinner', state="hidden", timeout=60000)
                logger.info("[DEBUG] Loading indicators hidden")
            except Exception:
                logger.warning("[DEBUG] Timeout waiting for loading indicators to hide")

            # Wait for common chart containers to appear
            try:
                await page.wait_for_selector('.chart-container canvas, .slice_container svg, .superset-chart-canvas, .grid-content .chart-container', timeout=60000)
                logger.info("[DEBUG] Chart content detected")
            except Exception:
                logger.warning("[DEBUG] Timeout waiting for chart content")

            # Additional check: wait for all chart containers to have non-empty content
            logger.info("[DEBUG] Waiting for all charts to have rendered content...")
            await page.wait_for_function(self._JS_CHARTS_RENDERED, timeout=60000)
            logger.info("[DEBUG] All charts have rendered content")

            # Scroll to bottom and back to top to trigger lazy loading of all charts
            logger.info("[DEBUG] Scrolling to trigger lazy loading...")
            await page.evaluate(self._JS_SCROLL_PAGE)
        except Exception as e:
            logger.warning(f"[DEBUG] Dashboard content wait failed: {e}, proceeding anyway after delay")
    # [/DEF:ScreenshotService._wait_for_dashboard_content:Function]

    # [DEF:ScreenshotService._switch_tabs:Function]
    # @PURPOSE: Recursively clicks through dashboard tabs to trigger lazy loading.
    # @UX_STATE: [TabSwitching] -> Iterating through dashboard tabs to trigger lazy loading
    async def _switch_tabs(self, page, processed_tabs: set, depth: int = 0) -> None:
        """Visit every visible tab once per depth level, then reactivate the first tab.

        Args:
            page: Playwright page showing the dashboard.
            processed_tabs: Set of ``"<depth>_<index>_<text>"`` ids already visited;
                mutated in place and shared across the recursion.
            depth: Current recursion level; recursion stops beyond ``_MAX_TAB_DEPTH``.
        """
        if depth > self._MAX_TAB_DEPTH:
            return  # Limit recursion depth

        found_tabs = []
        for selector in self._TAB_SELECTORS:
            found_tabs = await page.locator(selector).all()
            if found_tabs:
                break
        if not found_tabs:
            return

        logger.info(f"[DEBUG][TabSwitching] Found {len(found_tabs)} tabs at depth {depth}")
        for i, tab in enumerate(found_tabs):
            try:
                tab_text = (await tab.inner_text()).strip()
                tab_id = f"{depth}_{i}_{tab_text}"
                if tab_id in processed_tabs:
                    continue
                if await tab.is_visible():
                    logger.info(f"[DEBUG][TabSwitching] Switching to tab: {tab_text}")
                    processed_tabs.add(tab_id)
                    is_active = "ant-tabs-tab-active" in (await tab.get_attribute("class") or "")
                    if not is_active:
                        await tab.click()
                        await asyncio.sleep(2)  # Wait for content to render
                    await self._switch_tabs(page, processed_tabs, depth + 1)
            except Exception as tab_e:
                logger.warning(f"[DEBUG][TabSwitching] Failed to process tab {i}: {tab_e}")

        # Return to the first tab so the capture starts from the default view
        try:
            first_tab = found_tabs[0]
            if "ant-tabs-tab-active" not in (await first_tab.get_attribute("class") or ""):
                await first_tab.click()
                await asyncio.sleep(1)
        except Exception as reset_e:
            logger.debug(f"[DEBUG][TabSwitching] Could not reactivate first tab: {reset_e}")
    # [/DEF:ScreenshotService._switch_tabs:Function]

    # [DEF:ScreenshotService._capture_via_cdp:Function]
    # @PURPOSE: Resizes the viewport to the full page height and captures a PNG through CDP.
    # @SIDE_EFFECT: Writes output_path and a "_preresize" diagnostic screenshot.
    # @UX_STATE: [CalculatingHeight] -> Determining dashboard dimensions
    # @UX_STATE: [Capturing] -> Executing CDP screenshot
    async def _capture_via_cdp(self, page, output_path: str) -> None:
        """Measure the page, resize to 1920 x full height, capture via CDP and log diagnostics.

        Raises:
            Exception: Any Playwright/CDP/file error; the caller falls back to
                ``page.screenshot``.
        """
        full_height = await page.evaluate(self._JS_FULL_HEIGHT)
        logger.info(f"[DEBUG] Calculated full height: {full_height}")

        # DIAGNOSTIC: Count chart elements before resize
        chart_count_before = await page.evaluate(self._JS_CHART_STATS)
        logger.info(f"[DIAGNOSTIC] Chart elements BEFORE viewport resize: {chart_count_before}")

        # DIAGNOSTIC: Capture pre-resize screenshot for comparison (best effort)
        pre_resize_path = self._derived_path(output_path, "_preresize")
        try:
            await page.screenshot(path=pre_resize_path, full_page=False, timeout=10000)
            pre_resize_size = os.path.getsize(pre_resize_path) if os.path.exists(pre_resize_path) else 0
            logger.info(f"[DIAGNOSTIC] Pre-resize screenshot saved: {pre_resize_path} ({pre_resize_size} bytes)")
        except Exception as pre_e:
            logger.warning(f"[DIAGNOSTIC] Failed to capture pre-resize screenshot: {pre_e}")

        target_height = int(full_height)
        logger.info(f"[DIAGNOSTIC] Resizing viewport from current to {self._TARGET_WIDTH}x{target_height}")
        await page.set_viewport_size({"width": self._TARGET_WIDTH, "height": target_height})

        logger.info(f"[DIAGNOSTIC] Waiting {self._RESIZE_SETTLE_DELAY_S} seconds after viewport resize for re-render...")
        await asyncio.sleep(self._RESIZE_SETTLE_DELAY_S)
        logger.info("[DIAGNOSTIC] Wait completed")

        # DIAGNOSTIC: Count chart elements after resize and wait
        chart_count_after = await page.evaluate(self._JS_CHART_STATS)
        logger.info(f"[DIAGNOSTIC] Chart elements AFTER viewport resize + wait: {chart_count_after}")

        # DIAGNOSTIC: Check if any charts have error states
        chart_errors = await page.evaluate(self._JS_CHART_ERRORS)
        if chart_errors:
            logger.warning(f"[DIAGNOSTIC] Charts with error states detected: {chart_errors}")
        else:
            logger.info("[DIAGNOSTIC] No chart error states detected")

        # Take screenshot using CDP to bypass Playwright's font loading wait
        logger.info("[DEBUG] Attempting full-page screenshot via CDP...")
        cdp = await page.context.new_cdp_session(page)
        screenshot_data = await cdp.send("Page.captureScreenshot", {
            "format": "png",
            "fromSurface": True,
            "captureBeyondViewport": True,
        })
        image_data = base64.b64decode(screenshot_data["data"])
        with open(output_path, 'wb') as f:
            f.write(image_data)

        # DIAGNOSTIC: Verify screenshot file
        final_size = os.path.getsize(output_path) if os.path.exists(output_path) else 0
        logger.info(f"[DIAGNOSTIC] Final screenshot saved: {output_path}")
        logger.info(f"[DIAGNOSTIC] Final screenshot size: {final_size} bytes ({final_size / 1024:.2f} KB)")
        try:
            with Image.open(output_path) as final_img:
                logger.info(f"[DIAGNOSTIC] Final screenshot dimensions: {final_img.width}x{final_img.height}")
        except Exception as img_err:
            logger.warning(f"[DIAGNOSTIC] Could not read final image dimensions: {img_err}")
        logger.info(f"Full-page screenshot saved to {output_path} (via CDP)")
    # [/DEF:ScreenshotService._capture_via_cdp:Function]
# [/DEF:ScreenshotService:Class]
# [DEF:LLMClient:Class]
# @PURPOSE: Wrapper for LLM provider APIs.
class LLMClient:
    """Thin wrapper around ``AsyncOpenAI`` for JSON completions and dashboard analysis."""

    # Bounds applied to screenshots before they are sent to the model.
    _MAX_IMAGE_WIDTH = 1024
    _MAX_IMAGE_HEIGHT = 2048
    _JPEG_QUALITY = 60
    # Delay (seconds) used when a rate-limit error carries no retry hint.
    _DEFAULT_RETRY_DELAY_S = 5.0

    # [DEF:LLMClient.__init__:Function]
    # @PURPOSE: Initializes the LLMClient with provider settings.
    # @PRE: api_key, base_url, and default_model are non-empty strings.
    def __init__(self, provider_type: LLMProviderType, api_key: str, base_url: str, default_model: str):
        """Store the provider settings and create the ``AsyncOpenAI`` client.

        Only the presence and length of the API key are logged; no part of the
        key itself is written to the log.
        """
        self.provider_type = provider_type
        self.api_key = api_key
        self.base_url = base_url
        self.default_model = default_model
        logger.info("[LLMClient.__init__] Initializing LLM client:")
        logger.info(f"[LLMClient.__init__] Provider Type: {provider_type}")
        logger.info(f"[LLMClient.__init__] Base URL: {base_url}")
        logger.info(f"[LLMClient.__init__] Default Model: {default_model}")
        logger.info(f"[LLMClient.__init__] API Key present: {bool(api_key)}")
        logger.info(f"[LLMClient.__init__] API Key Length: {len(api_key) if api_key else 0}")
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
    # [/DEF:LLMClient.__init__:Function]

    # [DEF:LLMClient._should_retry:Function]
    # @PURPOSE: Retry predicate for tenacity that excludes authentication errors.
    # NOTE: intentionally a plain function (no self) - it is consumed by the @retry
    # decorator below while the class body is still being executed.
    def _should_retry(exception: Exception) -> bool:
        """Return True for any ``Exception`` except authentication errors."""
        if isinstance(exception, OpenAIAuthenticationError):
            return False
        return isinstance(exception, Exception)
    # [/DEF:LLMClient._should_retry:Function]

    # [DEF:LLMClient._request_completion:Function]
    # @PURPOSE: Calls the chat completion API in JSON mode, falling back to plain text mode.
    # @SIDE_EFFECT: Calls external LLM API (up to two requests).
    async def _request_completion(self, messages: List[Dict[str, Any]]) -> Any:
        """Request a completion with ``response_format=json_object``.

        When the call fails with an error whose text contains
        ``"JSON mode is not enabled"`` or ``"400"``, the request is repeated
        without ``response_format``. Authentication and rate-limit errors are
        never treated as a JSON-mode problem and propagate immediately.
        """
        logger.info(f"[get_json_completion] Attempting LLM call with JSON mode for model: {self.default_model}")
        logger.info(f"[get_json_completion] Base URL being used: {self.base_url}")
        logger.info(f"[get_json_completion] Number of messages: {len(messages)}")
        logger.info(f"[get_json_completion] API Key present: {bool(self.api_key and len(self.api_key) > 0)}")
        try:
            return await self.client.chat.completions.create(
                model=self.default_model,
                messages=messages,
                response_format={"type": "json_object"},
            )
        except (OpenAIAuthenticationError, RateLimitError):
            # Their message may contain "400" by accident (e.g. a quota figure);
            # retrying in plain-text mode would only repeat the failure.
            raise
        except Exception as e:
            if "JSON mode is not enabled" in str(e) or "400" in str(e):
                logger.warning(f"[get_json_completion] JSON mode failed or not supported: {str(e)}. Falling back to plain text response.")
                return await self.client.chat.completions.create(
                    model=self.default_model,
                    messages=messages,
                )
            raise
    # [/DEF:LLMClient._request_completion:Function]

    # [DEF:LLMClient._retry_delay_from_error:Function]
    # @PURPOSE: Extracts the provider-suggested retry delay (seconds) from a rate-limit error.
    @staticmethod
    def _retry_delay_from_error(error: Exception, default: float = 5.0) -> float:
        """Return the ``retryDelay`` hint found in ``error``, or ``default``.

        The hint is looked up first in the error's string form
        (``"retryDelay": "12s"`` or ``"1.5s"``), then in ``error.body`` under
        ``error.details`` entries of type ``google.rpc.RetryInfo``.
        """
        try:
            match = re.search(r'"retryDelay":\s*"(\d+(?:\.\d+)?)s"', str(error))
            if match:
                return float(match.group(1))
            body = getattr(error, "body", None)
            if isinstance(body, dict):
                # Some providers put it in details
                for detail in body.get('error', {}).get('details', []):
                    if detail.get('@type') == 'type.googleapis.com/google.rpc.RetryInfo':
                        return float(detail.get('retryDelay', '5s').rstrip('s'))
        except Exception as parse_e:
            logger.debug(f"[get_json_completion] Failed to parse retry delay: {parse_e}")
        return default
    # [/DEF:LLMClient._retry_delay_from_error:Function]

    # [DEF:LLMClient._parse_json_content:Function]
    # @PURPOSE: Parses LLM text as JSON, accepting JSON wrapped in a Markdown code fence.
    @staticmethod
    def _parse_json_content(content: str) -> Dict[str, Any]:
        """Return the JSON value in ``content``.

        Direct parsing is tried first; on failure the first ```` ```json ````
        fence (case-insensitive) is used, then the first fence of any kind.

        Raises:
            json.JSONDecodeError: If ``content`` is not JSON and has no fence,
                or the fenced text is not valid JSON.
        """
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            logger.warning("[get_json_completion] Failed to parse JSON directly, attempting to extract from code blocks")
            fenced = (
                re.search(r"```json(.*?)(?:```|\Z)", content, re.DOTALL | re.IGNORECASE)
                or re.search(r"```(.*?)(?:```|\Z)", content, re.DOTALL)
            )
            if fenced is None:
                raise
            return json.loads(fenced.group(1).strip())
    # [/DEF:LLMClient._parse_json_content:Function]

    # [DEF:LLMClient.get_json_completion:Function]
    # @PURPOSE: Helper to handle LLM calls with JSON mode and fallback parsing.
    # @PRE: messages is a list of valid message dictionaries.
    # @POST: Returns a parsed JSON dictionary.
    # @SIDE_EFFECT: Calls external LLM API.
    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=2, min=5, max=60),
        retry=retry_if_exception(_should_retry),
        reraise=True,
    )
    async def get_json_completion(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Send ``messages`` to the model and return the parsed JSON reply.

        The call is attempted up to 5 times with exponential waits (see the
        ``@retry`` decorator); authentication errors are not retried. On a
        rate-limit error the provider-suggested delay plus 0.5s is slept before
        the error is re-raised for the retry machinery.

        Raises:
            OpenAIAuthenticationError: Immediately, without retry.
            RuntimeError: If the response has no choices or no message content.
            json.JSONDecodeError: If the reply cannot be parsed as JSON.
        """
        with belief_scope("get_json_completion"):
            try:
                response = await self._request_completion(messages)
                logger.debug(f"[get_json_completion] LLM Response: {response}")
            except OpenAIAuthenticationError as e:
                logger.error(f"[get_json_completion] Authentication error: {str(e)}")
                # Do not retry on auth errors - re-raise to stop retry
                raise
            except RateLimitError as e:
                logger.warning(f"[get_json_completion] Rate limit hit: {str(e)}")
                retry_delay = self._retry_delay_from_error(e, self._DEFAULT_RETRY_DELAY_S)
                # Add a small safety margin (0.5s) as requested
                wait_time = retry_delay + 0.5
                logger.info(f"[get_json_completion] Waiting for {wait_time}s before retry...")
                await asyncio.sleep(wait_time)
                raise
            except Exception as e:
                logger.error(f"[get_json_completion] LLM call failed: {str(e)}")
                raise

            if not response or not hasattr(response, 'choices') or not response.choices:
                raise RuntimeError(f"Invalid LLM response: {response}")
            content = response.choices[0].message.content
            logger.debug(f"[get_json_completion] Raw content to parse: {content}")
            if content is None:
                # json.loads(None) would raise an opaque TypeError
                raise RuntimeError("Invalid LLM response: message content is empty")
            return self._parse_json_content(content)
    # [/DEF:LLMClient.get_json_completion:Function]

    # [DEF:LLMClient._screenshot_data_url:Function]
    # @PURPOSE: Converts a screenshot file into a size-reduced base64 data URL.
    # @SIDE_EFFECT: Reads the screenshot file.
    def _screenshot_data_url(self, screenshot_path: str) -> str:
        """Return a ``data:`` URL for the screenshot.

        The image is converted to RGB when needed, scaled down to fit within
        ``_MAX_IMAGE_WIDTH`` x ``_MAX_IMAGE_HEIGHT`` (aspect ratio kept) and
        re-encoded as JPEG. If any of that fails, the raw file bytes are used
        with a MIME type guessed from the file name (``image/png`` if unknown).
        """
        try:
            with Image.open(screenshot_path) as source:
                # JPEG cannot store alpha/palette data, so anything but RGB/L is converted
                image = source if source.mode in ("RGB", "L") else source.convert("RGB")
                if image.width > self._MAX_IMAGE_WIDTH or image.height > self._MAX_IMAGE_HEIGHT:
                    old_width, old_height = image.size
                    scale = min(self._MAX_IMAGE_WIDTH / old_width, self._MAX_IMAGE_HEIGHT / old_height)
                    new_width = max(1, int(old_width * scale))
                    new_height = max(1, int(old_height * scale))
                    image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
                    logger.info(f"[analyze_dashboard] Resized image from {old_width}x{old_height} to {new_width}x{new_height}")
                buffer = io.BytesIO()
                image.save(buffer, format="JPEG", quality=self._JPEG_QUALITY, optimize=True)
            payload = buffer.getvalue()
            logger.info(f"[analyze_dashboard] Optimized image size: {len(payload) / 1024:.2f} KB")
            return f"data:image/jpeg;base64,{base64.b64encode(payload).decode('utf-8')}"
        except Exception as img_e:
            logger.warning(f"[analyze_dashboard] Image optimization failed: {img_e}. Using raw image.")
            with open(screenshot_path, "rb") as image_file:
                raw = image_file.read()
            # Label the raw bytes with their own type instead of claiming JPEG
            mime_type = mimetypes.guess_type(screenshot_path)[0] or "image/png"
            return f"data:{mime_type};base64,{base64.b64encode(raw).decode('utf-8')}"
    # [/DEF:LLMClient._screenshot_data_url:Function]

    # [DEF:LLMClient.analyze_dashboard:Function]
    # @PURPOSE: Sends dashboard data (screenshot + logs) to LLM for health analysis.
    # @PRE: screenshot_path exists, logs is a list of strings.
    # @POST: Returns a structured analysis dictionary (status, summary, issues).
    # @SIDE_EFFECT: Reads screenshot file and calls external LLM API.
    async def analyze_dashboard(self, screenshot_path: str, logs: List[str]) -> Dict[str, Any]:
        """Ask the model to assess the dashboard screenshot and execution logs.

        Returns:
            The JSON object produced by the model, or a ``{"status": "FAIL", ...}``
            dictionary describing the error when the LLM call fails.
        """
        with belief_scope("analyze_dashboard"):
            # Optimize image to reduce token count (US1 / T023)
            image_url = self._screenshot_data_url(screenshot_path)
            log_text = "\n".join(logs)
            prompt = (
                "\n"
                "Analyze the attached dashboard screenshot and the following execution logs for health and visual issues.\n"
                "Logs:\n"
                f"{log_text}\n"
                "Provide the analysis in JSON format with the following structure:\n"
                "{\n"
                '"status": "PASS" | "WARN" | "FAIL",\n'
                '"summary": "Short summary of findings",\n'
                '"issues": [\n'
                "{\n"
                '"severity": "WARN" | "FAIL",\n'
                '"message": "Description of the issue",\n'
                '"location": "Optional location info (e.g. chart name)"\n'
                "}\n"
                "]\n"
                "}\n"
            )
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": image_url}},
                    ],
                }
            ]
            try:
                return await self.get_json_completion(messages)
            except Exception as e:
                logger.error(f"[analyze_dashboard] Failed to get analysis: {str(e)}")
                return {
                    "status": "FAIL",
                    "summary": f"Failed to get response from LLM: {str(e)}",
                    "issues": [{"severity": "FAIL", "message": "LLM provider returned empty or invalid response"}],
                }
    # [/DEF:LLMClient.analyze_dashboard:Function]
# [/DEF:LLMClient:Class]
# [/DEF:backend/src/plugins/llm_analysis/service.py:Module]