Files
ss-tools/backend/tests/test_auth.py
2026-02-10 12:53:01 +03:00

162 lines
5.1 KiB
Python

import sys
from pathlib import Path
# Add src to path
sys.path.append(str(Path(__file__).parent.parent / "src"))
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from src.core.database import Base
from src.models.auth import User, Role, Permission, ADGroupMapping
from src.services.auth_service import AuthService
from src.core.auth.repository import AuthRepository
from src.core.auth.security import verify_password, get_password_hash
# Test database: SQLite held in memory, so nothing is written to disk.
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    # Passed through to the SQLite driver when connections are opened.
    connect_args={"check_same_thread": False},
)

# Session factory bound to the test engine; no autocommit, no autoflush.
TestingSessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
)

# Build every table registered on Base's metadata once, at import time.
Base.metadata.create_all(bind=engine)
@pytest.fixture
def db_session():
    """Yield a database session wrapped in a transaction that is rolled back.

    A dedicated connection is opened and a transaction is begun on it before
    the session is handed to the test. Once the test finishes, the session is
    closed, the transaction is rolled back, and the connection is closed.
    """
    conn = engine.connect()
    outer_tx = conn.begin()
    test_session = TestingSessionLocal(bind=conn)

    yield test_session

    # Teardown order: session first, then the transaction, then the connection.
    test_session.close()
    outer_tx.rollback()
    conn.close()
@pytest.fixture
def auth_service(db_session):
    """Provide an AuthService constructed with the test's database session."""
    service = AuthService(db_session)
    return service
@pytest.fixture
def auth_repo(db_session):
    """Provide an AuthRepository constructed with the test's database session."""
    repository = AuthRepository(db_session)
    return repository
def test_create_user(auth_repo):
    """A committed user can be fetched by username with its fields intact."""
    plain_password = "testpassword123"
    new_user = User(
        username="testuser",
        email="test@example.com",
        password_hash=get_password_hash(plain_password),
        auth_source="LOCAL",
    )

    # Persist through the session exposed by the repository.
    db = auth_repo.db
    db.add(new_user)
    db.commit()

    fetched = auth_repo.get_user_by_username("testuser")

    assert fetched is not None
    assert fetched.username == "testuser"
    assert fetched.email == "test@example.com"
    # The stored hash must verify against the original plain-text password.
    assert verify_password(plain_password, fetched.password_hash)
def test_authenticate_user(auth_service, auth_repo):
    """Authentication returns the user for good credentials, None otherwise."""
    db = auth_repo.db
    db.add(
        User(
            username="testuser",
            email="test@example.com",
            password_hash=get_password_hash("testpassword123"),
            auth_source="LOCAL",
        )
    )
    db.commit()

    # Correct username and password: the matching user comes back.
    logged_in = auth_service.authenticate_user("testuser", "testpassword123")
    assert logged_in is not None
    assert logged_in.username == "testuser"

    # Known username, wrong password: no user.
    wrong_password_result = auth_service.authenticate_user("testuser", "wrongpassword")
    assert wrong_password_result is None

    # Unknown username: no user.
    unknown_user_result = auth_service.authenticate_user("nonexistent", "testpassword123")
    assert unknown_user_result is None
def test_create_session(auth_service, auth_repo):
    """create_session returns a mapping holding a non-empty bearer token."""
    account = User(
        username="testuser",
        email="test@example.com",
        password_hash=get_password_hash("testpassword123"),
        auth_source="LOCAL",
    )
    db = auth_repo.db
    db.add(account)
    db.commit()

    token_info = auth_service.create_session(account)

    # Both keys must be present before their values are inspected.
    for expected_key in ("access_token", "token_type"):
        assert expected_key in token_info
    assert token_info["token_type"] == "bearer"
    assert len(token_info["access_token"]) > 0
def test_role_permission_association(auth_repo):
    """Permissions attached to a role are still attached after a reload by name."""
    admin_role = Role(name="Admin", description="System administrator")
    read_perm = Permission(resource="admin:users", action="READ")
    write_perm = Permission(resource="admin:users", action="WRITE")
    admin_role.permissions.extend([read_perm, write_perm])

    # Only the role is added explicitly; the permissions hang off it.
    db = auth_repo.db
    db.add(admin_role)
    db.commit()

    loaded_role = auth_repo.get_role_by_name("Admin")
    assert loaded_role is not None
    assert len(loaded_role.permissions) == 2

    # Flatten each permission to "resource:action" for membership checks.
    granted = [f"{p.resource}:{p.action}" for p in loaded_role.permissions]
    assert "admin:users:READ" in granted
    assert "admin:users:WRITE" in granted
def test_user_role_association(auth_repo):
    """A role appended to a user is present when the user is fetched again."""
    admin_role = Role(name="Admin", description="System administrator")
    admin_user = User(
        username="adminuser",
        email="admin@example.com",
        password_hash=get_password_hash("adminpass123"),
        auth_source="LOCAL",
    )
    admin_user.roles.append(admin_role)

    db = auth_repo.db
    db.add(admin_role)
    db.add(admin_user)
    db.commit()

    fetched = auth_repo.get_user_by_username("adminuser")
    assert fetched is not None

    assigned_roles = fetched.roles
    assert len(assigned_roles) == 1
    assert assigned_roles[0].name == "Admin"
def test_ad_group_mapping(auth_repo):
    """An AD group mapping can be stored and queried back by its group name."""
    group_name = "DOMAIN\\ADFS_Admins"
    db = auth_repo.db

    # Commit the role first; its id is used as the mapping's role_id below.
    adfs_role = Role(name="ADFS_Admin", description="ADFS administrators")
    db.add(adfs_role)
    db.commit()

    db.add(ADGroupMapping(ad_group=group_name, role_id=adfs_role.id))
    db.commit()

    mapping_query = db.query(ADGroupMapping).filter_by(ad_group=group_name)
    found = mapping_query.first()

    assert found is not None
    assert found.role_id == adfs_role.id