"""Tests for the auth repository and auth service against in-memory SQLite."""
import sys
from pathlib import Path

# Add src to path
sys.path.append(str(Path(__file__).parent.parent / "src"))

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from src.core.database import Base
from src.models.auth import User, Role, Permission, ADGroupMapping
from src.services.auth_service import AuthService
from src.core.auth.repository import AuthRepository
from src.core.auth.security import verify_password, get_password_hash

# Create in-memory SQLite database for testing
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Create all tables
Base.metadata.create_all(bind=engine)


def _local_user(username, email, password):
    """Build an unsaved ``User`` with ``auth_source="LOCAL"`` and a hashed password."""
    return User(
        username=username,
        email=email,
        password_hash=get_password_hash(password),
        auth_source="LOCAL",
    )


def _save(repo, *entities):
    """Add each entity to the repository's session in order, then commit once."""
    for entity in entities:
        repo.db.add(entity)
    repo.db.commit()


@pytest.fixture
def db_session():
    """Yield a session bound to a connection that has an open transaction.

    After the test the session is closed, the transaction is rolled back
    and the connection is closed, in that order.
    """
    conn = engine.connect()
    outer_txn = conn.begin()
    test_session = TestingSessionLocal(bind=conn)

    yield test_session

    test_session.close()
    outer_txn.rollback()
    conn.close()


@pytest.fixture
def auth_service(db_session):
    """Provide an ``AuthService`` built on the per-test session."""
    return AuthService(db_session)


@pytest.fixture
def auth_repo(db_session):
    """Provide an ``AuthRepository`` built on the per-test session."""
    return AuthRepository(db_session)


def test_create_user(auth_repo):
    """A saved user can be looked up by username with its fields intact."""
    _save(auth_repo, _local_user("testuser", "test@example.com", "testpassword123"))

    found = auth_repo.get_user_by_username("testuser")

    assert found is not None
    assert found.username == "testuser"
    assert found.email == "test@example.com"
    assert verify_password("testpassword123", found.password_hash)


def test_authenticate_user(auth_service, auth_repo):
    """Authentication succeeds for valid credentials and returns None otherwise."""
    _save(auth_repo, _local_user("testuser", "test@example.com", "testpassword123"))

    # Test valid credentials
    logged_in = auth_service.authenticate_user("testuser", "testpassword123")
    assert logged_in is not None
    assert logged_in.username == "testuser"

    # Test invalid password
    assert auth_service.authenticate_user("testuser", "wrongpassword") is None

    # Test invalid username
    assert auth_service.authenticate_user("nonexistent", "testpassword123") is None


def test_create_session(auth_service, auth_repo):
    """Creating a session returns a non-empty bearer access token."""
    account = _local_user("testuser", "test@example.com", "testpassword123")
    _save(auth_repo, account)

    token_info = auth_service.create_session(account)

    assert "access_token" in token_info
    assert "token_type" in token_info
    assert token_info["token_type"] == "bearer"
    assert len(token_info["access_token"]) > 0


def test_role_permission_association(auth_repo):
    """Permissions attached to a role are returned when the role is fetched by name."""
    admin_role = Role(name="Admin", description="System administrator")
    read_perm = Permission(resource="admin:users", action="READ")
    write_perm = Permission(resource="admin:users", action="WRITE")
    admin_role.permissions.extend([read_perm, write_perm])
    _save(auth_repo, admin_role)

    found_role = auth_repo.get_role_by_name("Admin")

    assert found_role is not None
    assert len(found_role.permissions) == 2

    granted = {f"{perm.resource}:{perm.action}" for perm in found_role.permissions}
    assert "admin:users:READ" in granted
    assert "admin:users:WRITE" in granted


def test_user_role_association(auth_repo):
    """A role appended to a user is returned when the user is fetched by username."""
    admin_role = Role(name="Admin", description="System administrator")
    admin_user = _local_user("adminuser", "admin@example.com", "adminpass123")
    admin_user.roles.append(admin_role)
    _save(auth_repo, admin_role, admin_user)

    found = auth_repo.get_user_by_username("adminuser")

    assert found is not None
    assert len(found.roles) == 1
    assert found.roles[0].name == "Admin"


def test_ad_group_mapping(auth_repo):
    """An AD group mapping can be saved and queried back by its group name."""
    adfs_role = Role(name="ADFS_Admin", description="ADFS administrators")
    # Commit the role first so that its id is available for the mapping.
    _save(auth_repo, adfs_role)

    _save(auth_repo, ADGroupMapping(ad_group="DOMAIN\\ADFS_Admins", role_id=adfs_role.id))

    found_mapping = (
        auth_repo.db.query(ADGroupMapping)
        .filter_by(ad_group="DOMAIN\\ADFS_Admins")
        .first()
    )

    assert found_mapping is not None
    assert found_mapping.role_id == adfs_role.id