275 lines
12 KiB
Python
Executable File
275 lines
12 KiB
Python
Executable File
# [DEF:AppModule:Module]
|
|
# @TIER: CRITICAL
|
|
# @SEMANTICS: app, main, entrypoint, fastapi
|
|
# @PURPOSE: The main entry point for the FastAPI application. It initializes the app, configures CORS, sets up dependencies, includes API routers, and defines the WebSocket endpoint for log streaming.
|
|
# @LAYER: UI (API)
|
|
# @RELATION: Depends on the dependency module and API route modules.
|
|
# @INVARIANT: Only one FastAPI app instance exists per process.
|
|
# @INVARIANT: All WebSocket connections must be properly cleaned up on disconnect.
|
|
from pathlib import Path
|
|
|
|
# project_root is used for static files mounting
|
|
project_root = Path(__file__).resolve().parent.parent.parent
|
|
|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, HTTPException
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse
|
|
import asyncio
|
|
|
|
from .dependencies import get_task_manager, get_scheduler_service
|
|
from .core.utils.network import NetworkError
|
|
from .core.logger import logger, belief_scope
|
|
from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage, admin, llm, dashboards, datasets, reports
|
|
from .api import auth
|
|
|
|
# [DEF:App:Global]
|
|
# @SEMANTICS: app, fastapi, instance
|
|
# @PURPOSE: The global FastAPI application instance.
|
|
app = FastAPI(
|
|
title="Superset Tools API",
|
|
description="API for managing Superset automation tools and plugins.",
|
|
version="1.0.0",
|
|
)
|
|
# [/DEF:App:Global]
|
|
|
|
# [DEF:startup_event:Function]
|
|
# @PURPOSE: Handles application startup tasks, such as starting the scheduler.
|
|
# @PRE: None.
|
|
# @POST: Scheduler is started.
|
|
# Startup event
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
with belief_scope("startup_event"):
|
|
scheduler = get_scheduler_service()
|
|
scheduler.start()
|
|
# [/DEF:startup_event:Function]
|
|
|
|
# [DEF:shutdown_event:Function]
|
|
# @PURPOSE: Handles application shutdown tasks, such as stopping the scheduler.
|
|
# @PRE: None.
|
|
# @POST: Scheduler is stopped.
|
|
# Shutdown event
|
|
@app.on_event("shutdown")
|
|
async def shutdown_event():
|
|
with belief_scope("shutdown_event"):
|
|
scheduler = get_scheduler_service()
|
|
scheduler.stop()
|
|
# [/DEF:shutdown_event:Function]
|
|
|
|
# Configure Session Middleware (required by Authlib for OAuth2 flow)
|
|
from .core.auth.config import auth_config
|
|
app.add_middleware(SessionMiddleware, secret_key=auth_config.SECRET_KEY)
|
|
|
|
# Configure CORS
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"], # Adjust this in production
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
# [DEF:log_requests:Function]
|
|
# @PURPOSE: Middleware to log incoming HTTP requests and their response status.
|
|
# @PRE: request is a FastAPI Request object.
|
|
# @POST: Logs request and response details.
|
|
# @PARAM: request (Request) - The incoming request object.
|
|
# @PARAM: call_next (Callable) - The next middleware or route handler.
|
|
@app.exception_handler(NetworkError)
|
|
async def network_error_handler(request: Request, exc: NetworkError):
|
|
with belief_scope("network_error_handler"):
|
|
logger.error(f"Network error: {exc}")
|
|
return HTTPException(
|
|
status_code=503,
|
|
detail="Environment unavailable. Please check if the Superset instance is running."
|
|
)
|
|
|
|
@app.middleware("http")
|
|
async def log_requests(request: Request, call_next):
|
|
# Avoid spamming logs for polling endpoints
|
|
is_polling = request.url.path.endswith("/api/tasks") and request.method == "GET"
|
|
|
|
if not is_polling:
|
|
logger.info(f"Incoming request: {request.method} {request.url.path}")
|
|
|
|
try:
|
|
response = await call_next(request)
|
|
if not is_polling:
|
|
logger.info(f"Response status: {response.status_code} for {request.url.path}")
|
|
return response
|
|
except NetworkError as e:
|
|
logger.error(f"Network error caught in middleware: {e}")
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Environment unavailable. Please check if the Superset instance is running."
|
|
)
|
|
# [/DEF:log_requests:Function]
|
|
|
|
# Include API routes
|
|
app.include_router(auth.router)
|
|
app.include_router(admin.router)
|
|
app.include_router(plugins.router, prefix="/api/plugins", tags=["Plugins"])
|
|
app.include_router(tasks.router, prefix="/api/tasks", tags=["Tasks"])
|
|
app.include_router(settings.router, prefix="/api/settings", tags=["Settings"])
|
|
app.include_router(connections.router, prefix="/api/settings/connections", tags=["Connections"])
|
|
app.include_router(environments.router, tags=["Environments"])
|
|
app.include_router(mappings.router, prefix="/api/mappings", tags=["Mappings"])
|
|
app.include_router(migration.router)
|
|
app.include_router(git.router, prefix="/api/git", tags=["Git"])
|
|
app.include_router(llm.router, prefix="/api/llm", tags=["LLM"])
|
|
app.include_router(storage.router, prefix="/api/storage", tags=["Storage"])
|
|
app.include_router(dashboards.router)
|
|
app.include_router(datasets.router)
|
|
app.include_router(reports.router)
|
|
|
|
|
|
# [DEF:api.include_routers:Action]
|
|
# @PURPOSE: Registers all API routers with the FastAPI application.
|
|
# @LAYER: API
|
|
# @SEMANTICS: routes, registration, api
|
|
# [/DEF:api.include_routers:Action]
|
|
|
|
# [DEF:websocket_endpoint:Function]
|
|
# @PURPOSE: Provides a WebSocket endpoint for real-time log streaming of a task with server-side filtering.
|
|
# @PRE: task_id must be a valid task ID.
|
|
# @POST: WebSocket connection is managed and logs are streamed until disconnect.
|
|
# @TIER: CRITICAL
|
|
# @UX_STATE: Connecting -> Streaming -> (Disconnected)
|
|
@app.websocket("/ws/logs/{task_id}")
|
|
async def websocket_endpoint(
|
|
websocket: WebSocket,
|
|
task_id: str,
|
|
source: str = None,
|
|
level: str = None
|
|
):
|
|
"""
|
|
WebSocket endpoint for real-time log streaming with optional server-side filtering.
|
|
|
|
Query Parameters:
|
|
source: Filter logs by source component (e.g., "plugin", "superset_api")
|
|
level: Filter logs by minimum level (DEBUG, INFO, WARNING, ERROR)
|
|
"""
|
|
with belief_scope("websocket_endpoint", f"task_id={task_id}"):
|
|
await websocket.accept()
|
|
|
|
# Normalize filter parameters
|
|
source_filter = source.lower() if source else None
|
|
level_filter = level.upper() if level else None
|
|
|
|
# Level hierarchy for filtering
|
|
level_hierarchy = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
|
|
min_level = level_hierarchy.get(level_filter, 0) if level_filter else 0
|
|
|
|
logger.info(f"WebSocket connection accepted for task {task_id} (source={source_filter}, level={level_filter})")
|
|
task_manager = get_task_manager()
|
|
queue = await task_manager.subscribe_logs(task_id)
|
|
|
|
def matches_filters(log_entry) -> bool:
|
|
"""Check if log entry matches the filter criteria."""
|
|
# Check source filter
|
|
if source_filter and log_entry.source.lower() != source_filter:
|
|
return False
|
|
|
|
# Check level filter
|
|
if level_filter:
|
|
log_level = level_hierarchy.get(log_entry.level.upper(), 0)
|
|
if log_level < min_level:
|
|
return False
|
|
|
|
return True
|
|
|
|
try:
|
|
# Stream new logs
|
|
logger.info(f"Starting log stream for task {task_id}")
|
|
|
|
# Send initial logs first to build context (apply filters)
|
|
initial_logs = task_manager.get_task_logs(task_id)
|
|
for log_entry in initial_logs:
|
|
if matches_filters(log_entry):
|
|
log_dict = log_entry.dict()
|
|
log_dict['timestamp'] = log_dict['timestamp'].isoformat()
|
|
await websocket.send_json(log_dict)
|
|
|
|
# Force a check for AWAITING_INPUT status immediately upon connection
|
|
# This ensures that if the task is already waiting when the user connects, they get the prompt.
|
|
task = task_manager.get_task(task_id)
|
|
if task and task.status == "AWAITING_INPUT" and task.input_request:
|
|
# Construct a synthetic log entry to trigger the frontend handler
|
|
# This is a bit of a hack but avoids changing the websocket protocol significantly
|
|
synthetic_log = {
|
|
"timestamp": task.logs[-1].timestamp.isoformat() if task.logs else "2024-01-01T00:00:00",
|
|
"level": "INFO",
|
|
"message": "Task paused for user input (Connection Re-established)",
|
|
"context": {"input_request": task.input_request}
|
|
}
|
|
await websocket.send_json(synthetic_log)
|
|
|
|
while True:
|
|
log_entry = await queue.get()
|
|
|
|
# Apply server-side filtering
|
|
if not matches_filters(log_entry):
|
|
continue
|
|
|
|
log_dict = log_entry.dict()
|
|
log_dict['timestamp'] = log_dict['timestamp'].isoformat()
|
|
await websocket.send_json(log_dict)
|
|
|
|
# If task is finished, we could potentially close the connection
|
|
# but let's keep it open for a bit or until the client disconnects
|
|
if "Task completed successfully" in log_entry.message or "Task failed" in log_entry.message:
|
|
# Wait a bit to ensure client receives the last message
|
|
await asyncio.sleep(2)
|
|
# DO NOT BREAK here - allow client to keep connection open if they want to review logs
|
|
# or until they disconnect. Breaking closes the socket immediately.
|
|
# break
|
|
|
|
except WebSocketDisconnect:
|
|
logger.info(f"WebSocket connection disconnected for task {task_id}")
|
|
except Exception as e:
|
|
logger.error(f"WebSocket error for task {task_id}: {e}")
|
|
finally:
|
|
task_manager.unsubscribe_logs(task_id, queue)
|
|
# [/DEF:websocket_endpoint:Function]
|
|
|
|
# [DEF:StaticFiles:Mount]
|
|
# @SEMANTICS: static, frontend, spa
|
|
# @PURPOSE: Mounts the frontend build directory to serve static assets.
|
|
frontend_path = project_root / "frontend" / "build"
|
|
if frontend_path.exists():
|
|
app.mount("/_app", StaticFiles(directory=str(frontend_path / "_app")), name="static")
|
|
|
|
# [DEF:serve_spa:Function]
|
|
# @PURPOSE: Serves the SPA frontend for any path not matched by API routes.
|
|
# @PRE: frontend_path exists.
|
|
# @POST: Returns the requested file or index.html.
|
|
@app.get("/{file_path:path}", include_in_schema=False)
|
|
async def serve_spa(file_path: str):
|
|
# Only serve SPA for non-API paths
|
|
# API routes are registered separately and should be matched by FastAPI first
|
|
if file_path and (file_path.startswith("api/") or file_path.startswith("/api/") or file_path == "api"):
|
|
# This should not happen if API routers are properly registered
|
|
# Return 404 instead of serving HTML
|
|
raise HTTPException(status_code=404, detail=f"API endpoint not found: {file_path}")
|
|
|
|
full_path = frontend_path / file_path
|
|
if file_path and full_path.is_file():
|
|
return FileResponse(str(full_path))
|
|
return FileResponse(str(frontend_path / "index.html"))
|
|
# [/DEF:serve_spa:Function]
|
|
else:
|
|
# [DEF:read_root:Function]
|
|
# @PURPOSE: A simple root endpoint to confirm that the API is running when frontend is missing.
|
|
# @PRE: None.
|
|
# @POST: Returns a JSON message indicating API status.
|
|
@app.get("/")
|
|
async def read_root():
|
|
with belief_scope("read_root"):
|
|
return {"message": "Superset Tools API is running (Frontend build not found)"}
|
|
# [/DEF:read_root:Function]
|
|
# [/DEF:StaticFiles:Mount]
|
|
# [/DEF:AppModule:Module]
|