# [DEF:AppModule:Module]
# @TIER: CRITICAL
# @SEMANTICS: app, main, entrypoint, fastapi
# @PURPOSE: The main entry point for the FastAPI application. It initializes the app, configures CORS, sets up dependencies, includes API routers, and defines the WebSocket endpoint for log streaming.
# @LAYER: UI (API)
# @RELATION: Depends on the dependency module and API route modules.
# @INVARIANT: Only one FastAPI app instance exists per process.
# @INVARIANT: All WebSocket connections must be properly cleaned up on disconnect.

import asyncio
from pathlib import Path

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from starlette.middleware.sessions import SessionMiddleware

from .dependencies import get_task_manager, get_scheduler_service
from .core.utils.network import NetworkError
from .core.logger import logger, belief_scope
from .api.routes import (
    plugins,
    tasks,
    settings,
    environments,
    mappings,
    migration,
    connections,
    git,
    storage,
    admin,
    llm,
    dashboards,
    datasets,
    reports,
)
from .api import auth

# project_root is used for static files mounting (frontend build lookup).
# Computed after the imports so all module-level imports stay at the top (PEP 8).
project_root = Path(__file__).resolve().parent.parent.parent

# [DEF:App:Global]
# @SEMANTICS: app, fastapi, instance
# @PURPOSE: The global FastAPI application instance.
app = FastAPI(
    title="Superset Tools API",
    description="API for managing Superset automation tools and plugins.",
    version="1.0.0",
)
# [/DEF:App:Global]

# [DEF:startup_event:Function]
# @PURPOSE: Handles application startup tasks, such as starting the scheduler.
# @PRE: None.
# @POST: Scheduler is started.
# Startup event
@app.on_event("startup")
async def startup_event():
    with belief_scope("startup_event"):
        scheduler = get_scheduler_service()
        scheduler.start()
# [/DEF:startup_event:Function]


# [DEF:shutdown_event:Function]
# @PURPOSE: Handles application shutdown tasks, such as stopping the scheduler.
# @PRE: None.
# @POST: Scheduler is stopped.
# Shutdown event
@app.on_event("shutdown")
async def shutdown_event():
    with belief_scope("shutdown_event"):
        scheduler = get_scheduler_service()
        scheduler.stop()
# [/DEF:shutdown_event:Function]


# Configure Session Middleware (required by Authlib for OAuth2 flow)
from .core.auth.config import auth_config

app.add_middleware(SessionMiddleware, secret_key=auth_config.SECRET_KEY)

# Configure CORS
# NOTE(review): with allow_origins=["*"] AND allow_credentials=True, Starlette
# echoes the caller's Origin back for credentialed requests, i.e. any site can
# make cookie-bearing calls. Restrict allow_origins before production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Adjust this in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Shared 503 message so the exception handler and the middleware fallback agree.
_NETWORK_UNAVAILABLE_DETAIL = (
    "Environment unavailable. Please check if the Superset instance is running."
)


# [DEF:network_error_handler:Function]
# @PURPOSE: Converts a NetworkError raised while handling a request into an HTTP 503 response.
# @PRE: exc is a NetworkError raised by a route handler or dependency.
# @POST: Logs the error and returns a 503 JSON response {"detail": ...}.
# @PARAM: request (Request) - The request being handled when the error occurred.
# @PARAM: exc (NetworkError) - The raised network error.
@app.exception_handler(NetworkError)
async def network_error_handler(request: Request, exc: NetworkError):
    # Exception handlers must *return a Response*; Starlette sends the returned
    # object as an ASGI response, so an HTTPException instance cannot be used here.
    from fastapi.responses import JSONResponse

    with belief_scope("network_error_handler"):
        logger.error(f"Network error: {exc}")
        return JSONResponse(
            status_code=503,
            content={"detail": _NETWORK_UNAVAILABLE_DETAIL},
        )
# [/DEF:network_error_handler:Function]


# [DEF:log_requests:Function]
# @PURPOSE: Middleware to log incoming HTTP requests and their response status.
# @PRE: request is a FastAPI Request object.
# @POST: Logs request and response details; a NetworkError that reaches this layer becomes a 503 response.
# @PARAM: request (Request) - The incoming request object.
# @PARAM: call_next (Callable) - The next middleware or route handler.
@app.middleware("http")
async def log_requests(request: Request, call_next):
    # Avoid spamming logs for polling endpoints
    is_polling = request.url.path.endswith("/api/tasks") and request.method == "GET"
    if not is_polling:
        logger.info(f"Incoming request: {request.method} {request.url.path}")

    try:
        response = await call_next(request)
    except NetworkError as e:
        # HTTP middleware runs outside FastAPI's exception-handling layer, so a
        # raised HTTPException would surface as a 500; build the 503 directly.
        from fastapi.responses import JSONResponse

        logger.error(f"Network error caught in middleware: {e}")
        return JSONResponse(
            status_code=503,
            content={"detail": _NETWORK_UNAVAILABLE_DETAIL},
        )

    if not is_polling:
        logger.info(f"Response status: {response.status_code} for {request.url.path}")
    return response
# [/DEF:log_requests:Function]


# [DEF:api.include_routers:Action]
# @PURPOSE: Registers all API routers with the FastAPI application.
# @LAYER: API
# @SEMANTICS: routes, registration, api
# Include API routes
app.include_router(auth.router)
app.include_router(admin.router)
app.include_router(plugins.router, prefix="/api/plugins", tags=["Plugins"])
app.include_router(tasks.router, prefix="/api/tasks", tags=["Tasks"])
app.include_router(settings.router, prefix="/api/settings", tags=["Settings"])
app.include_router(connections.router, prefix="/api/settings/connections", tags=["Connections"])
app.include_router(environments.router, tags=["Environments"])
app.include_router(mappings.router, prefix="/api/mappings", tags=["Mappings"])
app.include_router(migration.router)
app.include_router(git.router, prefix="/api/git", tags=["Git"])
app.include_router(llm.router, prefix="/api/llm", tags=["LLM"])
app.include_router(storage.router, prefix="/api/storage", tags=["Storage"])
app.include_router(dashboards.router)
app.include_router(datasets.router)
app.include_router(reports.router)
# [/DEF:api.include_routers:Action]


# [DEF:websocket_endpoint:Function]
# @PURPOSE: Provides a WebSocket endpoint for real-time log streaming of a task with server-side filtering.
# @PRE: task_id must be a valid task ID.
# @POST: WebSocket connection is managed and logs are streamed until disconnect.
# @TIER: CRITICAL
# @UX_STATE: Connecting -> Streaming -> (Disconnected)
@app.websocket("/ws/logs/{task_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    task_id: str,
    source: str = None,
    level: str = None,
):
    """
    WebSocket endpoint for real-time log streaming with optional server-side filtering.

    Query Parameters:
        source: Filter logs by source component (e.g., "plugin", "superset_api")
        level: Filter logs by minimum level (DEBUG, INFO, WARNING, ERROR)

    A ``level`` value outside that list disables level filtering (treated as
    DEBUG). Existing logs are replayed first, then new logs are streamed. The
    connection is deliberately kept open after the task finishes.
    """
    with belief_scope("websocket_endpoint", f"task_id={task_id}"):
        await websocket.accept()

        # Normalize filter parameters (case-insensitive matching).
        source_filter = source.lower() if source else None
        level_filter = level.upper() if level else None

        # Level hierarchy for filtering
        level_hierarchy = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
        min_level = level_hierarchy.get(level_filter, 0) if level_filter else 0

        logger.info(f"WebSocket connection accepted for task {task_id} (source={source_filter}, level={level_filter})")

        task_manager = get_task_manager()
        queue = await task_manager.subscribe_logs(task_id)

        def matches_filters(log_entry) -> bool:
            """Return True if log_entry passes the source and minimum-level filters."""
            # `or ""` keeps an entry with a None source/level from raising and
            # tearing down the stream (assumes those fields may be optional —
            # confirm against the log entry model).
            if source_filter and (log_entry.source or "").lower() != source_filter:
                return False
            if level_filter:
                # Entry levels missing from the hierarchy rank as DEBUG (0).
                entry_rank = level_hierarchy.get((log_entry.level or "").upper(), 0)
                if entry_rank < min_level:
                    return False
            return True

        def to_payload(log_entry) -> dict:
            """Serialize a log entry for send_json, converting its timestamp to ISO-8601."""
            log_dict = log_entry.dict()
            log_dict['timestamp'] = log_dict['timestamp'].isoformat()
            return log_dict

        try:
            # Stream new logs
            logger.info(f"Starting log stream for task {task_id}")

            # Send initial logs first to build context (apply filters)
            for log_entry in task_manager.get_task_logs(task_id):
                if matches_filters(log_entry):
                    await websocket.send_json(to_payload(log_entry))

            # Force a check for AWAITING_INPUT status immediately upon connection.
            # This ensures that if the task is already waiting when the user
            # connects, they get the prompt.
            task = task_manager.get_task(task_id)
            if task and task.status == "AWAITING_INPUT" and task.input_request:
                # Construct a synthetic log entry to trigger the frontend handler
                # without changing the websocket protocol.
                synthetic_log = {
                    "timestamp": task.logs[-1].timestamp.isoformat() if task.logs else "2024-01-01T00:00:00",
                    "level": "INFO",
                    "message": "Task paused for user input (Connection Re-established)",
                    "context": {"input_request": task.input_request},
                }
                await websocket.send_json(synthetic_log)

            # The loop intentionally does not stop when the task completes or
            # fails, so the client can keep the socket open to review logs.
            # NOTE(review): a client disconnect is only noticed when the next
            # send fails, because nothing reads from the socket while waiting.
            while True:
                log_entry = await queue.get()

                # Apply server-side filtering
                if not matches_filters(log_entry):
                    continue

                await websocket.send_json(to_payload(log_entry))

        except WebSocketDisconnect:
            logger.info(f"WebSocket connection disconnected for task {task_id}")
        except Exception as e:
            logger.error(f"WebSocket error for task {task_id}: {e}")
        finally:
            # Always release the log subscription, whatever ended the stream.
            task_manager.unsubscribe_logs(task_id, queue)
# [/DEF:websocket_endpoint:Function]


# [DEF:StaticFiles:Mount]
# @SEMANTICS: static, frontend, spa
# @PURPOSE: Mounts the frontend build directory to serve static assets.
frontend_path = project_root / "frontend" / "build"
if frontend_path.exists():
    app.mount("/_app", StaticFiles(directory=str(frontend_path / "_app")), name="static")

    # Canonical build root, used to make sure requested files stay inside it.
    frontend_root = frontend_path.resolve()

    # [DEF:serve_spa:Function]
    # @PURPOSE: Serves the SPA frontend for any path not matched by API routes.
    # @PRE: frontend_path exists.
    # @POST: Returns the requested file if it lies inside the build directory, otherwise index.html; raises 404 for unmatched API paths.
    @app.get("/{file_path:path}", include_in_schema=False)
    async def serve_spa(file_path: str):
        # Only serve SPA for non-API paths. API routes are registered first and
        # should already have matched; an unmatched API path gets a 404 rather
        # than HTML.
        if file_path == "api" or file_path.startswith(("api/", "/api/")):
            raise HTTPException(status_code=404, detail=f"API endpoint not found: {file_path}")

        if file_path:
            # file_path is untrusted and percent-decoded: "../" segments or an
            # absolute path (from a leading "//") could escape the build
            # directory. Resolve it and require containment in frontend_root.
            try:
                candidate = (frontend_root / file_path).resolve()
                candidate.relative_to(frontend_root)
            except (OSError, ValueError):
                candidate = None
            if candidate is not None and candidate.is_file():
                return FileResponse(str(candidate))

        return FileResponse(str(frontend_path / "index.html"))
    # [/DEF:serve_spa:Function]
else:
    # [DEF:read_root:Function]
    # @PURPOSE: A simple root endpoint to confirm that the API is running when frontend is missing.
    # @PRE: None.
    # @POST: Returns a JSON message indicating API status.
    @app.get("/")
    async def read_root():
        with belief_scope("read_root"):
            return {"message": "Superset Tools API is running (Frontend build not found)"}
    # [/DEF:read_root:Function]
# [/DEF:StaticFiles:Mount]

# [/DEF:AppModule:Module]