# [DEF:AppModule:Module] # @SEMANTICS: app, main, entrypoint, fastapi # @PURPOSE: The main entry point for the FastAPI application. It initializes the app, configures CORS, sets up dependencies, includes API routers, and defines the WebSocket endpoint for log streaming. # @LAYER: UI (API) # @RELATION: Depends on the dependency module and API route modules. import sys from pathlib import Path # project_root is used for static files mounting project_root = Path(__file__).resolve().parent.parent.parent from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, Request, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse import asyncio import os from .dependencies import get_task_manager, get_scheduler_service from .core.logger import logger, belief_scope from .api.routes import plugins, tasks, settings, environments, mappings, migration, connections, git, storage from .core.database import init_db # [DEF:App:Global] # @SEMANTICS: app, fastapi, instance # @PURPOSE: The global FastAPI application instance. app = FastAPI( title="Superset Tools API", description="API for managing Superset automation tools and plugins.", version="1.0.0", ) # [/DEF:App:Global] # [DEF:startup_event:Function] # @PURPOSE: Handles application startup tasks, such as starting the scheduler. # @PRE: None. # @POST: Scheduler is started. # Startup event @app.on_event("startup") async def startup_event(): with belief_scope("startup_event"): scheduler = get_scheduler_service() scheduler.start() # [/DEF:startup_event:Function] # [DEF:shutdown_event:Function] # @PURPOSE: Handles application shutdown tasks, such as stopping the scheduler. # @PRE: None. # @POST: Scheduler is stopped. # Shutdown event @app.on_event("shutdown") async def shutdown_event(): with belief_scope("shutdown_event"): scheduler = get_scheduler_service() scheduler.stop() # [/DEF:shutdown_event:Function] # Configure CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], # Adjust this in production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # [DEF:log_requests:Function] # @PURPOSE: Middleware to log incoming HTTP requests and their response status. # @PRE: request is a FastAPI Request object. # @POST: Logs request and response details. # @PARAM: request (Request) - The incoming request object. # @PARAM: call_next (Callable) - The next middleware or route handler. @app.middleware("http") async def log_requests(request: Request, call_next): with belief_scope("log_requests", f"{request.method} {request.url.path}"): logger.info(f"[DEBUG] Incoming request: {request.method} {request.url.path}") response = await call_next(request) logger.info(f"[DEBUG] Response status: {response.status_code} for {request.url.path}") return response # [/DEF:log_requests:Function] # Include API routes app.include_router(plugins.router, prefix="/api/plugins", tags=["Plugins"]) app.include_router(tasks.router, prefix="/api/tasks", tags=["Tasks"]) app.include_router(settings.router, prefix="/api/settings", tags=["Settings"]) app.include_router(connections.router, prefix="/api/settings/connections", tags=["Connections"]) app.include_router(environments.router, prefix="/api/environments", tags=["Environments"]) app.include_router(mappings.router) app.include_router(migration.router) app.include_router(git.router) app.include_router(storage.router, prefix="/api/storage", tags=["Storage"]) # [DEF:websocket_endpoint:Function] # @PURPOSE: Provides a WebSocket endpoint for real-time log streaming of a task. # @PRE: task_id must be a valid task ID. # @POST: WebSocket connection is managed and logs are streamed until disconnect. @app.websocket("/ws/logs/{task_id}") async def websocket_endpoint(websocket: WebSocket, task_id: str): with belief_scope("websocket_endpoint", f"task_id={task_id}"): await websocket.accept() logger.info(f"WebSocket connection accepted for task {task_id}") task_manager = get_task_manager() queue = await task_manager.subscribe_logs(task_id) try: # Stream new logs logger.info(f"Starting log stream for task {task_id}") # Send initial logs first to build context initial_logs = task_manager.get_task_logs(task_id) for log_entry in initial_logs: log_dict = log_entry.dict() log_dict['timestamp'] = log_dict['timestamp'].isoformat() await websocket.send_json(log_dict) # Force a check for AWAITING_INPUT status immediately upon connection # This ensures that if the task is already waiting when the user connects, they get the prompt. task = task_manager.get_task(task_id) if task and task.status == "AWAITING_INPUT" and task.input_request: # Construct a synthetic log entry to trigger the frontend handler # This is a bit of a hack but avoids changing the websocket protocol significantly synthetic_log = { "timestamp": task.logs[-1].timestamp.isoformat() if task.logs else "2024-01-01T00:00:00", "level": "INFO", "message": "Task paused for user input (Connection Re-established)", "context": {"input_request": task.input_request} } await websocket.send_json(synthetic_log) while True: log_entry = await queue.get() log_dict = log_entry.dict() log_dict['timestamp'] = log_dict['timestamp'].isoformat() await websocket.send_json(log_dict) # If task is finished, we could potentially close the connection # but let's keep it open for a bit or until the client disconnects if "Task completed successfully" in log_entry.message or "Task failed" in log_entry.message: # Wait a bit to ensure client receives the last message await asyncio.sleep(2) # DO NOT BREAK here - allow client to keep connection open if they want to review logs # or until they disconnect. Breaking closes the socket immediately. # break except WebSocketDisconnect: logger.info(f"WebSocket connection disconnected for task {task_id}") except Exception as e: logger.error(f"WebSocket error for task {task_id}: {e}") finally: task_manager.unsubscribe_logs(task_id, queue) # [/DEF:websocket_endpoint:Function] # [DEF:StaticFiles:Mount] # @SEMANTICS: static, frontend, spa # @PURPOSE: Mounts the frontend build directory to serve static assets. frontend_path = project_root / "frontend" / "build" if frontend_path.exists(): app.mount("/_app", StaticFiles(directory=str(frontend_path / "_app")), name="static") # Serve other static files from the root of build directory # [DEF:serve_spa:Function] # @PURPOSE: Serves frontend static files or index.html for SPA routing. # @PRE: file_path is requested by the client. # @POST: Returns the requested file or index.html as a fallback. @app.get("/{file_path:path}") async def serve_spa(file_path: str): with belief_scope("serve_spa", f"path={file_path}"): # Don't serve SPA for API routes that fell through if file_path.startswith("api/"): logger.info(f"[DEBUG] API route fell through to serve_spa: {file_path}") raise HTTPException(status_code=404, detail=f"API endpoint not found: {file_path}") full_path = frontend_path / file_path if full_path.is_file(): return FileResponse(str(full_path)) # Fallback to index.html for SPA routing return FileResponse(str(frontend_path / "index.html")) # [/DEF:serve_spa:Function] else: # [DEF:read_root:Function] # @PURPOSE: A simple root endpoint to confirm that the API is running when frontend is missing. # @PRE: None. # @POST: Returns a JSON message indicating API status. @app.get("/") async def read_root(): with belief_scope("read_root"): return {"message": "Superset Tools API is running (Frontend build not found)"} # [/DEF:read_root:Function] # [/DEF:StaticFiles:Mount] # [/DEF:AppModule:Module]