# [DEF:backend.src.services.auth_service:Module]
#
# @SEMANTICS: auth, service, business-logic, login, jwt
# @PURPOSE: Orchestrates authentication business logic.
# @LAYER: Service
# @RELATION: USES -> backend.src.core.auth.repository.AuthRepository
# @RELATION: USES -> backend.src.core.auth.security
# @RELATION: USES -> backend.src.core.auth.jwt
#
# @INVARIANT: Authentication must verify both credentials and account status.

# [SECTION: IMPORTS]
from typing import Any, Dict, Optional

from sqlalchemy.orm import Session

from ..models.auth import User, Role
from ..core.auth.repository import AuthRepository
from ..core.auth.security import verify_password
from ..core.auth.jwt import create_access_token
from ..core.logger import belief_scope
# [/SECTION]


# [DEF:AuthService:Class]
# @PURPOSE: Provides high-level authentication services.
class AuthService:
    # [DEF:__init__:Function]
    # @PURPOSE: Initializes the service with a database session.
    # @PARAM: db (Session) - SQLAlchemy session.
    def __init__(self, db: Session):
        # All persistence goes through the repository; the raw session is
        # reached via self.repo.db where a direct query is needed.
        self.repo = AuthRepository(db)
    # [/DEF:__init__:Function]

    # [DEF:authenticate_user:Function]
    # @PURPOSE: Authenticates a user with username and password.
    # @PRE: username and password are provided.
    # @POST: Returns User object if authentication succeeds, else None.
    # @SIDE_EFFECT: Updates last_login timestamp on success.
    # @PARAM: username (str) - The username.
    # @PARAM: password (str) - The plain password.
    # @RETURN: Optional[User] - The authenticated user or None.
    def authenticate_user(self, username: str, password: str) -> Optional[User]:
        with belief_scope("AuthService.authenticate_user"):
            user = self.repo.get_user_by_username(username)
            if not user:
                return None

            # Account status is checked before the password so an inactive
            # account is rejected regardless of the supplied credentials.
            if not user.is_active:
                return None

            # Accounts without a stored hash can never pass a password login.
            if not user.password_hash or not verify_password(password, user.password_hash):
                return None

            self.repo.update_last_login(user)
            return user
    # [/DEF:authenticate_user:Function]

    # [DEF:create_session:Function]
    # @PURPOSE: Creates a JWT session for an authenticated user.
    # @PRE: user is a valid User object.
    # @POST: Returns a dictionary with access_token and token_type.
    # @PARAM: user (User) - The authenticated user.
    # @RETURN: Dict[str, str] - Session data.
    def create_session(self, user: User) -> Dict[str, str]:
        with belief_scope("AuthService.create_session"):
            # Collect role names for scopes
            scopes = [role.name for role in user.roles]

            token_data = {
                "sub": user.username,
                "scopes": scopes
            }

            access_token = create_access_token(data=token_data)
            return {
                "access_token": access_token,
                "token_type": "bearer"
            }
    # [/DEF:create_session:Function]

    # [DEF:provision_adfs_user:Function]
    # @PURPOSE: Just-In-Time (JIT) provisioning for ADFS users based on group mappings.
    # @PRE: user_info contains 'upn' (username), 'email', and 'groups'.
    # @POST: User is created/updated and assigned roles based on groups.
    # @POST: Raises ValueError when neither 'upn' nor 'email' is present.
    # @POST: On a failed commit the session is rolled back and the error re-raised.
    # @PARAM: user_info (Dict[str, Any]) - Claims from ADFS token.
    # @RETURN: User - The provisioned user.
    def provision_adfs_user(self, user_info: Dict[str, Any]) -> User:
        with belief_scope("AuthService.provision_adfs_user"):
            username = user_info.get("upn") or user_info.get("email")
            if not username:
                # Without an identity claim the lookup below would run with
                # None and a nameless account could be created or matched.
                raise ValueError("ADFS claims must include 'upn' or 'email'")

            email = user_info.get("email")

            # The claim may be absent, null, or a single bare string;
            # normalise all of those to a list before handing it to in_().
            raw_groups = user_info.get("groups") or []
            if isinstance(raw_groups, str):
                raw_groups = [raw_groups]
            ad_groups = list(raw_groups)

            # NOTE(review): an existing account is reused here without looking
            # at is_active or auth_source - confirm the caller enforces the
            # account-status invariant for ADFS logins.
            user = self.repo.get_user_by_username(username)
            if not user:
                user = User(
                    username=username,
                    email=email,
                    auth_source="ADFS",
                    is_active=True
                )
                self.repo.db.add(user)

            # Update roles based on group mappings
            # NOTE(review): kept as a function-local import as in the original;
            # presumably it could join the top-level imports - confirm.
            from ..models.auth import ADGroupMapping

            if ad_groups:
                mapped_roles = self.repo.db.query(Role).join(ADGroupMapping).filter(
                    ADGroupMapping.ad_group.in_(ad_groups)
                ).all()
            else:
                # No groups means no mapped roles; skip the empty IN () query.
                mapped_roles = []

            # Roles are replaced wholesale, so roles whose AD group is no
            # longer present in the token are dropped.
            user.roles = mapped_roles

            try:
                self.repo.db.commit()
            except Exception:
                # Leave the shared session usable for the caller, then surface
                # the original failure unchanged.
                self.repo.db.rollback()
                raise

            self.repo.db.refresh(user)
            return user
    # [/DEF:provision_adfs_user:Function]
# [/DEF:AuthService:Class]

# [/DEF:backend.src.services.auth_service:Module]