# [DEF:backend.src.api.routes.admin:Module] # # @TIER: STANDARD # @SEMANTICS: api, admin, users, roles, permissions # @PURPOSE: Admin API endpoints for user and role management. # @LAYER: API # @RELATION: USES -> backend.src.core.auth.repository.AuthRepository # @RELATION: USES -> backend.src.dependencies.has_permission # # @INVARIANT: All endpoints in this module require 'Admin' role or 'admin' scope. # [SECTION: IMPORTS] from typing import List from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from ...core.database import get_auth_db from ...core.auth.repository import AuthRepository from ...core.auth.security import get_password_hash from ...schemas.auth import ( User as UserSchema, UserCreate, UserUpdate, RoleSchema, RoleCreate, RoleUpdate, PermissionSchema, ADGroupMappingSchema, ADGroupMappingCreate ) from ...models.auth import User, Role, ADGroupMapping from ...dependencies import has_permission from ...core.logger import logger, belief_scope # [/SECTION] # [DEF:router:Variable] # @PURPOSE: APIRouter instance for admin routes. router = APIRouter(prefix="/api/admin", tags=["admin"]) # [/DEF:router:Variable] # [DEF:list_users:Function] # @PURPOSE: Lists all registered users. # @PRE: Current user has 'Admin' role. # @POST: Returns a list of UserSchema objects. # @PARAM: db (Session) - Auth database session. # @RETURN: List[UserSchema] - List of users. @router.get("/users", response_model=List[UserSchema]) async def list_users( db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:users", "READ")) ): with belief_scope("api.admin.list_users"): users = db.query(User).all() return users # [/DEF:list_users:Function] # [DEF:create_user:Function] # @PURPOSE: Creates a new local user. # @PRE: Current user has 'Admin' role. # @POST: New user is created in the database. # @PARAM: user_in (UserCreate) - New user data. # @PARAM: db (Session) - Auth database session. # @RETURN: UserSchema - The created user. @router.post("/users", response_model=UserSchema, status_code=status.HTTP_201_CREATED) async def create_user( user_in: UserCreate, db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:users", "WRITE")) ): with belief_scope("api.admin.create_user"): repo = AuthRepository(db) if repo.get_user_by_username(user_in.username): raise HTTPException(status_code=400, detail="Username already exists") new_user = User( username=user_in.username, email=user_in.email, password_hash=get_password_hash(user_in.password), auth_source="LOCAL", is_active=user_in.is_active ) for role_name in user_in.roles: role = repo.get_role_by_name(role_name) if role: new_user.roles.append(role) db.add(new_user) db.commit() db.refresh(new_user) return new_user # [/DEF:create_user:Function] # [DEF:update_user:Function] # @PURPOSE: Updates an existing user. @router.put("/users/{user_id}", response_model=UserSchema) async def update_user( user_id: str, user_in: UserUpdate, db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:users", "WRITE")) ): with belief_scope("api.admin.update_user"): repo = AuthRepository(db) user = repo.get_user_by_id(user_id) if not user: raise HTTPException(status_code=404, detail="User not found") if user_in.email is not None: user.email = user_in.email if user_in.is_active is not None: user.is_active = user_in.is_active if user_in.password is not None: user.password_hash = get_password_hash(user_in.password) if user_in.roles is not None: user.roles = [] for role_name in user_in.roles: role = repo.get_role_by_name(role_name) if role: user.roles.append(role) db.commit() db.refresh(user) return user # [/DEF:update_user:Function] # [DEF:delete_user:Function] # @PURPOSE: Deletes a user. @router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_user( user_id: str, db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:users", "WRITE")) ): with belief_scope("api.admin.delete_user"): logger.info(f"[DEBUG] Attempting to delete user context={{'user_id': '{user_id}'}}") repo = AuthRepository(db) user = repo.get_user_by_id(user_id) if not user: logger.warning(f"[DEBUG] User not found for deletion context={{'user_id': '{user_id}'}}") raise HTTPException(status_code=404, detail="User not found") logger.info(f"[DEBUG] Found user to delete context={{'username': '{user.username}'}}") db.delete(user) db.commit() logger.info(f"[DEBUG] Successfully deleted user context={{'user_id': '{user_id}'}}") return None # [/DEF:delete_user:Function] # [DEF:list_roles:Function] # @PURPOSE: Lists all available roles. # @RETURN: List[RoleSchema] - List of roles. # @RELATION: CALLS -> backend.src.models.auth.Role @router.get("/roles", response_model=List[RoleSchema]) async def list_roles( db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:roles", "READ")) ): with belief_scope("api.admin.list_roles"): return db.query(Role).all() # [/DEF:list_roles:Function] # [DEF:create_role:Function] # @PURPOSE: Creates a new system role with associated permissions. # @PRE: Role name must be unique. # @POST: New Role record is created in auth.db. # @PARAM: role_in (RoleCreate) - New role data. # @PARAM: db (Session) - Auth database session. # @RETURN: RoleSchema - The created role. # @SIDE_EFFECT: Commits new role and associations to auth.db. # @RELATION: CALLS -> backend.src.core.auth.repository.AuthRepository.get_permission_by_id @router.post("/roles", response_model=RoleSchema, status_code=status.HTTP_201_CREATED) async def create_role( role_in: RoleCreate, db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:roles", "WRITE")) ): with belief_scope("api.admin.create_role"): if db.query(Role).filter(Role.name == role_in.name).first(): raise HTTPException(status_code=400, detail="Role already exists") new_role = Role(name=role_in.name, description=role_in.description) repo = AuthRepository(db) for perm_id_or_str in role_in.permissions: perm = repo.get_permission_by_id(perm_id_or_str) if not perm and ":" in perm_id_or_str: res, act = perm_id_or_str.split(":", 1) perm = repo.get_permission_by_resource_action(res, act) if perm: new_role.permissions.append(perm) db.add(new_role) db.commit() db.refresh(new_role) return new_role # [/DEF:create_role:Function] # [DEF:update_role:Function] # @PURPOSE: Updates an existing role's metadata and permissions. # @PRE: role_id must be a valid existing role UUID. # @POST: Role record is updated in auth.db. # @PARAM: role_id (str) - Target role identifier. # @PARAM: role_in (RoleUpdate) - Updated role data. # @PARAM: db (Session) - Auth database session. # @RETURN: RoleSchema - The updated role. # @SIDE_EFFECT: Commits updates to auth.db. # @RELATION: CALLS -> backend.src.core.auth.repository.AuthRepository.get_role_by_id @router.put("/roles/{role_id}", response_model=RoleSchema) async def update_role( role_id: str, role_in: RoleUpdate, db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:roles", "WRITE")) ): with belief_scope("api.admin.update_role"): repo = AuthRepository(db) role = repo.get_role_by_id(role_id) if not role: raise HTTPException(status_code=404, detail="Role not found") if role_in.name is not None: role.name = role_in.name if role_in.description is not None: role.description = role_in.description if role_in.permissions is not None: role.permissions = [] for perm_id_or_str in role_in.permissions: perm = repo.get_permission_by_id(perm_id_or_str) if not perm and ":" in perm_id_or_str: res, act = perm_id_or_str.split(":", 1) perm = repo.get_permission_by_resource_action(res, act) if perm: role.permissions.append(perm) db.commit() db.refresh(role) return role # [/DEF:update_role:Function] # [DEF:delete_role:Function] # @PURPOSE: Removes a role from the system. # @PRE: role_id must be a valid existing role UUID. # @POST: Role record is removed from auth.db. # @PARAM: role_id (str) - Target role identifier. # @PARAM: db (Session) - Auth database session. # @RETURN: None # @SIDE_EFFECT: Deletes record from auth.db and commits. # @RELATION: CALLS -> backend.src.core.auth.repository.AuthRepository.get_role_by_id @router.delete("/roles/{role_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_role( role_id: str, db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:roles", "WRITE")) ): with belief_scope("api.admin.delete_role"): repo = AuthRepository(db) role = repo.get_role_by_id(role_id) if not role: raise HTTPException(status_code=404, detail="Role not found") db.delete(role) db.commit() return None # [/DEF:delete_role:Function] # [DEF:list_permissions:Function] # @PURPOSE: Lists all available system permissions for assignment. # @POST: Returns a list of all PermissionSchema objects. # @PARAM: db (Session) - Auth database session. # @RETURN: List[PermissionSchema] - List of permissions. # @RELATION: CALLS -> backend.src.core.auth.repository.AuthRepository.list_permissions @router.get("/permissions", response_model=List[PermissionSchema]) async def list_permissions( db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:roles", "READ")) ): with belief_scope("api.admin.list_permissions"): repo = AuthRepository(db) return repo.list_permissions() # [/DEF:list_permissions:Function] # [DEF:list_ad_mappings:Function] # @PURPOSE: Lists all AD Group to Role mappings. @router.get("/ad-mappings", response_model=List[ADGroupMappingSchema]) async def list_ad_mappings( db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:settings", "READ")) ): with belief_scope("api.admin.list_ad_mappings"): return db.query(ADGroupMapping).all() # [/DEF:list_ad_mappings:Function] # [DEF:create_ad_mapping:Function] # @PURPOSE: Creates a new AD Group mapping. @router.post("/ad-mappings", response_model=ADGroupMappingSchema) async def create_ad_mapping( mapping_in: ADGroupMappingCreate, db: Session = Depends(get_auth_db), _ = Depends(has_permission("admin:settings", "WRITE")) ): with belief_scope("api.admin.create_ad_mapping"): new_mapping = ADGroupMapping( ad_group=mapping_in.ad_group, role_id=mapping_in.role_id ) db.add(new_mapping) db.commit() db.refresh(new_mapping) return new_mapping # [/DEF:create_ad_mapping:Function] # [/DEF:backend.src.api.routes.admin:Module]