Files
ss-tools/backend/src/services/llm_provider.py
2026-02-10 12:53:01 +03:00

169 lines
7.2 KiB
Python

# [DEF:backend.src.services.llm_provider:Module]
# @TIER: STANDARD
# @SEMANTICS: service, llm, provider, encryption
# @PURPOSE: Service for managing LLM provider configurations with encrypted API keys.
# @LAYER: Domain
# @RELATION: DEPENDS_ON -> backend.src.core.database
# @RELATION: DEPENDS_ON -> backend.src.models.llm
from typing import List, Optional
from sqlalchemy.orm import Session
from ..models.llm import LLMProvider
from ..plugins.llm_analysis.models import LLMProviderConfig
from ..core.logger import belief_scope, logger
from cryptography.fernet import Fernet
import os
# [DEF:EncryptionManager:Class]
# @TIER: CRITICAL
# @PURPOSE: Handles encryption and decryption of sensitive data like API keys.
# @INVARIANT: Uses a secret key from environment or a default one (fallback only for dev).
class EncryptionManager:
    """Encrypt and decrypt short secrets (such as API keys) with Fernet.

    The key is read from the ``ENCRYPTION_KEY`` environment variable. When the
    variable is not set, a key hard-coded in this file is used instead and a
    warning is logged once per process.
    """

    # Key used when ENCRYPTION_KEY is not set. It is committed to source
    # control, so anything encrypted with it is readable by anyone who can
    # read this file -- acceptable for local development only. The value is
    # kept unchanged so data already encrypted with it can still be decrypted.
    _DEV_FALLBACK_KEY = "ZcytYzi0iHIl4Ttr-GdAEk117aGRogkGvN3wiTxrPpE="

    # Flipped to True after the fallback warning has been logged, so creating
    # many managers does not repeat the same message.
    _fallback_warning_logged = False

    # [DEF:EncryptionManager.__init__:Function]
    # @PURPOSE: Initialize the encryption manager with a Fernet key.
    # @PRE: ENCRYPTION_KEY env var must be set or use default dev key.
    # @POST: Fernet instance ready for encryption/decryption.
    def __init__(self):
        """Load the key and build the ``Fernet`` instance.

        Sets ``self.key`` (the key as bytes) and ``self.fernet``. A malformed
        key is rejected by the ``Fernet`` constructor; that error is not
        caught here.
        """
        configured_key = os.getenv("ENCRYPTION_KEY")
        if configured_key is None:
            # Keep the historical fallback so existing dev setups still work,
            # but make the insecure configuration visible in the logs.
            if not EncryptionManager._fallback_warning_logged:
                logger.warning(
                    "[EncryptionManager] ENCRYPTION_KEY is not set; falling back to the "
                    "built-in development key. Do not use this configuration in production."
                )
                EncryptionManager._fallback_warning_logged = True
            configured_key = self._DEV_FALLBACK_KEY
        self.key = configured_key.encode()
        self.fernet = Fernet(self.key)
    # [/DEF:EncryptionManager.__init__:Function]

    # [DEF:EncryptionManager.encrypt:Function]
    # @PURPOSE: Encrypt a plaintext string.
    # @PRE: data must be a non-empty string.
    # @POST: Returns encrypted string.
    def encrypt(self, data: str) -> str:
        """Return ``data`` encrypted with ``self.fernet``, as text.

        Args:
            data: Plaintext to protect. It is encoded to bytes before
                encryption.

        Returns:
            The token produced by ``self.fernet.encrypt``, decoded to ``str``.
        """
        token = self.fernet.encrypt(data.encode())
        return token.decode()
    # [/DEF:EncryptionManager.encrypt:Function]

    # [DEF:EncryptionManager.decrypt:Function]
    # @PURPOSE: Decrypt an encrypted string.
    # @PRE: encrypted_data must be a valid Fernet-encrypted string.
    # @POST: Returns original plaintext string.
    def decrypt(self, encrypted_data: str) -> str:
        """Return the plaintext for a token produced by :meth:`encrypt`.

        Args:
            encrypted_data: Token text as returned by :meth:`encrypt`.

        Returns:
            The decrypted bytes decoded to ``str``.

        Errors raised by ``self.fernet.decrypt`` (for example for a token
        created with a different key) are not caught here.
        """
        plaintext = self.fernet.decrypt(encrypted_data.encode())
        return plaintext.decode()
    # [/DEF:EncryptionManager.decrypt:Function]
# [/DEF:EncryptionManager:Class]
# [DEF:LLMProviderService:Class]
# @TIER: STANDARD
# @PURPOSE: Service to manage LLM provider lifecycle.
class LLMProviderService:
    """Create, read, update and delete LLM provider records.

    API keys are encrypted with ``EncryptionManager`` before they are stored
    and are only decrypted on request via :meth:`get_decrypted_api_key`.
    Every public operation runs inside a ``belief_scope`` context named after
    the method.
    """

    # [DEF:LLMProviderService.__init__:Function]
    # @PURPOSE: Initialize the service with database session.
    # @PRE: db must be a valid SQLAlchemy Session.
    # @POST: Service ready for provider operations.
    def __init__(self, db: Session):
        """Store the session and create the encryption helper.

        Args:
            db: SQLAlchemy session used for all queries and commits.
        """
        self.db = db
        self.encryption = EncryptionManager()
    # [/DEF:LLMProviderService.__init__:Function]

    # [DEF:LLMProviderService._commit_or_rollback:Function]
    # @PURPOSE: Commit the current transaction, rolling back if the commit fails.
    # @PRE: self.db is an open session.
    # @POST: Changes are committed, or the session is rolled back and the error re-raised.
    def _commit_or_rollback(self) -> None:
        """Commit ``self.db``; on any error roll back and re-raise.

        Without the rollback a failed commit leaves the session in a failed
        transaction, and later operations on the same session fail too.
        """
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise
    # [/DEF:LLMProviderService._commit_or_rollback:Function]

    # [DEF:get_all_providers:Function]
    # @TIER: STANDARD
    # @PURPOSE: Returns all configured LLM providers.
    # @PRE: Database connection must be active.
    # @POST: Returns list of all LLMProvider records.
    def get_all_providers(self) -> List[LLMProvider]:
        """Return every ``LLMProvider`` row.

        The ``api_key`` attribute of the returned records holds the stored
        (encrypted) value, not the plaintext key.
        """
        with belief_scope("get_all_providers"):
            return self.db.query(LLMProvider).all()
    # [/DEF:get_all_providers:Function]

    # [DEF:get_provider:Function]
    # @TIER: STANDARD
    # @PURPOSE: Returns a single LLM provider by ID.
    # @PRE: provider_id must be a valid string.
    # @POST: Returns LLMProvider or None if not found.
    def get_provider(self, provider_id: str) -> Optional[LLMProvider]:
        """Return the provider whose ``id`` equals ``provider_id``.

        Args:
            provider_id: Identifier to look up.

        Returns:
            The matching record, or ``None`` when no row matches.
        """
        with belief_scope("get_provider"):
            return self.db.query(LLMProvider).filter(LLMProvider.id == provider_id).first()
    # [/DEF:get_provider:Function]

    # [DEF:create_provider:Function]
    # @TIER: STANDARD
    # @PURPOSE: Creates a new LLM provider with encrypted API key.
    # @PRE: config must contain valid provider configuration.
    # @POST: New provider created and persisted to database.
    def create_provider(self, config: LLMProviderConfig) -> LLMProvider:
        """Persist a new provider built from ``config``.

        Args:
            config: Provider settings. ``config.api_key`` is encrypted before
                it is stored, so it must be a string here.

        Returns:
            The newly created record, refreshed from the database.

        If the commit fails the session is rolled back and the error is
        re-raised.
        """
        with belief_scope("create_provider"):
            encrypted_key = self.encryption.encrypt(config.api_key)
            db_provider = LLMProvider(
                provider_type=config.provider_type.value,
                name=config.name,
                base_url=config.base_url,
                api_key=encrypted_key,
                default_model=config.default_model,
                is_active=config.is_active,
            )
            self.db.add(db_provider)
            self._commit_or_rollback()
            self.db.refresh(db_provider)
            return db_provider
    # [/DEF:create_provider:Function]

    # [DEF:update_provider:Function]
    # @TIER: STANDARD
    # @PURPOSE: Updates an existing LLM provider.
    # @PRE: provider_id must exist, config must be valid.
    # @POST: Provider updated and persisted to database.
    def update_provider(self, provider_id: str, config: LLMProviderConfig) -> Optional[LLMProvider]:
        """Overwrite an existing provider with the values in ``config``.

        Args:
            provider_id: Identifier of the provider to update.
            config: New settings. A ``None`` or empty ``api_key`` leaves the
                stored key untouched.

        Returns:
            The updated record, or ``None`` when ``provider_id`` is unknown.

        If the commit fails the session is rolled back and the error is
        re-raised.
        """
        with belief_scope("update_provider"):
            db_provider = self.get_provider(provider_id)
            if not db_provider:
                return None

            db_provider.provider_type = config.provider_type.value
            db_provider.name = config.name
            db_provider.base_url = config.base_url
            # Only replace the stored key when a new one was actually supplied
            # (neither None nor an empty string).
            if config.api_key:
                db_provider.api_key = self.encryption.encrypt(config.api_key)
            db_provider.default_model = config.default_model
            db_provider.is_active = config.is_active

            self._commit_or_rollback()
            self.db.refresh(db_provider)
            return db_provider
    # [/DEF:update_provider:Function]

    # [DEF:delete_provider:Function]
    # @TIER: STANDARD
    # @PURPOSE: Deletes an LLM provider.
    # @PRE: provider_id must exist.
    # @POST: Provider removed from database.
    def delete_provider(self, provider_id: str) -> bool:
        """Delete the provider with the given id.

        Args:
            provider_id: Identifier of the provider to remove.

        Returns:
            ``True`` when a record was deleted, ``False`` when none matched.

        If the commit fails the session is rolled back and the error is
        re-raised.
        """
        with belief_scope("delete_provider"):
            db_provider = self.get_provider(provider_id)
            if not db_provider:
                return False
            self.db.delete(db_provider)
            self._commit_or_rollback()
            return True
    # [/DEF:delete_provider:Function]

    # [DEF:get_decrypted_api_key:Function]
    # @TIER: STANDARD
    # @PURPOSE: Returns the decrypted API key for a provider.
    # @PRE: provider_id must exist with valid encrypted key.
    # @POST: Returns decrypted API key or None on failure.
    def get_decrypted_api_key(self, provider_id: str) -> Optional[str]:
        """Return the plaintext API key stored for ``provider_id``.

        Args:
            provider_id: Identifier of the provider whose key is wanted.

        Returns:
            The decrypted key, or ``None`` when the provider does not exist,
            has no stored key, or the stored value cannot be decrypted. Each
            of these cases is logged; none of them raises.
        """
        with belief_scope("get_decrypted_api_key"):
            db_provider = self.get_provider(provider_id)
            if not db_provider:
                logger.warning(f"[get_decrypted_api_key] Provider {provider_id} not found in database")
                return None

            logger.info(f"[get_decrypted_api_key] Provider found: {db_provider.id}")
            logger.info(f"[get_decrypted_api_key] Encrypted API key length: {len(db_provider.api_key) if db_provider.api_key else 0}")

            # A missing key is not a decryption error: report it as such
            # instead of letting decrypt() fail on None.
            if not db_provider.api_key:
                logger.warning(f"[get_decrypted_api_key] Provider {db_provider.id} has no stored API key")
                return None

            try:
                decrypted_key = self.encryption.decrypt(db_provider.api_key)
            except Exception as e:
                # Best-effort by contract: callers receive None rather than an
                # exception when the stored value cannot be decrypted.
                logger.error(f"[get_decrypted_api_key] Decryption failed: {str(e)}")
                return None

            # NOTE(review): this logs the length of the plaintext secret at
            # info level -- confirm that is acceptable for production logs.
            logger.info(f"[get_decrypted_api_key] Decryption successful, key length: {len(decrypted_key) if decrypted_key else 0}")
            return decrypted_key
    # [/DEF:get_decrypted_api_key:Function]
# [/DEF:LLMProviderService:Class]
# [/DEF:backend.src.services.llm_provider:Module]