feat(llm-plugin): switch to environment API for log retrieval
- Replace local backend.log reading with Superset API /log/ fetch - Update DashboardValidationPlugin to use SupersetClient - Filter logs by dashboard_id and last 24 hours - Update spec FR-006 to reflect API usage
This commit is contained in:
@@ -4,21 +4,28 @@
|
||||
# @PURPOSE: Implements DashboardValidationPlugin and DocumentationPlugin.
|
||||
# @LAYER: Domain
|
||||
# @RELATION: INHERITS_FROM -> backend.src.core.plugin_base.PluginBase
|
||||
# @RELATION: CALLS -> backend.src.plugins.llm_analysis.service.ScreenshotService
|
||||
# @RELATION: CALLS -> backend.src.plugins.llm_analysis.service.LLMClient
|
||||
# @RELATION: CALLS -> backend.src.services.llm_provider.LLMProviderService
|
||||
# @INVARIANT: All LLM interactions must be executed as asynchronous tasks.
|
||||
|
||||
from typing import Dict, Any, Optional, List
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from ...core.plugin_base import PluginBase
|
||||
from ...core.logger import belief_scope, logger
|
||||
from ...core.database import SessionLocal
|
||||
from ...core.config_manager import ConfigManager
|
||||
from ...services.llm_provider import LLMProviderService
|
||||
from ...core.superset_client import SupersetClient
|
||||
from .service import ScreenshotService, LLMClient
|
||||
from .models import LLMProviderType, ValidationStatus, ValidationResult, DetectedIssue
|
||||
from ...models.llm import ValidationRecord
|
||||
|
||||
# [DEF:DashboardValidationPlugin:Class]
|
||||
# @PURPOSE: Plugin for automated dashboard health analysis using LLMs.
|
||||
# @RELATION: IMPLEMENTS -> backend.src.core.plugin_base.PluginBase
|
||||
class DashboardValidationPlugin(PluginBase):
|
||||
@property
|
||||
def id(self) -> str:
|
||||
@@ -56,6 +63,16 @@ class DashboardValidationPlugin(PluginBase):
|
||||
provider_id = params.get("provider_id")
|
||||
task_id = params.get("_task_id")
|
||||
|
||||
# Helper to log to both app logger and task manager logs
|
||||
def task_log(level: str, message: str, context: Optional[Dict] = None):
|
||||
logger.log(getattr(logging, level.upper()), message)
|
||||
if task_id:
|
||||
from ...dependencies import get_task_manager
|
||||
try:
|
||||
tm = get_task_manager()
|
||||
tm._add_log(task_id, level.upper(), message, context)
|
||||
except: pass
|
||||
|
||||
db = SessionLocal()
|
||||
try:
|
||||
# 1. Get Environment
|
||||
@@ -80,17 +97,46 @@ class DashboardValidationPlugin(PluginBase):
|
||||
|
||||
await screenshot_service.capture_dashboard(dashboard_id, screenshot_path)
|
||||
|
||||
# 4. Fetch Logs (Last 100 lines from backend.log)
|
||||
# 4. Fetch Logs (from Environment /api/v1/log/)
|
||||
logs = []
|
||||
log_file = "backend.log"
|
||||
if os.path.exists(log_file):
|
||||
with open(log_file, "r") as f:
|
||||
# Read last 100 lines
|
||||
all_lines = f.readlines()
|
||||
logs = all_lines[-100:]
|
||||
|
||||
if not logs:
|
||||
logs = ["No logs found in backend.log"]
|
||||
try:
|
||||
client = SupersetClient(env)
|
||||
|
||||
# Calculate time window (last 24 hours)
|
||||
start_time = (datetime.now() - timedelta(hours=24)).isoformat()
|
||||
|
||||
# Construct filter for logs
|
||||
# Note: We filter by dashboard_id matching the object
|
||||
query_params = {
|
||||
"filters": [
|
||||
{"col": "dashboard_id", "op": "eq", "value": dashboard_id},
|
||||
{"col": "dttm", "op": "gt", "value": start_time}
|
||||
],
|
||||
"order_column": "dttm",
|
||||
"order_direction": "desc",
|
||||
"page": 0,
|
||||
"page_size": 100
|
||||
}
|
||||
|
||||
response = client.network.request(
|
||||
method="GET",
|
||||
endpoint="/log/",
|
||||
params={"q": json.dumps(query_params)}
|
||||
)
|
||||
|
||||
if isinstance(response, dict) and "result" in response:
|
||||
for item in response["result"]:
|
||||
action = item.get("action", "unknown")
|
||||
dttm = item.get("dttm", "")
|
||||
details = item.get("json", "")
|
||||
logs.append(f"[{dttm}] {action}: {details}")
|
||||
|
||||
if not logs:
|
||||
logs = ["No recent logs found for this dashboard."]
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch logs from environment: {e}")
|
||||
logs = [f"Error fetching remote logs: {str(e)}"]
|
||||
|
||||
# 5. Analyze with LLM
|
||||
llm_client = LLMClient(
|
||||
@@ -101,6 +147,13 @@ class DashboardValidationPlugin(PluginBase):
|
||||
)
|
||||
|
||||
analysis = await llm_client.analyze_dashboard(screenshot_path, logs)
|
||||
|
||||
# Log analysis summary to task logs for better visibility
|
||||
logger.info(f"[ANALYSIS_SUMMARY] Status: {analysis['status']}")
|
||||
logger.info(f"[ANALYSIS_SUMMARY] Summary: {analysis['summary']}")
|
||||
if analysis.get("issues"):
|
||||
for i, issue in enumerate(analysis["issues"]):
|
||||
logger.info(f"[ANALYSIS_ISSUE][{i+1}] {issue.get('severity')}: {issue.get('message')} (Location: {issue.get('location', 'N/A')})")
|
||||
|
||||
# 6. Persist Result
|
||||
validation_result = ValidationResult(
|
||||
@@ -130,6 +183,9 @@ class DashboardValidationPlugin(PluginBase):
|
||||
# In a real implementation, we would call a NotificationService here
|
||||
# with a payload containing the summary and a link to the report.
|
||||
|
||||
# Final log to ensure all analysis is visible in task logs
|
||||
task_log("INFO", f"Validation completed for dashboard {dashboard_id}. Status: {validation_result.status.value}")
|
||||
|
||||
return validation_result.dict()
|
||||
|
||||
finally:
|
||||
@@ -138,6 +194,7 @@ class DashboardValidationPlugin(PluginBase):
|
||||
|
||||
# [DEF:DocumentationPlugin:Class]
|
||||
# @PURPOSE: Plugin for automated dataset documentation using LLMs.
|
||||
# @RELATION: IMPLEMENTS -> backend.src.core.plugin_base.PluginBase
|
||||
class DocumentationPlugin(PluginBase):
|
||||
@property
|
||||
def id(self) -> str:
|
||||
@@ -166,6 +223,16 @@ class DocumentationPlugin(PluginBase):
|
||||
"required": ["dataset_id", "environment_id", "provider_id"]
|
||||
}
|
||||
|
||||
# [DEF:execute:Function]
|
||||
# @PURPOSE: Executes the dashboard validation task.
|
||||
# @PRE: params contains dashboard_id, environment_id, and provider_id.
|
||||
# @POST: Returns a dictionary with validation results and persists them to the database.
|
||||
# @SIDE_EFFECT: Captures a screenshot, calls LLM API, and writes to the database.
|
||||
# [DEF:execute:Function]
|
||||
# @PURPOSE: Executes the dataset documentation task.
|
||||
# @PRE: params contains dataset_id, environment_id, and provider_id.
|
||||
# @POST: Returns generated documentation and updates the dataset in Superset.
|
||||
# @SIDE_EFFECT: Calls LLM API and updates dataset metadata in Superset.
|
||||
async def execute(self, params: Dict[str, Any]):
|
||||
with belief_scope("execute", f"plugin_id={self.id}"):
|
||||
logger.info(f"Executing {self.name} with params: {params}")
|
||||
@@ -235,14 +302,8 @@ class DocumentationPlugin(PluginBase):
|
||||
"""
|
||||
|
||||
# Using a generic chat completion for text-only US2
|
||||
response = await llm_client.client.chat.completions.create(
|
||||
model=db_provider.default_model,
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
response_format={"type": "json_object"}
|
||||
)
|
||||
|
||||
import json
|
||||
doc_result = json.loads(response.choices[0].message.content)
|
||||
# We use the shared get_json_completion method from LLMClient
|
||||
doc_result = await llm_client.get_json_completion([{"role": "user", "content": prompt}])
|
||||
|
||||
# 5. Update Metadata (US2 / T026)
|
||||
# This part normally goes to mapping_service, but we implement the logic here for the plugin flow
|
||||
|
||||
Reference in New Issue
Block a user