# [DEF:backend.src.api.auth:Module] # # @SEMANTICS: api, auth, routes, login, logout # @PURPOSE: Authentication API endpoints. # @LAYER: API # @RELATION: USES -> backend.src.services.auth_service.AuthService # @RELATION: USES -> backend.src.core.database.get_auth_db # # @INVARIANT: All auth endpoints must return consistent error codes. # [SECTION: IMPORTS] from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session from ..core.database import get_auth_db from ..services.auth_service import AuthService from ..schemas.auth import Token, User as UserSchema from ..dependencies import get_current_user from ..core.auth.oauth import oauth, is_adfs_configured from ..core.auth.logger import log_security_event from ..core.logger import belief_scope import starlette.requests # [/SECTION] # [DEF:router:Variable] # @PURPOSE: APIRouter instance for authentication routes. router = APIRouter(prefix="/api/auth", tags=["auth"]) # [/DEF:router:Variable] # [DEF:login_for_access_token:Function] # @PURPOSE: Authenticates a user and returns a JWT access token. # @PRE: form_data contains username and password. # @POST: Returns a Token object on success. # @THROW: HTTPException 401 if authentication fails. # @PARAM: form_data (OAuth2PasswordRequestForm) - Login credentials. # @PARAM: db (Session) - Auth database session. # @RETURN: Token - The generated JWT token. @router.post("/login", response_model=Token) async def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_auth_db) ): with belief_scope("api.auth.login"): auth_service = AuthService(db) user = auth_service.authenticate_user(form_data.username, form_data.password) if not user: log_security_event("LOGIN_FAILED", form_data.username, {"reason": "Invalid credentials"}) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) log_security_event("LOGIN_SUCCESS", user.username, {"source": "LOCAL"}) return auth_service.create_session(user) # [/DEF:login_for_access_token:Function] # [DEF:read_users_me:Function] # @PURPOSE: Retrieves the profile of the currently authenticated user. # @PRE: Valid JWT token provided. # @POST: Returns the current user's data. # @PARAM: current_user (UserSchema) - The user extracted from the token. # @RETURN: UserSchema - The current user profile. @router.get("/me", response_model=UserSchema) async def read_users_me(current_user: UserSchema = Depends(get_current_user)): with belief_scope("api.auth.me"): return current_user # [/DEF:read_users_me:Function] # [DEF:logout:Function] # @PURPOSE: Logs out the current user (placeholder for session revocation). # @PRE: Valid JWT token provided. # @POST: Returns success message. @router.post("/logout") async def logout(current_user: UserSchema = Depends(get_current_user)): with belief_scope("api.auth.logout"): log_security_event("LOGOUT", current_user.username) # In a stateless JWT setup, client-side token deletion is primary. # Server-side revocation (blacklisting) can be added here if needed. return {"message": "Successfully logged out"} # [/DEF:logout:Function] # [DEF:login_adfs:Function] # @PURPOSE: Initiates the ADFS OIDC login flow. # @POST: Redirects the user to ADFS. @router.get("/login/adfs") async def login_adfs(request: starlette.requests.Request): with belief_scope("api.auth.login_adfs"): if not is_adfs_configured(): raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="ADFS is not configured. Please set ADFS_CLIENT_ID, ADFS_CLIENT_SECRET, and ADFS_METADATA_URL environment variables." ) redirect_uri = request.url_for('auth_callback_adfs') return await oauth.adfs.authorize_redirect(request, str(redirect_uri)) # [/DEF:login_adfs:Function] # [DEF:auth_callback_adfs:Function] # @PURPOSE: Handles the callback from ADFS after successful authentication. # @POST: Provisions user JIT and returns session token. @router.get("/callback/adfs", name="auth_callback_adfs") async def auth_callback_adfs(request: starlette.requests.Request, db: Session = Depends(get_auth_db)): with belief_scope("api.auth.callback_adfs"): if not is_adfs_configured(): raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="ADFS is not configured. Please set ADFS_CLIENT_ID, ADFS_CLIENT_SECRET, and ADFS_METADATA_URL environment variables." ) token = await oauth.adfs.authorize_access_token(request) user_info = token.get('userinfo') if not user_info: raise HTTPException(status_code=400, detail="Failed to retrieve user info from ADFS") auth_service = AuthService(db) user = auth_service.provision_adfs_user(user_info) return auth_service.create_session(user) # [/DEF:auth_callback_adfs:Function] # [/DEF:backend.src.api.auth:Module]