115 lines
4.5 KiB
Python
115 lines
4.5 KiB
Python
# [DEF:backend.src.services.auth_service:Module]
#
# @SEMANTICS: auth, service, business-logic, login, jwt
# @PURPOSE: Orchestrates authentication business logic: password login, JWT session creation, and ADFS just-in-time user provisioning.
# @LAYER: Service
# @RELATION: USES -> backend.src.core.auth.repository.AuthRepository
# @RELATION: USES -> backend.src.core.auth.security
# @RELATION: USES -> backend.src.core.auth.jwt
#
# @INVARIANT: Authentication must verify both credentials and account status.
|
|
|
|
# [SECTION: IMPORTS]
from typing import Any, Dict, Optional

from sqlalchemy.orm import Session

from ..core.auth.jwt import create_access_token
from ..core.auth.repository import AuthRepository
from ..core.auth.security import verify_password
from ..core.logger import belief_scope
from ..models.auth import ADGroupMapping, Role, User
# [/SECTION]
|
|
|
|
# [DEF:AuthService:Class]
# @PURPOSE: Provides high-level authentication services.
class AuthService:
    """High-level authentication use cases built on top of AuthRepository.

    Covers three flows: username/password login, JWT session creation for an
    already authenticated user, and just-in-time provisioning of ADFS users.
    """

    # [DEF:__init__:Function]
    # @PURPOSE: Initializes the service with a database session.
    # @PARAM: db (Session) - SQLAlchemy session.
    def __init__(self, db: Session):
        # The repository wraps the session; the session itself stays
        # reachable as ``self.repo.db`` for the provisioning flow.
        self.repo = AuthRepository(db)
    # [/DEF:__init__:Function]

    # [DEF:authenticate_user:Function]
    # @PURPOSE: Authenticates a user with username and password.
    # @PRE: username and password are provided.
    # @POST: Returns User object if authentication succeeds, else None.
    # @SIDE_EFFECT: Updates last_login timestamp on success.
    # @PARAM: username (str) - The username.
    # @PARAM: password (str) - The plain password.
    # @RETURN: Optional[User] - The authenticated user or None.
    def authenticate_user(self, username: str, password: str) -> Optional[User]:
        with belief_scope("AuthService.authenticate_user"):
            user = self.repo.get_user_by_username(username)

            # Unknown and deactivated accounts are rejected the same way,
            # before the password is looked at.
            if not user or not user.is_active:
                return None

            # An account without a stored hash can never pass a password
            # login (presumably externally provisioned accounts - verify
            # against the User model).
            if not user.password_hash:
                return None
            if not verify_password(password, user.password_hash):
                return None

            self.repo.update_last_login(user)
            return user
    # [/DEF:authenticate_user:Function]

    # [DEF:create_session:Function]
    # @PURPOSE: Creates a JWT session for an authenticated user.
    # @PRE: user is a valid User object.
    # @POST: Returns a dictionary with access_token and token_type.
    # @PARAM: user (User) - The authenticated user.
    # @RETURN: Dict[str, str] - Session data.
    def create_session(self, user: User) -> Dict[str, str]:
        with belief_scope("AuthService.create_session"):
            # Role names are embedded in the token as its scopes.
            token_data = {
                "sub": user.username,
                "scopes": [role.name for role in user.roles],
            }
            return {
                "access_token": create_access_token(data=token_data),
                "token_type": "bearer",
            }
    # [/DEF:create_session:Function]

    # [DEF:_normalize_ad_groups:Function]
    # @PURPOSE: Coerces the raw 'groups' claim into a list of group names.
    # @POST: Always returns a list; missing/empty claim yields [].
    # @PARAM: groups (Any) - Raw claim value: None, a single string, or an iterable of strings.
    # @RETURN: list - Group names, possibly empty.
    @staticmethod
    def _normalize_ad_groups(groups: Any) -> list:
        if not groups:
            return []
        # A single-valued claim may arrive as a bare string rather than a
        # one-element list (presumably how the token decoder delivers it -
        # confirm against the ADFS integration); wrap it so it is matched
        # as one group name.
        if isinstance(groups, str):
            return [groups]
        return list(groups)
    # [/DEF:_normalize_ad_groups:Function]

    # [DEF:provision_adfs_user:Function]
    # @PURPOSE: Just-In-Time (JIT) provisioning for ADFS users based on group mappings.
    # @PRE: user_info contains 'upn' (username), 'email', and 'groups'.
    # @POST: User is created/updated and assigned roles based on groups.
    # @SIDE_EFFECT: Commits the session; rolls it back and re-raises if the commit fails.
    # @PARAM: user_info (Dict[str, Any]) - Claims from ADFS token.
    # @RETURN: User - The provisioned user.
    def provision_adfs_user(self, user_info: Dict[str, Any]) -> User:
        """Create or update the local account for an ADFS identity.

        Raises:
            ValueError: if the claims carry neither 'upn' nor 'email', so no
                username can be derived.
        """
        with belief_scope("AuthService.provision_adfs_user"):
            email = user_info.get("email")
            username = user_info.get("upn") or email
            if not username:
                # Without this guard every claim set lacking both fields
                # would be looked up / created under the same empty username.
                raise ValueError("ADFS claims must contain 'upn' or 'email'")

            ad_groups = self._normalize_ad_groups(user_info.get("groups"))
            db = self.repo.db

            user = self.repo.get_user_by_username(username)
            if not user:
                user = User(
                    username=username,
                    email=email,
                    auth_source="ADFS",
                    is_active=True,
                )
                db.add(user)
            # NOTE(review): an existing account is returned regardless of its
            # is_active flag - confirm callers check account status themselves.

            # Roles are replaced (not merged) with whatever the current group
            # memberships map to; no groups means no roles, so the query is
            # skipped in that case.
            mapped_roles = []
            if ad_groups:
                mapped_roles = (
                    db.query(Role)
                    .join(ADGroupMapping)
                    .filter(ADGroupMapping.ad_group.in_(ad_groups))
                    .all()
                )
            user.roles = mapped_roles

            try:
                db.commit()
            except Exception:
                # Leave the session usable for the caller, then surface the
                # original error unchanged.
                db.rollback()
                raise

            db.refresh(user)
            return user
    # [/DEF:provision_adfs_user:Function]

# [/DEF:AuthService:Class]
|
|
|
|
# [/DEF:backend.src.services.auth_service:Module]