# [DEF:AppModule:Module] # @SEMANTICS: app, main, entrypoint, fastapi # @PURPOSE: The main entry point for the FastAPI application. It initializes the app, configures CORS, sets up dependencies, includes API routers, and defines the WebSocket endpoint for log streaming. # @LAYER: UI (API) # @RELATION: Depends on the dependency module and API route modules. import sys from pathlib import Path # project_root is used for static files mounting project_root = Path(__file__).resolve().parent.parent.parent from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Request, HTTPException from starlette.middleware.sessions import SessionMiddleware from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse import asyncio import os from .dependencies import get_task_manager, get_scheduler_service from .core.logger import logger, belief_scope from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage, admin, llm from .api import auth from .core.database import init_db # [DEF:App:Global] # @SEMANTICS: app, fastapi, instance # @PURPOSE: The global FastAPI application instance. app = FastAPI( title="Superset Tools API", description="API for managing Superset automation tools and plugins.", version="1.0.0", ) # [/DEF:App:Global] # [DEF:startup_event:Function] # @PURPOSE: Handles application startup tasks, such as starting the scheduler. # @PRE: None. # @POST: Scheduler is started. # Startup event @app.on_event("startup") async def startup_event(): with belief_scope("startup_event"): scheduler = get_scheduler_service() scheduler.start() # [/DEF:startup_event:Function] # [DEF:shutdown_event:Function] # @PURPOSE: Handles application shutdown tasks, such as stopping the scheduler. # @PRE: None. # @POST: Scheduler is stopped. # Shutdown event @app.on_event("shutdown") async def shutdown_event(): with belief_scope("shutdown_event"): scheduler = get_scheduler_service() scheduler.stop() # [/DEF:shutdown_event:Function] # Configure Session Middleware (required by Authlib for OAuth2 flow) from .core.auth.config import auth_config app.add_middleware(SessionMiddleware, secret_key=auth_config.SECRET_KEY) # Configure CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], # Adjust this in production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # [DEF:log_requests:Function] # @PURPOSE: Middleware to log incoming HTTP requests and their response status. # @PRE: request is a FastAPI Request object. # @POST: Logs request and response details. # @PARAM: request (Request) - The incoming request object. # @PARAM: call_next (Callable) - The next middleware or route handler. @app.middleware("http") async def log_requests(request: Request, call_next): with belief_scope("log_requests", f"{request.method} {request.url.path}"): logger.info(f"[DEBUG] Incoming request: {request.method} {request.url.path}") response = await call_next(request) logger.info(f"[DEBUG] Response status: {response.status_code} for {request.url.path}") return response # [/DEF:log_requests:Function] # Include API routes app.include_router(auth.router) app.include_router(admin.router) app.include_router(plugins.router, prefix="/api/plugins", tags=["Plugins"]) app.include_router(tasks.router, prefix="/api/tasks", tags=["Tasks"]) app.include_router(settings.router, prefix="/api/settings", tags=["Settings"]) app.include_router(connections.router, prefix="/api/settings/connections", tags=["Connections"]) app.include_router(environments.router, prefix="/api/environments", tags=["Environments"]) app.include_router(mappings.router) app.include_router(migration.router) app.include_router(git.router) app.include_router(llm.router) app.include_router(storage.router, prefix="/api/storage", tags=["Storage"]) # [DEF:websocket_endpoint:Function] # @PURPOSE: Provides a WebSocket endpoint for real-time log streaming of a task. # @PRE: task_id must be a valid task ID. # @POST: WebSocket connection is managed and logs are streamed until disconnect. @app.websocket("/ws/logs/{task_id}") async def websocket_endpoint(websocket: WebSocket, task_id: str): with belief_scope("websocket_endpoint", f"task_id={task_id}"): await websocket.accept() logger.info(f"WebSocket connection accepted for task {task_id}") task_manager = get_task_manager() queue = await task_manager.subscribe_logs(task_id) try: # Stream new logs logger.info(f"Starting log stream for task {task_id}") # Send initial logs first to build context initial_logs = task_manager.get_task_logs(task_id) for log_entry in initial_logs: log_dict = log_entry.dict() log_dict['timestamp'] = log_dict['timestamp'].isoformat() await websocket.send_json(log_dict) # Force a check for AWAITING_INPUT status immediately upon connection # This ensures that if the task is already waiting when the user connects, they get the prompt. task = task_manager.get_task(task_id) if task and task.status == "AWAITING_INPUT" and task.input_request: # Construct a synthetic log entry to trigger the frontend handler # This is a bit of a hack but avoids changing the websocket protocol significantly synthetic_log = { "timestamp": task.logs[-1].timestamp.isoformat() if task.logs else "2024-01-01T00:00:00", "level": "INFO", "message": "Task paused for user input (Connection Re-established)", "context": {"input_request": task.input_request} } await websocket.send_json(synthetic_log) while True: log_entry = await queue.get() log_dict = log_entry.dict() log_dict['timestamp'] = log_dict['timestamp'].isoformat() await websocket.send_json(log_dict) # If task is finished, we could potentially close the connection # but let's keep it open for a bit or until the client disconnects if "Task completed successfully" in log_entry.message or "Task failed" in log_entry.message: # Wait a bit to ensure client receives the last message await asyncio.sleep(2) # DO NOT BREAK here - allow client to keep connection open if they want to review logs # or until they disconnect. Breaking closes the socket immediately. # break except WebSocketDisconnect: logger.info(f"WebSocket connection disconnected for task {task_id}") except Exception as e: logger.error(f"WebSocket error for task {task_id}: {e}") finally: task_manager.unsubscribe_logs(task_id, queue) # [/DEF:websocket_endpoint:Function] # [DEF:StaticFiles:Mount] # @SEMANTICS: static, frontend, spa # @PURPOSE: Mounts the frontend build directory to serve static assets. frontend_path = project_root / "frontend" / "build" if frontend_path.exists(): app.mount("/_app", StaticFiles(directory=str(frontend_path / "_app")), name="static") # Serve other static files from the root of build directory # [DEF:serve_spa:Function] # @PURPOSE: Serves frontend static files or index.html for SPA routing. # @PRE: file_path is requested by the client. # @POST: Returns the requested file or index.html as a fallback. @app.get("/{file_path:path}") async def serve_spa(file_path: str): with belief_scope("serve_spa", f"path={file_path}"): # Don't serve SPA for API routes that fell through if file_path.startswith("api/"): logger.info(f"[DEBUG] API route fell through to serve_spa: {file_path}") raise HTTPException(status_code=404, detail=f"API endpoint not found: {file_path}") full_path = frontend_path / file_path if full_path.is_file(): return FileResponse(str(full_path)) # Fallback to index.html for SPA routing return FileResponse(str(frontend_path / "index.html")) # [/DEF:serve_spa:Function] else: # [DEF:read_root:Function] # @PURPOSE: A simple root endpoint to confirm that the API is running when frontend is missing. # @PRE: None. # @POST: Returns a JSON message indicating API status. @app.get("/") async def read_root(): with belief_scope("read_root"): return {"message": "Superset Tools API is running (Frontend build not found)"} # [/DEF:read_root:Function] # [/DEF:StaticFiles:Mount] # [/DEF:AppModule:Module]