# [DEF:backend.src.core.superset_client:Module]
#
# @SEMANTICS: superset, api, client, rest, http, dashboard, dataset, import, export
# @PURPOSE: Provides a high-level client for the Superset REST API, encapsulating request logic, error handling and pagination.
# @LAYER: Core
# @RELATION: USES -> backend.src.core.utils.network.APIClient
# @RELATION: USES -> backend.src.core.config_models.Environment
#
# @INVARIANT: All network operations must use the internal APIClient instance.
# @PUBLIC_API: SupersetClient

# [SECTION: IMPORTS]
import json
import re
import zipfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, cast
from requests import Response
from .logger import logger as app_logger, belief_scope
from .utils.network import APIClient, SupersetAPIError
from .utils.fileio import get_filename_from_headers
from .config_models import Environment
# [/SECTION]


# [DEF:SupersetClient:Class]
# @PURPOSE: Wrapper class around the Superset REST API offering dashboard, dataset and database operations.
class SupersetClient:
    # [DEF:__init__:Function]
    # @PURPOSE: Stores the environment and builds the network client from it.
    # @PRE: `env` must be a valid Environment object.
    # @POST: Attributes `env`, `network` and `delete_before_reimport` are set.
    # @PARAM: env (Environment) - Environment configuration.
    def __init__(self, env: Environment):
        with belief_scope("__init__"):
            app_logger.info("[SupersetClient.__init__][Enter] Initializing SupersetClient for env %s.", env.name)
            self.env = env
            # Construct auth payload expected by Superset API
            auth_payload = {
                "username": env.username,
                "password": env.password,
                "provider": "db",
                "refresh": "true"
            }
            self.network = APIClient(
                config={
                    "base_url": env.url,
                    "auth": auth_payload
                },
                verify_ssl=env.verify_ssl,
                timeout=env.timeout
            )
            # When True, import_dashboard deletes the target dashboard and retries after a failed import.
            self.delete_before_reimport: bool = False
            app_logger.info("[SupersetClient.__init__][Exit] SupersetClient initialized.")
    # [/DEF:__init__:Function]

    # [DEF:authenticate:Function]
    # @PURPOSE: Authenticates the client using the configured credentials.
    # @PRE: self.network must be initialized with valid auth configuration.
    # @POST: Client is authenticated and tokens are stored.
    # @RETURN: Dict[str, str] - Authentication tokens.
    def authenticate(self) -> Dict[str, str]:
        with belief_scope("SupersetClient.authenticate"):
            return self.network.authenticate()
    # [/DEF:authenticate:Function]

    @property
    # [DEF:headers:Function]
    # @PURPOSE: Returns the base HTTP headers used by the network client.
    # @PRE: APIClient is initialized and authenticated.
    # @POST: Returns a dictionary of HTTP headers.
    def headers(self) -> dict:
        with belief_scope("headers"):
            return self.network.headers
    # [/DEF:headers:Function]

    # [SECTION: DASHBOARD OPERATIONS]

    # [DEF:get_dashboards:Function]
    # @PURPOSE: Fetches the full list of dashboards, handling pagination automatically.
    # @PARAM: query (Optional[Dict]) - Additional query parameters for the API.
    # @PRE: Client is authenticated.
    # @POST: Returns a tuple with total count and list of dashboards.
    # @RETURN: Tuple[int, List[Dict]] - Tuple of (number of items fetched, list of dashboards).
    def get_dashboards(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
        with belief_scope("get_dashboards"):
            app_logger.info("[get_dashboards][Enter] Fetching dashboards.")
            validated_query = self._validate_query_params(query or {})
            # Only apply the default column set when the caller did not choose one.
            if 'columns' not in validated_query:
                validated_query['columns'] = ["slug", "id", "changed_on_utc", "dashboard_title", "published"]
            paginated_data = self._fetch_all_pages(
                endpoint="/dashboard/",
                pagination_options={"base_query": validated_query, "results_field": "result"},
            )
            total_count = len(paginated_data)
            app_logger.info("[get_dashboards][Exit] Found %d dashboards.", total_count)
            return total_count, paginated_data
    # [/DEF:get_dashboards:Function]

    # [DEF:get_dashboards_summary:Function]
    # @PURPOSE: Fetches dashboard metadata optimized for the grid.
    # @PRE: Client is authenticated.
    # @POST: Returns a list of dashboard metadata summaries.
    # @RETURN: List[Dict]
    def get_dashboards_summary(self) -> List[Dict]:
        with belief_scope("SupersetClient.get_dashboards_summary"):
            query = {
                "columns": ["id", "dashboard_title", "changed_on_utc", "published"]
            }
            _, dashboards = self.get_dashboards(query=query)

            # Map fields to DashboardMetadata schema
            return [
                {
                    "id": dash.get("id"),
                    "title": dash.get("dashboard_title"),
                    "last_modified": dash.get("changed_on_utc"),
                    "status": "published" if dash.get("published") else "draft",
                }
                for dash in dashboards
            ]
    # [/DEF:get_dashboards_summary:Function]

    # [DEF:get_dashboard:Function]
    # @PURPOSE: Fetches a single dashboard by ID.
    # @PRE: Client is authenticated and dashboard_id exists.
    # @POST: Returns dashboard payload from Superset API.
    # @RETURN: Dict
    def get_dashboard(self, dashboard_id: int) -> Dict:
        with belief_scope("SupersetClient.get_dashboard", f"id={dashboard_id}"):
            response = self.network.request(method="GET", endpoint=f"/dashboard/{dashboard_id}")
            return cast(Dict, response)
    # [/DEF:get_dashboard:Function]

    # [DEF:get_chart:Function]
    # @PURPOSE: Fetches a single chart by ID.
    # @PRE: Client is authenticated and chart_id exists.
    # @POST: Returns chart payload from Superset API.
    # @RETURN: Dict
    def get_chart(self, chart_id: int) -> Dict:
        with belief_scope("SupersetClient.get_chart", f"id={chart_id}"):
            response = self.network.request(method="GET", endpoint=f"/chart/{chart_id}")
            return cast(Dict, response)
    # [/DEF:get_chart:Function]

    # [DEF:get_dashboard_detail:Function]
    # @PURPOSE: Fetches detailed dashboard information including related charts and datasets.
    # @PRE: Client is authenticated and dashboard_id exists.
    # @POST: Returns dashboard metadata with de-duplicated charts and datasets lists and their counts.
    # @RETURN: Dict
    def get_dashboard_detail(self, dashboard_id: int) -> Dict:
        with belief_scope("SupersetClient.get_dashboard_detail", f"id={dashboard_id}"):
            dashboard_response = self.get_dashboard(dashboard_id)
            # Accept both the wrapped ({"result": {...}}) and the bare payload.
            dashboard_data = dashboard_response.get("result", dashboard_response)

            charts: List[Dict] = []
            datasets: List[Dict] = []

            def extract_dataset_id_from_form_data(form_data: Optional[Dict]) -> Optional[int]:
                # Looks for the dataset id under "datasource" (string or dict) and then "datasource_id".
                if not isinstance(form_data, dict):
                    return None
                datasource = form_data.get("datasource")
                if isinstance(datasource, str):
                    # Only the leading digits followed by "__" are used; other strings fall through.
                    matched = re.match(r"^(\d+)__", datasource)
                    if matched:
                        return self._coerce_int(matched.group(1))
                if isinstance(datasource, dict):
                    return self._coerce_int(datasource.get("id"))
                return self._coerce_int(form_data.get("datasource_id"))

            # Canonical endpoints from Superset OpenAPI:
            # /dashboard/{id_or_slug}/charts and /dashboard/{id_or_slug}/datasets.
            try:
                charts_response = self.network.request(
                    method="GET",
                    endpoint=f"/dashboard/{dashboard_id}/charts"
                )
                charts_payload = charts_response.get("result", []) if isinstance(charts_response, dict) else []
                for chart_obj in charts_payload:
                    if not isinstance(chart_obj, dict):
                        continue
                    raw_chart_id = chart_obj.get("id")
                    chart_id = self._coerce_int(raw_chart_id)
                    if chart_id is None:
                        # Skip only the malformed entry instead of aborting the whole listing.
                        continue

                    form_data = chart_obj.get("form_data")
                    if isinstance(form_data, str):
                        try:
                            form_data = json.loads(form_data)
                        except ValueError:
                            form_data = {}
                    viz_type = form_data.get("viz_type") if isinstance(form_data, dict) else None

                    dataset_id = extract_dataset_id_from_form_data(form_data) or chart_obj.get("datasource_id")
                    charts.append({
                        "id": chart_id,
                        "title": chart_obj.get("slice_name") or chart_obj.get("name") or f"Chart {raw_chart_id}",
                        "viz_type": viz_type,
                        "dataset_id": self._coerce_int(dataset_id),
                        "last_modified": chart_obj.get("changed_on"),
                        "overview": chart_obj.get("description") or viz_type or "Chart",
                    })
            except Exception as e:
                app_logger.warning("[get_dashboard_detail][Warning] Failed to fetch dashboard charts: %s", e)

            try:
                datasets_response = self.network.request(
                    method="GET",
                    endpoint=f"/dashboard/{dashboard_id}/datasets"
                )
                datasets_payload = datasets_response.get("result", []) if isinstance(datasets_response, dict) else []
                for dataset_obj in datasets_payload:
                    if not isinstance(dataset_obj, dict):
                        continue
                    raw_dataset_id = dataset_obj.get("id")
                    dataset_id = self._coerce_int(raw_dataset_id)
                    if dataset_id is None:
                        continue

                    db_payload = dataset_obj.get("database")
                    db_name = db_payload.get("database_name") if isinstance(db_payload, dict) else None
                    table_name = (
                        dataset_obj.get("table_name")
                        or dataset_obj.get("datasource_name")
                        or dataset_obj.get("name")
                        or f"Dataset {raw_dataset_id}"
                    )
                    schema = dataset_obj.get("schema")
                    fq_name = f"{schema}.{table_name}" if schema else table_name
                    datasets.append({
                        "id": dataset_id,
                        "table_name": table_name,
                        "schema": schema,
                        "database": db_name or dataset_obj.get("database_name") or "Unknown",
                        "last_modified": dataset_obj.get("changed_on"),
                        "overview": fq_name,
                    })
            except Exception as e:
                app_logger.warning("[get_dashboard_detail][Warning] Failed to fetch dashboard datasets: %s", e)

            # Fallback: derive chart IDs from layout metadata if dashboard charts endpoint fails.
            if not charts:
                chart_ids_from_position = set()
                for field_name in ("position_json", "json_metadata"):
                    raw_layout = dashboard_data.get(field_name)
                    if isinstance(raw_layout, str) and raw_layout:
                        try:
                            chart_ids_from_position.update(
                                self._extract_chart_ids_from_layout(json.loads(raw_layout))
                            )
                        except Exception as e:
                            # Best effort: an unreadable layout simply contributes no IDs.
                            app_logger.debug(
                                "[get_dashboard_detail][State] Could not read %s for fallback chart IDs: %s",
                                field_name,
                                e,
                            )
                    elif isinstance(raw_layout, dict):
                        chart_ids_from_position.update(self._extract_chart_ids_from_layout(raw_layout))

                app_logger.info(
                    "[get_dashboard_detail][State] Extracted %s fallback chart IDs from layout (dashboard_id=%s)",
                    len(chart_ids_from_position),
                    dashboard_id,
                )

                for chart_id in sorted(chart_ids_from_position):
                    try:
                        chart_response = self.get_chart(int(chart_id))
                        chart_data = chart_response.get("result", chart_response)
                        charts.append({
                            "id": int(chart_id),
                            "title": chart_data.get("slice_name") or chart_data.get("name") or f"Chart {chart_id}",
                            "viz_type": chart_data.get("viz_type"),
                            # Coerced so it compares equal to the int ids collected for datasets below.
                            "dataset_id": self._coerce_int(chart_data.get("datasource_id")),
                            "last_modified": chart_data.get("changed_on"),
                            "overview": chart_data.get("description") or chart_data.get("viz_type") or "Chart",
                        })
                    except Exception as e:
                        app_logger.warning("[get_dashboard_detail][Warning] Failed to resolve fallback chart %s: %s", chart_id, e)

            # Backfill datasets from chart datasource IDs.
            dataset_ids_from_charts = {
                c.get("dataset_id")
                for c in charts
                if c.get("dataset_id") is not None
            }
            known_dataset_ids = {d.get("id") for d in datasets}
            # Sorted so the order of backfilled datasets does not depend on set iteration order.
            for dataset_id in sorted(dataset_ids_from_charts - known_dataset_ids):
                try:
                    dataset_response = self.get_dataset(int(dataset_id))
                    dataset_data = dataset_response.get("result", dataset_response)
                    db_payload = dataset_data.get("database")
                    db_name = db_payload.get("database_name") if isinstance(db_payload, dict) else None
                    table_name = dataset_data.get("table_name") or f"Dataset {dataset_id}"
                    schema = dataset_data.get("schema")
                    fq_name = f"{schema}.{table_name}" if schema else table_name
                    datasets.append({
                        "id": int(dataset_id),
                        "table_name": table_name,
                        "schema": schema,
                        "database": db_name or "Unknown",
                        "last_modified": dataset_data.get("changed_on_utc") or dataset_data.get("changed_on"),
                        "overview": fq_name,
                    })
                except Exception as e:
                    app_logger.warning("[get_dashboard_detail][Warning] Failed to resolve dataset %s: %s", dataset_id, e)

            # De-duplicate by id; a later entry with the same id replaces an earlier one.
            unique_charts = {chart["id"]: chart for chart in charts}
            unique_datasets = {dataset["id"]: dataset for dataset in datasets}

            return {
                "id": dashboard_data.get("id", dashboard_id),
                "title": dashboard_data.get("dashboard_title") or dashboard_data.get("title") or f"Dashboard {dashboard_id}",
                "slug": dashboard_data.get("slug"),
                "url": dashboard_data.get("url"),
                "description": dashboard_data.get("description") or "",
                "last_modified": dashboard_data.get("changed_on_utc") or dashboard_data.get("changed_on"),
                "published": dashboard_data.get("published"),
                "charts": list(unique_charts.values()),
                "datasets": list(unique_datasets.values()),
                "chart_count": len(unique_charts),
                "dataset_count": len(unique_datasets),
            }
    # [/DEF:get_dashboard_detail:Function]

    # [DEF:_extract_chart_ids_from_layout:Function]
    # @PURPOSE: Traverses dashboard layout metadata and extracts chart IDs from common keys.
    # @PRE: payload can be dict/list/scalar.
    # @POST: Returns a set of chart IDs found in nested structures.
    def _extract_chart_ids_from_layout(self, payload: Union[Dict, List, str, int, None]) -> set:
        with belief_scope("_extract_chart_ids_from_layout"):
            found = set()

            def walk(node):
                if isinstance(node, dict):
                    for key, value in node.items():
                        if key in ("chartId", "chart_id", "slice_id", "sliceId"):
                            try:
                                found.add(int(value))
                            except (TypeError, ValueError):
                                # Non-numeric value under an id-like key: ignore it.
                                pass
                        if key == "id" and isinstance(value, str):
                            match = re.match(r"^CHART-(\d+)$", value)
                            if match:
                                try:
                                    found.add(int(match.group(1)))
                                except ValueError:
                                    pass
                        # Always descend, whatever the key was.
                        walk(value)
                elif isinstance(node, list):
                    for item in node:
                        walk(item)

            walk(payload)
            return found
    # [/DEF:_extract_chart_ids_from_layout:Function]

    # [DEF:export_dashboard:Function]
    # @PURPOSE: Exports a dashboard as a ZIP archive.
    # @PARAM: dashboard_id (int) - ID of the dashboard to export.
    # @PRE: dashboard_id must exist in Superset.
    # @POST: Returns ZIP content and filename.
    # @RETURN: Tuple[bytes, str] - Binary content of the ZIP archive and the file name.
    def export_dashboard(self, dashboard_id: int) -> Tuple[bytes, str]:
        with belief_scope("export_dashboard"):
            app_logger.info("[export_dashboard][Enter] Exporting dashboard %s.", dashboard_id)
            response = self.network.request(
                method="GET",
                endpoint="/dashboard/export/",
                params={"q": json.dumps([dashboard_id])},
                stream=True,
                raw_response=True,
            )
            response = cast(Response, response)
            self._validate_export_response(response, dashboard_id)
            filename = self._resolve_export_filename(response, dashboard_id)
            app_logger.info("[export_dashboard][Exit] Exported dashboard %s to %s.", dashboard_id, filename)
            return response.content, filename
    # [/DEF:export_dashboard:Function]

    # [DEF:import_dashboard:Function]
    # @PURPOSE: Imports a dashboard from a ZIP file.
    # @PARAM: file_name (Union[str, Path]) - Path to the ZIP archive.
    # @PARAM: dash_id (Optional[int]) - ID of the dashboard to delete when the first import fails.
    # @PARAM: dash_slug (Optional[str]) - Dashboard slug used to look up the ID.
    # @PRE: file_name must be a valid ZIP dashboard export.
    # @POST: Dashboard is imported or re-imported after deletion.
    # @RETURN: Dict - API response on success.
    def import_dashboard(self, file_name: Union[str, Path], dash_id: Optional[int] = None, dash_slug: Optional[str] = None) -> Dict:
        with belief_scope("import_dashboard"):
            file_path = str(file_name)
            self._validate_import_file(file_path)
            try:
                return self._do_import(file_path)
            except Exception as exc:
                app_logger.error("[import_dashboard][Failure] First import attempt failed: %s", exc, exc_info=True)
                # The delete-and-retry path is opt-in; otherwise surface the original error.
                if not self.delete_before_reimport:
                    raise

                target_id = self._resolve_target_id_for_delete(dash_id, dash_slug)
                if target_id is None:
                    app_logger.error("[import_dashboard][Failure] No ID available for delete-retry.")
                    raise

                self.delete_dashboard(target_id)
                app_logger.info("[import_dashboard][State] Deleted dashboard ID %s, retrying import.", target_id)
                return self._do_import(file_path)
    # [/DEF:import_dashboard:Function]

    # [DEF:delete_dashboard:Function]
    # @PURPOSE: Deletes a dashboard by its ID or slug.
    # @PARAM: dashboard_id (Union[int, str]) - Dashboard ID or slug.
    # @PRE: dashboard_id must exist.
    # @POST: Dashboard is removed from Superset.
    def delete_dashboard(self, dashboard_id: Union[int, str]) -> None:
        with belief_scope("delete_dashboard"):
            app_logger.info("[delete_dashboard][Enter] Deleting dashboard %s.", dashboard_id)
            response = self.network.request(method="DELETE", endpoint=f"/dashboard/{dashboard_id}")
            response = cast(Dict, response)
            # Only an explicit `"result": False` is reported as unexpected; a missing key counts as success.
            if response.get("result", True) is not False:
                app_logger.info("[delete_dashboard][Success] Dashboard %s deleted.", dashboard_id)
            else:
                app_logger.warning("[delete_dashboard][Warning] Unexpected response while deleting %s: %s", dashboard_id, response)
    # [/DEF:delete_dashboard:Function]

    # [/SECTION]

    # [SECTION: DATASET OPERATIONS]

    # [DEF:get_datasets:Function]
    # @PURPOSE: Fetches the full list of datasets, handling pagination automatically.
    # @PARAM: query (Optional[Dict]) - Additional query parameters.
    # @PRE: Client is authenticated.
    # @POST: Returns total count and list of datasets.
    # @RETURN: Tuple[int, List[Dict]] - Tuple of (number of items fetched, list of datasets).
    def get_datasets(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
        with belief_scope("get_datasets"):
            app_logger.info("[get_datasets][Enter] Fetching datasets.")
            validated_query = self._validate_query_params(query)

            paginated_data = self._fetch_all_pages(
                endpoint="/dataset/",
                pagination_options={"base_query": validated_query, "results_field": "result"},
            )
            total_count = len(paginated_data)
            app_logger.info("[get_datasets][Exit] Found %d datasets.", total_count)
            return total_count, paginated_data
    # [/DEF:get_datasets:Function]

    # [DEF:get_datasets_summary:Function]
    # @PURPOSE: Fetches dataset metadata optimized for the Dataset Hub grid.
    # @PRE: Client is authenticated.
    # @POST: Returns a list of dataset metadata summaries.
    # @RETURN: List[Dict]
    def get_datasets_summary(self) -> List[Dict]:
        with belief_scope("SupersetClient.get_datasets_summary"):
            query = {
                "columns": ["id", "table_name", "schema", "database"]
            }
            _, datasets = self.get_datasets(query=query)

            # Map fields to match the contracts
            result = []
            for ds in datasets:
                # "database" may be present but null; only a dict can carry database_name.
                db_payload = ds.get("database")
                if isinstance(db_payload, dict):
                    db_name = db_payload.get("database_name", "Unknown")
                else:
                    db_name = "Unknown"
                result.append({
                    "id": ds.get("id"),
                    "table_name": ds.get("table_name"),
                    "schema": ds.get("schema"),
                    "database": db_name
                })
            return result
    # [/DEF:get_datasets_summary:Function]

    # [DEF:get_dataset_detail:Function]
    # @PURPOSE: Fetches detailed dataset information including columns and linked dashboards
    # @PRE: Client is authenticated and dataset_id exists.
    # @POST: Returns detailed dataset info with columns and linked dashboards.
    # @PARAM: dataset_id (int) - The dataset ID to fetch details for.
    # @RETURN: Dict - Dataset details with columns and linked_dashboards.
    # @RELATION: CALLS -> self.get_dataset
    # @RELATION: CALLS -> self.network.request (for related_objects)
    def get_dataset_detail(self, dataset_id: int) -> Dict:
        with belief_scope("SupersetClient.get_dataset_detail", f"id={dataset_id}"):
            def as_bool(value, default=False):
                # Normalizes bools, truthy strings ("1", "true", "yes", "y", "on") and other values.
                if value is None:
                    return default
                if isinstance(value, bool):
                    return value
                if isinstance(value, str):
                    return value.strip().lower() in ("1", "true", "yes", "y", "on")
                return bool(value)

            # Get base dataset info
            response = self.get_dataset(dataset_id)

            # If the response is a dict and has a 'result' key, use that (standard Superset API)
            if isinstance(response, dict) and 'result' in response:
                dataset = response['result']
            else:
                dataset = response

            # Extract columns information; tolerate a null column list and non-dict entries.
            column_info = []
            for col in dataset.get("columns") or []:
                if not isinstance(col, dict):
                    continue
                col_id = self._coerce_int(col.get("id"))
                if col_id is None:
                    continue
                column_info.append({
                    "id": col_id,
                    "name": col.get("column_name"),
                    "type": col.get("type"),
                    "is_dttm": as_bool(col.get("is_dttm"), default=False),
                    "is_active": as_bool(col.get("is_active"), default=True),
                    "description": col.get("description", "")
                })

            # Get linked dashboards using related_objects endpoint
            linked_dashboards = []
            try:
                related_objects = self.network.request(
                    method="GET",
                    endpoint=f"/dataset/{dataset_id}/related_objects"
                )

                # Handle different response formats: top-level "dashboards" or nested under "result".
                dashboards_data = []
                if isinstance(related_objects, dict):
                    if "dashboards" in related_objects:
                        dashboards_data = related_objects["dashboards"]
                    elif "result" in related_objects and isinstance(related_objects["result"], dict):
                        dashboards_data = related_objects["result"].get("dashboards", [])

                for dash in dashboards_data:
                    if isinstance(dash, dict):
                        raw_dash_id = dash.get("id")
                        dash_id = self._coerce_int(raw_dash_id)
                        if dash_id is None:
                            continue
                        linked_dashboards.append({
                            "id": dash_id,
                            "title": dash.get("dashboard_title") or dash.get("title") or f"Dashboard {raw_dash_id}",
                            "slug": dash.get("slug")
                        })
                    else:
                        # Entries may also be bare ids.
                        dash_id = self._coerce_int(dash)
                        if dash_id is None:
                            continue
                        linked_dashboards.append({
                            "id": dash_id,
                            "title": f"Dashboard {dash_id}",
                            "slug": None
                        })
            except Exception as e:
                app_logger.warning("[get_dataset_detail][Warning] Failed to fetch related dashboards: %s", e)
                # Drop any partial result so the list is either complete or empty.
                linked_dashboards = []

            # Extract SQL table information
            sql = dataset.get("sql", "")

            result = {
                "id": dataset.get("id"),
                "table_name": dataset.get("table_name"),
                "schema": dataset.get("schema"),
                "database": (
                    dataset.get("database", {}).get("database_name", "Unknown")
                    if isinstance(dataset.get("database"), dict)
                    else dataset.get("database_name") or "Unknown"
                ),
                "description": dataset.get("description", ""),
                "columns": column_info,
                "column_count": len(column_info),
                "sql": sql,
                "linked_dashboards": linked_dashboards,
                "linked_dashboard_count": len(linked_dashboards),
                "is_sqllab_view": as_bool(dataset.get("is_sqllab_view"), default=False),
                "created_on": dataset.get("created_on"),
                "changed_on": dataset.get("changed_on")
            }

            app_logger.info(
                "[get_dataset_detail][Exit] Got dataset %s with %s columns and %s linked dashboards",
                dataset_id,
                len(column_info),
                len(linked_dashboards),
            )
            return result
    # [/DEF:get_dataset_detail:Function]

    # [DEF:get_dataset:Function]
    # @PURPOSE: Fetches information about a specific dataset by its ID.
    # @PARAM: dataset_id (int) - Dataset ID.
    # @PRE: dataset_id must exist.
    # @POST: Returns dataset details.
    # @RETURN: Dict - Dataset information.
    def get_dataset(self, dataset_id: int) -> Dict:
        with belief_scope("SupersetClient.get_dataset", f"id={dataset_id}"):
            app_logger.info("[get_dataset][Enter] Fetching dataset %s.", dataset_id)
            response = self.network.request(method="GET", endpoint=f"/dataset/{dataset_id}")
            response = cast(Dict, response)
            app_logger.info("[get_dataset][Exit] Got dataset %s.", dataset_id)
            return response
    # [/DEF:get_dataset:Function]

    # [DEF:update_dataset:Function]
    # @PURPOSE: Updates a dataset by its ID.
    # @PARAM: dataset_id (int) - Dataset ID.
    # @PARAM: data (Dict) - Data to update; sent as a JSON body.
    # @PRE: dataset_id must exist.
    # @POST: Dataset is updated in Superset.
    # @RETURN: Dict - API response.
    def update_dataset(self, dataset_id: int, data: Dict) -> Dict:
        with belief_scope("SupersetClient.update_dataset", f"id={dataset_id}"):
            app_logger.info("[update_dataset][Enter] Updating dataset %s.", dataset_id)
            response = self.network.request(
                method="PUT",
                endpoint=f"/dataset/{dataset_id}",
                data=json.dumps(data),
                headers={'Content-Type': 'application/json'}
            )
            response = cast(Dict, response)
            app_logger.info("[update_dataset][Exit] Updated dataset %s.", dataset_id)
            return response
    # [/DEF:update_dataset:Function]

    # [/SECTION]

    # [SECTION: DATABASE OPERATIONS]

    # [DEF:get_databases:Function]
    # @PURPOSE: Fetches the full list of databases.
    # @PARAM: query (Optional[Dict]) - Additional query parameters.
    # @PRE: Client is authenticated.
    # @POST: Returns total count and list of databases.
    # @RETURN: Tuple[int, List[Dict]] - Tuple of (number of items fetched, list of databases).
    def get_databases(self, query: Optional[Dict] = None) -> Tuple[int, List[Dict]]:
        with belief_scope("get_databases"):
            app_logger.info("[get_databases][Enter] Fetching databases.")
            validated_query = self._validate_query_params(query or {})
            if 'columns' not in validated_query:
                validated_query['columns'] = []

            paginated_data = self._fetch_all_pages(
                endpoint="/database/",
                pagination_options={"base_query": validated_query, "results_field": "result"},
            )
            total_count = len(paginated_data)
            app_logger.info("[get_databases][Exit] Found %d databases.", total_count)
            return total_count, paginated_data
    # [/DEF:get_databases:Function]

    # [DEF:get_database:Function]
    # @PURPOSE: Fetches information about a specific database by its ID.
    # @PARAM: database_id (int) - Database ID.
    # @PRE: database_id must exist.
    # @POST: Returns database details.
    # @RETURN: Dict - Database information.
    def get_database(self, database_id: int) -> Dict:
        with belief_scope("get_database"):
            app_logger.info("[get_database][Enter] Fetching database %s.", database_id)
            response = self.network.request(method="GET", endpoint=f"/database/{database_id}")
            response = cast(Dict, response)
            app_logger.info("[get_database][Exit] Got database %s.", database_id)
            return response
    # [/DEF:get_database:Function]

    # [DEF:get_databases_summary:Function]
    # @PURPOSE: Fetch a summary of databases including uuid, name, and engine.
    # @PRE: Client is authenticated.
    # @POST: Returns list of database summaries.
    # @RETURN: List[Dict] - Summary of databases.
    def get_databases_summary(self) -> List[Dict]:
        with belief_scope("SupersetClient.get_databases_summary"):
            query = {
                "columns": ["uuid", "database_name", "backend"]
            }
            _, databases = self.get_databases(query=query)

            # Map 'backend' to 'engine' for consistency with contracts
            # (renamed in place on the dicts returned by get_databases).
            for db in databases:
                db['engine'] = db.pop('backend', None)

            return databases
    # [/DEF:get_databases_summary:Function]

    # [DEF:get_database_by_uuid:Function]
    # @PURPOSE: Find a database by its UUID.
    # @PARAM: db_uuid (str) - The UUID of the database.
    # @PRE: db_uuid must be a valid UUID string.
    # @POST: Returns database info or None.
    # @RETURN: Optional[Dict] - Database info if found, else None.
    def get_database_by_uuid(self, db_uuid: str) -> Optional[Dict]:
        with belief_scope("SupersetClient.get_database_by_uuid", f"uuid={db_uuid}"):
            query = {
                "filters": [{"col": "uuid", "op": "eq", "value": db_uuid}]
            }
            _, databases = self.get_databases(query=query)
            return databases[0] if databases else None
    # [/DEF:get_database_by_uuid:Function]

    # [/SECTION]

    # [SECTION: HELPERS]

    # [DEF:_coerce_int:Function]
    # @PURPOSE: Converts a loosely typed value (e.g. an id from an API payload) to int without raising.
    # @PRE: value can be of any type.
    # @POST: Returns int(value), or None when value is None or cannot be converted.
    @staticmethod
    def _coerce_int(value) -> Optional[int]:
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            return None
    # [/DEF:_coerce_int:Function]

    # [DEF:_resolve_target_id_for_delete:Function]
    # @PURPOSE: Resolves a dashboard ID from either an ID or a slug.
    # @PRE: Either dash_id or dash_slug should be provided.
    # @POST: Returns the resolved ID or None.
    def _resolve_target_id_for_delete(self, dash_id: Optional[int], dash_slug: Optional[str]) -> Optional[int]:
        with belief_scope("_resolve_target_id_for_delete"):
            # An explicit ID wins over the slug lookup.
            if dash_id is not None:
                return dash_id
            if dash_slug is not None:
                app_logger.debug("[_resolve_target_id_for_delete][State] Resolving ID by slug '%s'.", dash_slug)
                try:
                    _, candidates = self.get_dashboards(query={"filters": [{"col": "slug", "op": "eq", "value": dash_slug}]})
                    if candidates:
                        target_id = candidates[0]["id"]
                        app_logger.debug("[_resolve_target_id_for_delete][Success] Resolved slug to ID %s.", target_id)
                        return target_id
                except Exception as e:
                    app_logger.warning("[_resolve_target_id_for_delete][Warning] Could not resolve slug '%s' to ID: %s", dash_slug, e)
            return None
    # [/DEF:_resolve_target_id_for_delete:Function]

    # [DEF:_do_import:Function]
    # @PURPOSE: Performs the actual multipart upload for import.
    # @PRE: file_name must be a path to an existing ZIP file.
    # @POST: Returns the API response from the upload; raises FileNotFoundError if the file is missing.
    def _do_import(self, file_name: Union[str, Path]) -> Dict:
        with belief_scope("_do_import"):
            app_logger.debug("[_do_import][State] Uploading file: %s", file_name)
            file_path = Path(file_name)
            if not file_path.exists():
                app_logger.error("[_do_import][Failure] File does not exist: %s", file_name)
                raise FileNotFoundError(f"File does not exist: {file_name}")

            return self.network.upload_file(
                endpoint="/dashboard/import/",
                file_info={"file_obj": file_path, "file_name": file_path.name, "form_field": "formData"},
                extra_data={"overwrite": "true"},
                # Uploads get twice the environment's regular timeout.
                timeout=self.env.timeout * 2,
            )
    # [/DEF:_do_import:Function]

    # [DEF:_validate_export_response:Function]
    # @PURPOSE: Validates that the export response is a non-empty ZIP archive.
    # @PRE: response must be a valid requests.Response object.
    # @POST: Raises SupersetAPIError if validation fails.
    def _validate_export_response(self, response: Response, dashboard_id: int) -> None:
        with belief_scope("_validate_export_response"):
            content_type = response.headers.get("Content-Type", "")
            if "application/zip" not in content_type:
                raise SupersetAPIError(f"Получен не ZIP-архив (Content-Type: {content_type})")
            if not response.content:
                raise SupersetAPIError("Получены пустые данные при экспорте")
    # [/DEF:_validate_export_response:Function]

    # [DEF:_resolve_export_filename:Function]
    # @PURPOSE: Determines the filename for an exported dashboard.
    # @PRE: response must contain Content-Disposition header or dashboard_id must be provided.
    # @POST: Returns the filename from the headers, or a generated timestamped name when none is found.
    def _resolve_export_filename(self, response: Response, dashboard_id: int) -> str:
        with belief_scope("_resolve_export_filename"):
            filename = get_filename_from_headers(dict(response.headers))
            if not filename:
                from datetime import datetime
                timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
                filename = f"dashboard_export_{dashboard_id}_{timestamp}.zip"
                app_logger.warning("[_resolve_export_filename][Warning] Generated filename: %s", filename)
            return filename
    # [/DEF:_resolve_export_filename:Function]

    # [DEF:_validate_query_params:Function]
    # @PURPOSE: Ensures query parameters have default page and page_size.
    # @PRE: query can be None or a dictionary.
    # @POST: Returns a new dictionary with at least page and page_size; caller values override the defaults.
    def _validate_query_params(self, query: Optional[Dict]) -> Dict:
        with belief_scope("_validate_query_params"):
            base_query = {"page": 0, "page_size": 1000}
            return {**base_query, **(query or {})}
    # [/DEF:_validate_query_params:Function]

    # [DEF:_fetch_total_object_count:Function]
    # @PURPOSE: Fetches the total number of items for a given endpoint.
    # @PRE: endpoint must be a valid Superset API path.
    # @POST: Returns the total count as an integer.
    def _fetch_total_object_count(self, endpoint: str) -> int:
        with belief_scope("_fetch_total_object_count"):
            return self.network.fetch_paginated_count(
                endpoint=endpoint,
                query_params={"page": 0, "page_size": 1},
                count_field="count",
            )
    # [/DEF:_fetch_total_object_count:Function]

    # [DEF:_fetch_all_pages:Function]
    # @PURPOSE: Iterates through all pages to collect all data items.
    # @PRE: pagination_options carries base_query and results_field (the keys callers in this class pass).
    #       NOTE(review): confirm whether APIClient.fetch_paginated_data also expects total_count.
    # @POST: Returns a combined list of all items.
    def _fetch_all_pages(self, endpoint: str, pagination_options: Dict) -> List[Dict]:
        with belief_scope("_fetch_all_pages"):
            return self.network.fetch_paginated_data(endpoint=endpoint, pagination_options=pagination_options)
    # [/DEF:_fetch_all_pages:Function]

    # [DEF:_validate_import_file:Function]
    # @PURPOSE: Validates that the file to be imported is a valid ZIP with metadata.yaml.
    # @PRE: zip_path must be a path to a file.
    # @POST: Raises error if file is missing, not a ZIP, or missing metadata.
    def _validate_import_file(self, zip_path: Union[str, Path]) -> None:
        with belief_scope("_validate_import_file"):
            path = Path(zip_path)
            if not path.exists():
                raise FileNotFoundError(f"Файл {zip_path} не существует")
            if not zipfile.is_zipfile(path):
                raise SupersetAPIError(f"Файл {zip_path} не является ZIP-архивом")
            with zipfile.ZipFile(path, "r") as zf:
                # metadata.yaml may sit in any directory inside the archive.
                if not any(n.endswith("metadata.yaml") for n in zf.namelist()):
                    raise SupersetAPIError(f"Архив {zip_path} не содержит 'metadata.yaml'")
    # [/DEF:_validate_import_file:Function]

    # [/SECTION]

# [/DEF:SupersetClient:Class]

# [/DEF:backend.src.core.superset_client:Module]