Files
ss-tools/backend/src/core/superset_client.py
2026-02-15 11:11:30 +03:00

551 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# [DEF:backend.src.core.superset_client:Module]
#
# @SEMANTICS: superset, api, client, rest, http, dashboard, dataset, import, export
# @PURPOSE: Предоставляет высокоуровневый клиент для взаимодействия с Superset REST API, инкапсулируя логику запросов, обработку ошибок и пагинацию.
# @LAYER: Core
# @RELATION: USES -> backend.src.core.utils.network.APIClient
# @RELATION: USES -> backend.src.core.config_models.Environment
#
# @INVARIANT: All network operations must use the internal APIClient instance.
# @PUBLIC_API: SupersetClient
# [SECTION: IMPORTS]
import json
import zipfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, cast
from requests import Response
from .logger import logger as app_logger, belief_scope
from .utils.network import APIClient, SupersetAPIError
from .utils.fileio import get_filename_from_headers
from .config_models import Environment
# [/SECTION]
# [DEF:SupersetClient:Class]
# @PURPOSE: Класс-обёртка над Superset REST API, предоставляющий методы для работы с дашбордами и датасетами.
class SupersetClient:
# [DEF:__init__:Function]
# @PURPOSE: Инициализирует клиент, проверяет конфигурацию и создает сетевой клиент.
# @PRE: `env` должен быть валидным объектом Environment.
# @POST: Атрибуты `env` и `network` созданы и готовы к работе.
# @PARAM: env (Environment) - Конфигурация окружения.
def __init__(self, env: Environment):
with belief_scope("__init__"):
app_logger.info("[SupersetClient.__init__][Enter] Initializing SupersetClient for env %s.", env.name)
self.env = env
# Construct auth payload expected by Superset API
auth_payload = {
"username": env.username,
"password": env.password,
"provider": "db",
"refresh": "true"
}
self.network = APIClient(
config={
"base_url": env.url,
"auth": auth_payload
},
verify_ssl=env.verify_ssl,
timeout=env.timeout
)
self.delete_before_reimport: bool = False
app_logger.info("[SupersetClient.__init__][Exit] SupersetClient initialized.")
# [/DEF:__init__:Function]
# [DEF:authenticate:Function]
# @PURPOSE: Authenticates the client using the configured credentials.
# @PRE: self.network must be initialized with valid auth configuration.
# @POST: Client is authenticated and tokens are stored.
# @RETURN: Dict[str, str] - Authentication tokens.
def authenticate(self) -> Dict[str, str]:
with belief_scope("SupersetClient.authenticate"):
return self.network.authenticate()
# [/DEF:authenticate:Function]
@property
# [DEF:headers:Function]
# @PURPOSE: Возвращает базовые HTTP-заголовки, используемые сетевым клиентом.
# @PRE: APIClient is initialized and authenticated.
# @POST: Returns a dictionary of HTTP headers.
def headers(self) -> dict:
with belief_scope("headers"):
return self.network.headers
# [/DEF:headers:Function]
# [SECTION: DASHBOARD OPERATIONS]
# [DEF:get_dashboards:Function]
# @PURPOSE: Получает полный список дашбордов, автоматически обрабатывая пагинацию.
# @PARAM: query (Optional[Dict]) - Дополнительные параметры запроса для API.
# @PRE: Client is authenticated.
# @POST: Returns a tuple with total count and list of dashboards.
# @RETURN: Tuple[int, List[Dict]] - Кортеж (общее количество, список дашбордов).
def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_dashboards"):
app_logger.info("[get_dashboards][Enter] Fetching dashboards.")
validated_query = self._validate_query_params(query or {})
if 'columns' not in validated_query:
validated_query['columns'] = ["slug", "id", "changed_on_utc", "dashboard_title", "published"]
total_count = self._fetch_total_object_count(endpoint="/dashboard/")
paginated_data = self._fetch_all_pages(
endpoint="/dashboard/",
pagination_options={"base_query": validated_query, "total_count": total_count, "results_field": "result"},
)
app_logger.info("[get_dashboards][Exit] Found %d dashboards.", total_count)
return total_count, paginated_data
# [/DEF:get_dashboards:Function]
# [DEF:get_dashboards_summary:Function]
# @PURPOSE: Fetches dashboard metadata optimized for the grid.
# @PRE: Client is authenticated.
# @POST: Returns a list of dashboard metadata summaries.
# @RETURN: List[Dict]
def get_dashboards_summary(self) -> List[Dict]:
with belief_scope("SupersetClient.get_dashboards_summary"):
query = {
"columns": ["id", "dashboard_title", "changed_on_utc", "published"]
}
_, dashboards = self.get_dashboards(query=query)
# Map fields to DashboardMetadata schema
result = []
for dash in dashboards:
result.append({
"id": dash.get("id"),
"title": dash.get("dashboard_title"),
"last_modified": dash.get("changed_on_utc"),
"status": "published" if dash.get("published") else "draft"
})
return result
# [/DEF:get_dashboards_summary:Function]
# [DEF:export_dashboard:Function]
# @PURPOSE: Экспортирует дашборд в виде ZIP-архива.
# @PARAM: dashboard_id (int) - ID дашборда для экспорта.
# @PRE: dashboard_id must exist in Superset.
# @POST: Returns ZIP content and filename.
# @RETURN: Tuple[bytes, str] - Бинарное содержимое ZIP-архива и имя файла.
def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
with belief_scope("export_dashboard"):
app_logger.info("[export_dashboard][Enter] Exporting dashboard %s.", dashboard_id)
response = self.network.request(
method="GET",
endpoint="/dashboard/export/",
params={"q": json.dumps([dashboard_id])},
stream=True,
raw_response=True,
)
response = cast(Response, response)
self._validate_export_response(response, dashboard_id)
filename = self._resolve_export_filename(response, dashboard_id)
app_logger.info("[export_dashboard][Exit] Exported dashboard %s to %s.", dashboard_id, filename)
return response.content, filename
# [/DEF:export_dashboard:Function]
# [DEF:import_dashboard:Function]
# @PURPOSE: Импортирует дашборд из ZIP-файла.
# @PARAM: file_name (Union[str, Path]) - Путь к ZIP-архиву.
# @PARAM: dash_id (Optional[int]) - ID дашборда для удаления при сбое.
# @PARAM: dash_slug (Optional[str]) - Slug дашборда для поиска ID.
# @PRE: file_name must be a valid ZIP dashboard export.
# @POST: Dashboard is imported or re-imported after deletion.
# @RETURN: Dict - Ответ API в случае успеха.
def import_dashboard(self, file_name: Union[str, Path], dash_id: Optional[int] = None, dash_slug: Optional[str] = None) -> Dict:
with belief_scope("import_dashboard"):
file_path = str(file_name)
self._validate_import_file(file_path)
try:
return self._do_import(file_path)
except Exception as exc:
app_logger.error("[import_dashboard][Failure] First import attempt failed: %s", exc, exc_info=True)
if not self.delete_before_reimport:
raise
target_id = self._resolve_target_id_for_delete(dash_id, dash_slug)
if target_id is None:
app_logger.error("[import_dashboard][Failure] No ID available for delete-retry.")
raise
self.delete_dashboard(target_id)
app_logger.info("[import_dashboard][State] Deleted dashboard ID %s, retrying import.", target_id)
return self._do_import(file_path)
# [/DEF:import_dashboard:Function]
# [DEF:delete_dashboard:Function]
# @PURPOSE: Удаляет дашборд по его ID или slug.
# @PARAM: dashboard_id (Union[int, str]) - ID или slug дашборда.
# @PRE: dashboard_id must exist.
# @POST: Dashboard is removed from Superset.
def delete_dashboard(self, dashboard_id: Union[int, str]) -> None:
with belief_scope("delete_dashboard"):
app_logger.info("[delete_dashboard][Enter] Deleting dashboard %s.", dashboard_id)
response = self.network.request(method="DELETE", endpoint=f"/dashboard/{dashboard_id}")
response = cast(Dict, response)
if response.get("result", True) is not False:
app_logger.info("[delete_dashboard][Success] Dashboard %s deleted.", dashboard_id)
else:
app_logger.warning("[delete_dashboard][Warning] Unexpected response while deleting %s: %s", dashboard_id, response)
# [/DEF:delete_dashboard:Function]
# [/SECTION]
# [SECTION: DATASET OPERATIONS]
# [DEF:get_datasets:Function]
# @PURPOSE: Получает полный список датасетов, автоматически обрабатывая пагинацию.
# @PARAM: query (Optional[Dict]) - Дополнительные параметры запроса.
# @PRE: Client is authenticated.
# @POST: Returns total count and list of datasets.
# @RETURN: Tuple[int, List[Dict]] - Кортеж (общее количество, список датасетов).
def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_datasets"):
app_logger.info("[get_datasets][Enter] Fetching datasets.")
validated_query = self._validate_query_params(query)
total_count = self._fetch_total_object_count(endpoint="/dataset/")
paginated_data = self._fetch_all_pages(
endpoint="/dataset/",
pagination_options={"base_query": validated_query, "total_count": total_count, "results_field": "result"},
)
app_logger.info("[get_datasets][Exit] Found %d datasets.", total_count)
return total_count, paginated_data
# [/DEF:get_datasets:Function]
# [DEF:get_datasets_summary:Function]
# @PURPOSE: Fetches dataset metadata optimized for the Dataset Hub grid.
# @PRE: Client is authenticated.
# @POST: Returns a list of dataset metadata summaries.
# @RETURN: List[Dict]
def get_datasets_summary(self) -> List[Dict]:
with belief_scope("SupersetClient.get_datasets_summary"):
query = {
"columns": ["id", "table_name", "schema", "database"]
}
_, datasets = self.get_datasets(query=query)
# Map fields to match the contracts
result = []
for ds in datasets:
result.append({
"id": ds.get("id"),
"table_name": ds.get("table_name"),
"schema": ds.get("schema"),
"database": ds.get("database", {}).get("database_name", "Unknown")
})
return result
# [/DEF:get_datasets_summary:Function]
# [DEF:get_dataset_detail:Function]
# @PURPOSE: Fetches detailed dataset information including columns and linked dashboards
# @PRE: Client is authenticated and dataset_id exists.
# @POST: Returns detailed dataset info with columns and linked dashboards.
# @PARAM: dataset_id (int) - The dataset ID to fetch details for.
# @RETURN: Dict - Dataset details with columns and linked_dashboards.
# @RELATION: CALLS -> self.get_dataset
# @RELATION: CALLS -> self.network.request (for related_objects)
def get_dataset_detail(self, dataset_id: int) -> Dict:
with belief_scope("SupersetClient.get_dataset_detail", f"id={dataset_id}"):
# Get base dataset info
dataset = self.get_dataset(dataset_id)
# Extract columns information
columns = dataset.get("columns", [])
column_info = []
for col in columns:
column_info.append({
"id": col.get("id"),
"name": col.get("column_name"),
"type": col.get("type"),
"is_dttm": col.get("is_dttm", False),
"is_active": col.get("is_active", True),
"description": col.get("description", "")
})
# Get linked dashboards using related_objects endpoint
linked_dashboards = []
try:
related_objects = self.network.request(
method="GET",
endpoint=f"/dataset/{dataset_id}/related_objects"
)
# Handle different response formats
if isinstance(related_objects, dict):
if "dashboards" in related_objects:
dashboards_data = related_objects["dashboards"]
elif "result" in related_objects and isinstance(related_objects["result"], dict):
dashboards_data = related_objects["result"].get("dashboards", [])
else:
dashboards_data = []
for dash in dashboards_data:
linked_dashboards.append({
"id": dash.get("id"),
"title": dash.get("dashboard_title") or dash.get("title", "Unknown"),
"slug": dash.get("slug")
})
except Exception as e:
app_logger.warning(f"[get_dataset_detail][Warning] Failed to fetch related dashboards: {e}")
linked_dashboards = []
# Extract SQL table information
sql = dataset.get("sql", "")
result = {
"id": dataset.get("id"),
"table_name": dataset.get("table_name"),
"schema": dataset.get("schema"),
"database": dataset.get("database", {}).get("database_name", "Unknown"),
"description": dataset.get("description", ""),
"columns": column_info,
"column_count": len(column_info),
"sql": sql,
"linked_dashboards": linked_dashboards,
"linked_dashboard_count": len(linked_dashboards),
"is_sqllab_view": dataset.get("is_sqllab_view", False),
"created_on": dataset.get("created_on"),
"changed_on": dataset.get("changed_on")
}
app_logger.info(f"[get_dataset_detail][Exit] Got dataset {dataset_id} with {len(column_info)} columns and {len(linked_dashboards)} linked dashboards")
return result
# [/DEF:get_dataset_detail:Function]
# [DEF:get_dataset:Function]
# @PURPOSE: Получает информацию о конкретном датасете по его ID.
# @PARAM: dataset_id (int) - ID датасета.
# @PRE: dataset_id must exist.
# @POST: Returns dataset details.
# @RETURN: Dict - Информация о датасете.
def get_dataset(self, dataset_id: int) -> Dict:
with belief_scope("SupersetClient.get_dataset", f"id={dataset_id}"):
app_logger.info("[get_dataset][Enter] Fetching dataset %s.", dataset_id)
response = self.network.request(method="GET", endpoint=f"/dataset/{dataset_id}")
response = cast(Dict, response)
app_logger.info("[get_dataset][Exit] Got dataset %s.", dataset_id)
return response
# [/DEF:get_dataset:Function]
# [DEF:update_dataset:Function]
# @PURPOSE: Обновляет данные датасета по его ID.
# @PARAM: dataset_id (int) - ID датасета.
# @PARAM: data (Dict) - Данные для обновления.
# @PRE: dataset_id must exist.
# @POST: Dataset is updated in Superset.
# @RETURN: Dict - Ответ API.
def update_dataset(self, dataset_id: int, data: Dict) -> Dict:
with belief_scope("SupersetClient.update_dataset", f"id={dataset_id}"):
app_logger.info("[update_dataset][Enter] Updating dataset %s.", dataset_id)
response = self.network.request(
method="PUT",
endpoint=f"/dataset/{dataset_id}",
data=json.dumps(data),
headers={'Content-Type': 'application/json'}
)
response = cast(Dict, response)
app_logger.info("[update_dataset][Exit] Updated dataset %s.", dataset_id)
return response
# [/DEF:update_dataset:Function]
# [/SECTION]
# [SECTION: DATABASE OPERATIONS]
# [DEF:get_databases:Function]
# @PURPOSE: Получает полный список баз данных.
# @PARAM: query (Optional[Dict]) - Дополнительные параметры запроса.
# @PRE: Client is authenticated.
# @POST: Returns total count and list of databases.
# @RETURN: Tuple[int, List[Dict]] - Кортеж (общее количество, список баз данных).
def get_databases(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
with belief_scope("get_databases"):
app_logger.info("[get_databases][Enter] Fetching databases.")
validated_query = self._validate_query_params(query or {})
if 'columns' not in validated_query:
validated_query['columns'] = []
total_count = self._fetch_total_object_count(endpoint="/database/")
paginated_data = self._fetch_all_pages(
endpoint="/database/",
pagination_options={"base_query": validated_query, "total_count": total_count, "results_field": "result"},
)
app_logger.info("[get_databases][Exit] Found %d databases.", total_count)
return total_count, paginated_data
# [/DEF:get_databases:Function]
# [DEF:get_database:Function]
# @PURPOSE: Получает информацию о конкретной базе данных по её ID.
# @PARAM: database_id (int) - ID базы данных.
# @PRE: database_id must exist.
# @POST: Returns database details.
# @RETURN: Dict - Информация о базе данных.
def get_database(self, database_id: int) -> Dict:
with belief_scope("get_database"):
app_logger.info("[get_database][Enter] Fetching database %s.", database_id)
response = self.network.request(method="GET", endpoint=f"/database/{database_id}")
response = cast(Dict, response)
app_logger.info("[get_database][Exit] Got database %s.", database_id)
return response
# [/DEF:get_database:Function]
# [DEF:get_databases_summary:Function]
# @PURPOSE: Fetch a summary of databases including uuid, name, and engine.
# @PRE: Client is authenticated.
# @POST: Returns list of database summaries.
# @RETURN: List[Dict] - Summary of databases.
def get_databases_summary(self) -> List[Dict]:
with belief_scope("SupersetClient.get_databases_summary"):
query = {
"columns": ["uuid", "database_name", "backend"]
}
_, databases = self.get_databases(query=query)
# Map 'backend' to 'engine' for consistency with contracts
for db in databases:
db['engine'] = db.pop('backend', None)
return databases
# [/DEF:get_databases_summary:Function]
# [DEF:get_database_by_uuid:Function]
# @PURPOSE: Find a database by its UUID.
# @PARAM: db_uuid (str) - The UUID of the database.
# @PRE: db_uuid must be a valid UUID string.
# @POST: Returns database info or None.
# @RETURN: Optional[Dict] - Database info if found, else None.
def get_database_by_uuid(self, db_uuid: str) -> Optional[Dict]:
with belief_scope("SupersetClient.get_database_by_uuid", f"uuid={db_uuid}"):
query = {
"filters": [{"col": "uuid", "op": "eq", "value": db_uuid}]
}
_, databases = self.get_databases(query=query)
return databases[0] if databases else None
# [/DEF:get_database_by_uuid:Function]
# [/SECTION]
# [SECTION: HELPERS]
# [DEF:_resolve_target_id_for_delete:Function]
# @PURPOSE: Resolves a dashboard ID from either an ID or a slug.
# @PRE: Either dash_id or dash_slug should be provided.
# @POST: Returns the resolved ID or None.
def _resolve_target_id_for_delete(self, dash_id: Optional[int], dash_slug: Optional[str]) -> Optional[int]:
with belief_scope("_resolve_target_id_for_delete"):
if dash_id is not None:
return dash_id
if dash_slug is not None:
app_logger.debug("[_resolve_target_id_for_delete][State] Resolving ID by slug '%s'.", dash_slug)
try:
_, candidates = self.get_dashboards(query={"filters": [{"col": "slug", "op": "eq", "value": dash_slug}]})
if candidates:
target_id = candidates[0]["id"]
app_logger.debug("[_resolve_target_id_for_delete][Success] Resolved slug to ID %s.", target_id)
return target_id
except Exception as e:
app_logger.warning("[_resolve_target_id_for_delete][Warning] Could not resolve slug '%s' to ID: %s", dash_slug, e)
return None
# [/DEF:_resolve_target_id_for_delete:Function]
# [DEF:_do_import:Function]
# @PURPOSE: Performs the actual multipart upload for import.
# @PRE: file_name must be a path to an existing ZIP file.
# @POST: Returns the API response from the upload.
def _do_import(self, file_name: Union[str, Path]) -> Dict:
with belief_scope("_do_import"):
app_logger.debug(f"[_do_import][State] Uploading file: {file_name}")
file_path = Path(file_name)
if not file_path.exists():
app_logger.error(f"[_do_import][Failure] File does not exist: {file_name}")
raise FileNotFoundError(f"File does not exist: {file_name}")
return self.network.upload_file(
endpoint="/dashboard/import/",
file_info={"file_obj": file_path, "file_name": file_path.name, "form_field": "formData"},
extra_data={"overwrite": "true"},
timeout=self.env.timeout * 2,
)
# [/DEF:_do_import:Function]
# [DEF:_validate_export_response:Function]
# @PURPOSE: Validates that the export response is a non-empty ZIP archive.
# @PRE: response must be a valid requests.Response object.
# @POST: Raises SupersetAPIError if validation fails.
def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
with belief_scope("_validate_export_response"):
content_type = response.headers.get("Content-Type", "")
if "application/zip" not in content_type:
raise SupersetAPIError(f"Получен не ZIP-архив (Content-Type: {content_type})")
if not response.content:
raise SupersetAPIError("Получены пустые данные при экспорте")
# [/DEF:_validate_export_response:Function]
# [DEF:_resolve_export_filename:Function]
# @PURPOSE: Determines the filename for an exported dashboard.
# @PRE: response must contain Content-Disposition header or dashboard_id must be provided.
# @POST: Returns a sanitized filename string.
def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
with belief_scope("_resolve_export_filename"):
filename = get_filename_from_headers(dict(response.headers))
if not filename:
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip"
app_logger.warning("[_resolve_export_filename][Warning] Generated filename: %s", filename)
return filename
# [/DEF:_resolve_export_filename:Function]
# [DEF:_validate_query_params:Function]
# @PURPOSE: Ensures query parameters have default page and page_size.
# @PRE: query can be None or a dictionary.
# @POST: Returns a dictionary with at least page and page_size.
def _validate_query_params(self, query: Optional[Dict]) -> Dict:
with belief_scope("_validate_query_params"):
base_query = {"page": 0, "page_size": 1000}
return {**base_query, **(query or {})}
# [/DEF:_validate_query_params:Function]
# [DEF:_fetch_total_object_count:Function]
# @PURPOSE: Fetches the total number of items for a given endpoint.
# @PRE: endpoint must be a valid Superset API path.
# @POST: Returns the total count as an integer.
def _fetch_total_object_count(self, endpoint: str) -> int:
with belief_scope("_fetch_total_object_count"):
return self.network.fetch_paginated_count(
endpoint=endpoint,
query_params={"page": 0, "page_size": 1},
count_field="count",
)
# [/DEF:_fetch_total_object_count:Function]
# [DEF:_fetch_all_pages:Function]
# @PURPOSE: Iterates through all pages to collect all data items.
# @PRE: pagination_options must contain base_query, total_count, and results_field.
# @POST: Returns a combined list of all items.
def _fetch_all_pages(self, endpoint: str, pagination_options: Dict) -> List[Dict]:
with belief_scope("_fetch_all_pages"):
return self.network.fetch_paginated_data(endpoint=endpoint, pagination_options=pagination_options)
# [/DEF:_fetch_all_pages:Function]
# [DEF:_validate_import_file:Function]
# @PURPOSE: Validates that the file to be imported is a valid ZIP with metadata.yaml.
# @PRE: zip_path must be a path to a file.
# @POST: Raises error if file is missing, not a ZIP, or missing metadata.
def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
with belief_scope("_validate_import_file"):
path = Path(zip_path)
if not path.exists():
raise FileNotFoundError(f"Файл {zip_path} не существует")
if not zipfile.is_zipfile(path):
raise SupersetAPIError(f"Файл {zip_path} не является ZIP-архивом")
with zipfile.ZipFile(path, "r") as zf:
if not any(n.endswith("metadata.yaml") for n in zf.namelist()):
raise SupersetAPIError(f"Архив {zip_path} не содержит 'metadata.yaml'")
# [/DEF:_validate_import_file:Function]
# [/SECTION]
# [/DEF:SupersetClient:Class]
# [/DEF:backend.src.core.superset_client:Module]