@@ -9,6 +9,7 @@
from __future__ import annotations
import json
import re
import uuid
from datetime import datetime , timedelta
@@ -26,6 +27,9 @@ from ...core.config_manager import ConfigManager
from . . . core . database import get_db
from . . . services . git_service import GitService
from . . . services . llm_provider import LLMProviderService
from . . . core . superset_client import SupersetClient
from . . . plugins . llm_analysis . service import LLMClient
from . . . plugins . llm_analysis . models import LLMProviderType
from . . . schemas . auth import User
from . . . models . assistant import (
AssistantAuditRecord ,
@@ -100,6 +104,17 @@ USER_ACTIVE_CONVERSATION: Dict[str, str] = {}
CONFIRMATIONS : Dict [ str , ConfirmationRecord ] = { }
ASSISTANT_AUDIT : Dict [ str , List [ Dict [ str , Any ] ] ] = { }
INTENT_PERMISSION_CHECKS : Dict [ str , List [ Tuple [ str , str ] ] ] = {
" get_task_status " : [ ( " tasks " , " READ " ) ] ,
" create_branch " : [ ( " plugin:git " , " EXECUTE " ) ] ,
" commit_changes " : [ ( " plugin:git " , " EXECUTE " ) ] ,
" deploy_dashboard " : [ ( " plugin:git " , " EXECUTE " ) ] ,
" execute_migration " : [ ( " plugin:migration " , " EXECUTE " ) , ( " plugin:superset-migration " , " EXECUTE " ) ] ,
" run_backup " : [ ( " plugin:superset-backup " , " EXECUTE " ) , ( " plugin:backup " , " EXECUTE " ) ] ,
" run_llm_validation " : [ ( " plugin:llm_dashboard_validation " , " EXECUTE " ) ] ,
" run_llm_documentation " : [ ( " plugin:llm_documentation " , " EXECUTE " ) ] ,
}
# [DEF:_append_history:Function]
# @PURPOSE: Append conversation message to in-memory history buffer.
@@ -387,6 +402,69 @@ def _resolve_provider_id(provider_token: Optional[str], db: Session) -> Optional
# [/DEF:_resolve_provider_id:Function]
# [DEF:_get_default_environment_id:Function]
# @PURPOSE: Resolve default environment id from settings or first configured environment.
# @PRE: config_manager returns environments list.
# @POST: Returns default environment id or None when environment list is empty.
def _get_default_environment_id ( config_manager : ConfigManager ) - > Optional [ str ] :
configured = config_manager . get_environments ( )
if not configured :
return None
preferred = None
if hasattr ( config_manager , " get_config " ) :
try :
preferred = config_manager . get_config ( ) . settings . default_environment_id
except Exception :
preferred = None
if preferred and any ( env . id == preferred for env in configured ) :
return preferred
explicit_default = next ( ( env . id for env in configured if getattr ( env , " is_default " , False ) ) , None )
return explicit_default or configured [ 0 ] . id
# [/DEF:_get_default_environment_id:Function]
# [DEF:_resolve_dashboard_id_by_ref:Function]
# @PURPOSE: Resolve dashboard id by title or slug reference in selected environment.
# @PRE: dashboard_ref is a non-empty string-like token.
# @POST: Returns dashboard id when uniquely matched, otherwise None.
def _resolve_dashboard_id_by_ref (
dashboard_ref : Optional [ str ] ,
env_id : Optional [ str ] ,
config_manager : ConfigManager ,
) - > Optional [ int ] :
if not dashboard_ref or not env_id :
return None
env = next ( ( item for item in config_manager . get_environments ( ) if item . id == env_id ) , None )
if not env :
return None
needle = dashboard_ref . strip ( ) . lower ( )
try :
client = SupersetClient ( env )
_ , dashboards = client . get_dashboards ( query = { " page_size " : 200 } )
except Exception as exc :
logger . warning ( f " [assistant.dashboard_resolve][failed] ref= { dashboard_ref } env= { env_id } error= { exc } " )
return None
exact = next (
(
d for d in dashboards
if str ( d . get ( " slug " , " " ) ) . lower ( ) == needle
or str ( d . get ( " dashboard_title " , " " ) ) . lower ( ) == needle
or str ( d . get ( " title " , " " ) ) . lower ( ) == needle
) ,
None ,
)
if exact :
return int ( exact . get ( " id " ) )
partial = [ d for d in dashboards if needle in str ( d . get ( " dashboard_title " , d . get ( " title " , " " ) ) ) . lower ( ) ]
if len ( partial ) == 1 and partial [ 0 ] . get ( " id " ) is not None :
return int ( partial [ 0 ] [ " id " ] )
return None
# [/DEF:_resolve_dashboard_id_by_ref:Function]
# [DEF:_parse_command:Function]
# @PURPOSE: Deterministically parse RU/EN command text into intent payload.
# @PRE: message contains raw user text and config manager resolves environments.
@@ -396,6 +474,10 @@ def _parse_command(message: str, config_manager: ConfigManager) -> Dict[str, Any
lower = text . lower ( )
dashboard_id = _extract_id ( lower , [ r " (?:дашборд \ w*|dashboard) \ s*(?:id \ s*)?( \ d+) " ] )
dashboard_ref = _extract_id (
lower ,
[ r " (?:дашборд \ w*|dashboard) \ s*(?:id \ s*)?([a-zа -я0-9._-]+) " ] ,
)
dataset_id = _extract_id ( lower , [ r " (?:датасет \ w*|dataset) \ s*(?:id \ s*)?( \ d+) " ] )
# Accept short and long task ids (e.g., task-1, task-abc123, UUIDs).
task_id = _extract_id ( lower , [ r " (task[-_a-z0-9] { 1,}|[0-9a-f] {8} -[0-9a-f-] { 27,}) " ] )
@@ -500,6 +582,7 @@ def _parse_command(message: str, config_manager: ConfigManager) -> Dict[str, Any
" operation " : " run_llm_validation " ,
" entities " : {
" dashboard_id " : int ( dashboard_id ) if dashboard_id else None ,
" dashboard_ref " : dashboard_ref if ( dashboard_ref and not dashboard_ref . isdigit ( ) ) else None ,
" environment " : env_match ,
" provider " : provider_match ,
} ,
@@ -553,24 +636,272 @@ def _check_any_permission(current_user: User, checks: List[Tuple[str, str]]):
# [/DEF:_check_any_permission:Function]
# [DEF:_has_any_permission:Function]
# @PURPOSE: Check whether user has at least one permission tuple from the provided list.
# @PRE: current_user and checks list are valid.
# @POST: Returns True when at least one permission check passes.
def _has_any_permission ( current_user : User , checks : List [ Tuple [ str , str ] ] ) - > bool :
try :
_check_any_permission ( current_user , checks )
return True
except HTTPException :
return False
# [/DEF:_has_any_permission:Function]
# [DEF:_build_tool_catalog:Function]
# @PURPOSE: Build current-user tool catalog for LLM planner with operation contracts and defaults.
# @PRE: current_user is authenticated; config/db are available.
# @POST: Returns list of executable tools filtered by permission and runtime availability.
def _build_tool_catalog ( current_user : User , config_manager : ConfigManager , db : Session ) - > List [ Dict [ str , Any ] ] :
envs = config_manager . get_environments ( )
default_env_id = _get_default_environment_id ( config_manager )
providers = LLMProviderService ( db ) . get_all_providers ( )
active_provider = next ( ( p . id for p in providers if p . is_active ) , None )
fallback_provider = active_provider or ( providers [ 0 ] . id if providers else None )
candidates : List [ Dict [ str , Any ] ] = [
{
" operation " : " get_task_status " ,
" domain " : " status " ,
" description " : " Get task status by task_id or latest user task " ,
" required_entities " : [ ] ,
" optional_entities " : [ " task_id " ] ,
" risk_level " : " safe " ,
" requires_confirmation " : False ,
} ,
{
" operation " : " create_branch " ,
" domain " : " git " ,
" description " : " Create git branch for dashboard " ,
" required_entities " : [ " dashboard_id " , " branch_name " ] ,
" optional_entities " : [ ] ,
" risk_level " : " guarded " ,
" requires_confirmation " : False ,
} ,
{
" operation " : " commit_changes " ,
" domain " : " git " ,
" description " : " Commit dashboard repository changes " ,
" required_entities " : [ " dashboard_id " ] ,
" optional_entities " : [ " message " ] ,
" risk_level " : " guarded " ,
" requires_confirmation " : False ,
} ,
{
" operation " : " deploy_dashboard " ,
" domain " : " git " ,
" description " : " Deploy dashboard to target environment " ,
" required_entities " : [ " dashboard_id " , " environment " ] ,
" optional_entities " : [ ] ,
" risk_level " : " guarded " ,
" requires_confirmation " : False ,
} ,
{
" operation " : " execute_migration " ,
" domain " : " migration " ,
" description " : " Run dashboard migration between environments " ,
" required_entities " : [ " dashboard_id " , " source_env " , " target_env " ] ,
" optional_entities " : [ ] ,
" risk_level " : " guarded " ,
" requires_confirmation " : False ,
} ,
{
" operation " : " run_backup " ,
" domain " : " backup " ,
" description " : " Run backup for environment or specific dashboard " ,
" required_entities " : [ " environment " ] ,
" optional_entities " : [ " dashboard_id " ] ,
" risk_level " : " guarded " ,
" requires_confirmation " : False ,
} ,
{
" operation " : " run_llm_validation " ,
" domain " : " llm " ,
" description " : " Run LLM dashboard validation " ,
" required_entities " : [ " dashboard_id " ] ,
" optional_entities " : [ " dashboard_ref " , " environment " , " provider " ] ,
" defaults " : { " environment " : default_env_id , " provider " : fallback_provider } ,
" risk_level " : " guarded " ,
" requires_confirmation " : False ,
} ,
{
" operation " : " run_llm_documentation " ,
" domain " : " llm " ,
" description " : " Generate dataset documentation via LLM " ,
" required_entities " : [ " dataset_id " ] ,
" optional_entities " : [ " environment " , " provider " ] ,
" defaults " : { " environment " : default_env_id , " provider " : fallback_provider } ,
" risk_level " : " guarded " ,
" requires_confirmation " : False ,
} ,
]
available : List [ Dict [ str , Any ] ] = [ ]
for tool in candidates :
checks = INTENT_PERMISSION_CHECKS . get ( tool [ " operation " ] , [ ] )
if checks and not _has_any_permission ( current_user , checks ) :
continue
available . append ( tool )
return available
# [/DEF:_build_tool_catalog:Function]
# [DEF:_coerce_intent_entities:Function]
# @PURPOSE: Normalize intent entity value types from LLM output to route-compatible values.
# @PRE: intent contains entities dict or missing entities.
# @POST: Returned intent has numeric ids coerced where possible and string values stripped.
def _coerce_intent_entities ( intent : Dict [ str , Any ] ) - > Dict [ str , Any ] :
entities = intent . get ( " entities " )
if not isinstance ( entities , dict ) :
intent [ " entities " ] = { }
entities = intent [ " entities " ]
for key in ( " dashboard_id " , " dataset_id " ) :
value = entities . get ( key )
if isinstance ( value , str ) and value . strip ( ) . isdigit ( ) :
entities [ key ] = int ( value . strip ( ) )
for key , value in list ( entities . items ( ) ) :
if isinstance ( value , str ) :
entities [ key ] = value . strip ( )
return intent
# [/DEF:_coerce_intent_entities:Function]
# [DEF:_clarification_text_for_intent:Function]
# @PURPOSE: Convert technical missing-parameter errors into user-facing clarification prompts.
# @PRE: state was classified as needs_clarification for current intent/error combination.
# @POST: Returned text is human-readable and actionable for target operation.
def _clarification_text_for_intent ( intent : Optional [ Dict [ str , Any ] ] , detail_text : str ) - > str :
operation = ( intent or { } ) . get ( " operation " )
guidance_by_operation : Dict [ str , str ] = {
" run_llm_validation " : (
" Нужно уточнение для запуска LLM-валидации: Укажите дашборд (id или slug), окружение и провайдер LLM. "
) ,
" run_llm_documentation " : (
" Нужно уточнение для генерации документации: Укажите dataset_id, окружение и провайдер LLM. "
) ,
" create_branch " : " Нужно уточнение: укажите dashboard_id и имя ветки. " ,
" commit_changes " : " Нужно уточнение: укажите dashboard_id для коммита. " ,
" deploy_dashboard " : " Нужно уточнение: укажите dashboard_id и целевое окружение. " ,
" execute_migration " : " Нужно уточнение: укажите dashboard_id, source_env и target_env. " ,
" run_backup " : " Нужно уточнение: укажите окружение для бэкапа. " ,
}
return guidance_by_operation . get ( operation , detail_text )
# [/DEF:_clarification_text_for_intent:Function]
# [DEF:_plan_intent_with_llm:Function]
# @PURPOSE: Use active LLM provider to select best tool/operation from dynamic catalog.
# @PRE: tools list contains allowed operations for current user.
# @POST: Returns normalized intent dict when planning succeeds; otherwise None.
async def _plan_intent_with_llm (
message : str ,
tools : List [ Dict [ str , Any ] ] ,
db : Session ,
config_manager : ConfigManager ,
) - > Optional [ Dict [ str , Any ] ] :
if not tools :
return None
llm_service = LLMProviderService ( db )
providers = llm_service . get_all_providers ( )
provider = next ( ( p for p in providers if p . is_active ) , None )
if not provider :
return None
api_key = llm_service . get_decrypted_api_key ( provider . id )
if not api_key :
return None
planner = LLMClient (
provider_type = LLMProviderType ( provider . provider_type ) ,
api_key = api_key ,
base_url = provider . base_url ,
default_model = provider . default_model ,
)
system_instruction = (
" You are a deterministic intent planner for backend tools. \n "
" Choose exactly one operation from available_tools or return clarify. \n "
" Output strict JSON object: \n "
" { "
" \" domain \" : string, "
" \" operation \" : string, "
" \" entities \" : object, "
" \" confidence \" : number, "
" \" risk_level \" : \" safe \" | \" guarded \" | \" dangerous \" , "
" \" requires_confirmation \" : boolean "
" } \n "
" Rules: \n "
" - Use only operation names from available_tools. \n "
" - If input is ambiguous, operation must be \" clarify \" with low confidence. \n "
" - Keep entities minimal and factual. \n "
)
payload = {
" available_tools " : tools ,
" user_message " : message ,
" known_environments " : [ { " id " : e . id , " name " : e . name } for e in config_manager . get_environments ( ) ] ,
}
try :
response = await planner . get_json_completion (
[
{ " role " : " system " , " content " : system_instruction } ,
{ " role " : " user " , " content " : json . dumps ( payload , ensure_ascii = False ) } ,
]
)
except Exception as exc :
logger . warning ( f " [assistant.planner][fallback] LLM planner unavailable: { exc } " )
return None
if not isinstance ( response , dict ) :
return None
operation = response . get ( " operation " )
valid_ops = { tool [ " operation " ] for tool in tools }
if operation == " clarify " :
return {
" domain " : " unknown " ,
" operation " : " clarify " ,
" entities " : { } ,
" confidence " : float ( response . get ( " confidence " , 0.3 ) ) ,
" risk_level " : " safe " ,
" requires_confirmation " : False ,
}
if operation not in valid_ops :
return None
by_operation = { tool [ " operation " ] : tool for tool in tools }
selected = by_operation [ operation ]
intent = {
" domain " : response . get ( " domain " ) or selected [ " domain " ] ,
" operation " : operation ,
" entities " : response . get ( " entities " , { } ) ,
" confidence " : float ( response . get ( " confidence " , 0.75 ) ) ,
" risk_level " : response . get ( " risk_level " ) or selected [ " risk_level " ] ,
" requires_confirmation " : bool ( response . get ( " requires_confirmation " , selected [ " requires_confirmation " ] ) ) ,
}
intent = _coerce_intent_entities ( intent )
defaults = selected . get ( " defaults " ) or { }
for key , value in defaults . items ( ) :
if value and not intent [ " entities " ] . get ( key ) :
intent [ " entities " ] [ key ] = value
if operation in { " deploy_dashboard " , " execute_migration " } :
env_token = intent [ " entities " ] . get ( " environment " ) or intent [ " entities " ] . get ( " target_env " )
if _is_production_env ( env_token , config_manager ) :
intent [ " risk_level " ] = " dangerous "
intent [ " requires_confirmation " ] = True
return intent
# [/DEF:_plan_intent_with_llm:Function]
# [DEF:_authorize_intent:Function]
# @PURPOSE: Validate user permissions for parsed intent before confirmation/dispatch.
# @PRE: intent.operation is present for known assistant command domains.
# @POST: Returns if authorized; raises HTTPException(403) when denied.
def _authorize_intent ( intent : Dict [ str , Any ] , current_user : User ) :
operation = intent . get ( " operation " )
checks_map : Dict [ str , List [ Tuple [ str , str ] ] ] = {
" get_task_status " : [ ( " tasks " , " READ " ) ] ,
" create_branch " : [ ( " plugin:git " , " EXECUTE " ) ] ,
" commit_changes " : [ ( " plugin:git " , " EXECUTE " ) ] ,
" deploy_dashboard " : [ ( " plugin:git " , " EXECUTE " ) ] ,
" execute_migration " : [ ( " plugin:migration " , " EXECUTE " ) , ( " plugin:superset-migration " , " EXECUTE " ) ] ,
" run_backup " : [ ( " plugin:superset-backup " , " EXECUTE " ) , ( " plugin:backup " , " EXECUTE " ) ] ,
" run_llm_validation " : [ ( " plugin:llm_dashboard_validation " , " EXECUTE " ) ] ,
" run_llm_documentation " : [ ( " plugin:llm_documentation " , " EXECUTE " ) ] ,
}
if operation in checks_map :
_check_any_permission ( current_user , checks_map [ operation ] )
if operation in INTENT_PERMISSION_CHECKS :
_check_any_permission ( current_user , INTENT_PERMISSION_CHECKS [ operation ] )
# [/DEF:_authorize_intent:Function]
@@ -708,11 +1039,20 @@ async def _dispatch_intent(
if operation == " run_llm_validation " :
_check_any_permission ( current_user , [ ( " plugin:llm_dashboard_validation " , " EXECUTE " ) ] )
env_id = _resolve_env_id ( entities . get ( " environment " ) , config_manager ) or _get_default_environment_id ( config_manager )
dashboard_id = entities . get ( " dashboard_id " )
env_id = _resolve_env_id ( entities . get ( " environment " ) , config_manager )
if not dashboard_id :
dashboard_id = _resolve_dashboard_id_by_ref (
entities . get ( " dashboard_ref " ) ,
env_id ,
config_manager ,
)
provider_id = _resolve_provider_id ( entities . get ( " provider " ) , db )
if not dashboard_id or not env_id or not provider_id :
raise HTTPException ( status_code = 400 , detail = " Missing dashboard_id/environment/provider " )
raise HTTPException (
status_code = 422 ,
detail = " Missing dashboard_id/environment/provider. Укажите ID/slug дашборда или окружение. " ,
)
task = await task_manager . create_task (
plugin_id = " llm_dashboard_validation " ,
@@ -782,7 +1122,14 @@ async def send_message(
_append_history ( user_id , conversation_id , " user " , request . message )
_persist_message ( db , user_id , conversation_id , " user " , request . message )
intent = _parse_command ( request . message , config_manager )
tools_catalog = _build_tool_catalog ( current_user , config_manager , db )
intent = None
try :
intent = await _plan_intent_with_llm ( request . message , tools_catalog , db , config_manager )
except Exception as exc :
logger . warning ( f " [assistant.planner][fallback] Planner error: { exc } " )
if not intent :
intent = _parse_command ( request . message , config_manager )
confidence = float ( intent . get ( " confidence " , 0.0 ) )
if intent . get ( " domain " ) == " unknown " or confidence < 0.6 :
@@ -886,8 +1233,18 @@ async def send_message(
created_at = datetime . utcnow ( ) ,
)
except HTTPException as exc :
state = " denied " if exc . status_code == status . HTTP_403_FORBIDDEN else " failed "
text = str ( exc . detail )
detail_text = str ( exc . detail )
is_clarification_error = exc . status_code in ( 400 , 422 ) and (
detail_text . lower ( ) . startswith ( " missing " )
or " укажите " in detail_text . lower ( )
)
if exc . status_code == status . HTTP_403_FORBIDDEN :
state = " denied "
elif is_clarification_error :
state = " needs_clarification "
else :
state = " failed "
text = _clarification_text_for_intent ( intent , detail_text ) if state == " needs_clarification " else detail_text
_append_history ( user_id , conversation_id , " assistant " , text , state = state )
_persist_message ( db , user_id , conversation_id , " assistant " , text , state = state , metadata = { " intent " : intent } )
audit_payload = { " decision " : state , " message " : request . message , " intent " : intent , " error " : text }
@@ -899,7 +1256,7 @@ async def send_message(
state = state ,
text = text ,
intent = intent ,
actions = [ ] ,
actions = [ AssistantAction ( type = " rephrase " , label = " Rephrase command " ) ] if state == " needs_clarification " else [ ] ,
created_at = datetime . utcnow ( ) ,
)
# [/DEF:send_message:Function]