Передаем на тест

This commit is contained in:
2026-01-27 16:32:08 +03:00
parent cc244c2d86
commit d3c3a80ed2
42 changed files with 2836 additions and 140 deletions

View File

@@ -0,0 +1,45 @@
# [DEF:backend.src.core.auth.config:Module]
#
# @SEMANTICS: auth, config, settings, jwt, adfs
# @PURPOSE: Centralized configuration for authentication and authorization.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> pydantic
#
# @INVARIANT: All sensitive configuration must have defaults or be loaded from environment.
# [SECTION: IMPORTS]
from pydantic import Field
from pydantic_settings import BaseSettings
import os
# [/SECTION]
# [DEF:AuthConfig:Class]
# @PURPOSE: Holds authentication-related settings.
# @PRE: Environment variables may be provided via .env file.
# @POST: Returns a configuration object with validated settings.
class AuthConfig(BaseSettings):
# JWT Settings
SECRET_KEY: str = Field(default="super-secret-key-change-in-production", env="AUTH_SECRET_KEY")
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# Database Settings
AUTH_DATABASE_URL: str = Field(default="sqlite:///./backend/auth.db", env="AUTH_DATABASE_URL")
# ADFS Settings
ADFS_CLIENT_ID: str = Field(default="", env="ADFS_CLIENT_ID")
ADFS_CLIENT_SECRET: str = Field(default="", env="ADFS_CLIENT_SECRET")
ADFS_METADATA_URL: str = Field(default="", env="ADFS_METADATA_URL")
class Config:
env_file = ".env"
extra = "ignore"
# [/DEF:AuthConfig:Class]
# [DEF:auth_config:Variable]
# @PURPOSE: Singleton instance of AuthConfig.
auth_config = AuthConfig()
# [/DEF:auth_config:Variable]
# [/DEF:backend.src.core.auth.config:Module]

View File

@@ -0,0 +1,54 @@
# [DEF:backend.src.core.auth.jwt:Module]
#
# @SEMANTICS: jwt, token, session, auth
# @PURPOSE: JWT token generation and validation logic.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> jose
# @RELATION: USES -> backend.src.core.auth.config.auth_config
#
# @INVARIANT: Tokens must include expiration time and user identifier.
# [SECTION: IMPORTS]
from datetime import datetime, timedelta
from typing import Optional, List
from jose import JWTError, jwt
from .config import auth_config
from ..logger import belief_scope
# [/SECTION]
# [DEF:create_access_token:Function]
# @PURPOSE: Generates a new JWT access token.
# @PRE: data dict contains 'sub' (user_id) and optional 'scopes' (roles).
# @POST: Returns a signed JWT string.
#
# @PARAM: data (dict) - Payload data for the token.
# @PARAM: expires_delta (Optional[timedelta]) - Custom expiration time.
# @RETURN: str - The encoded JWT.
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
with belief_scope("create_access_token"):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=auth_config.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, auth_config.SECRET_KEY, algorithm=auth_config.ALGORITHM)
return encoded_jwt
# [/DEF:create_access_token:Function]
# [DEF:decode_token:Function]
# @PURPOSE: Decodes and validates a JWT token.
# @PRE: token is a signed JWT string.
# @POST: Returns the decoded payload if valid.
#
# @PARAM: token (str) - The JWT to decode.
# @RETURN: dict - The decoded payload.
# @THROW: jose.JWTError - If token is invalid or expired.
def decode_token(token: str) -> dict:
with belief_scope("decode_token"):
payload = jwt.decode(token, auth_config.SECRET_KEY, algorithms=[auth_config.ALGORITHM])
return payload
# [/DEF:decode_token:Function]
# [/DEF:backend.src.core.auth.jwt:Module]

View File

@@ -0,0 +1,31 @@
# [DEF:backend.src.core.auth.logger:Module]
#
# @SEMANTICS: auth, logger, audit, security
# @PURPOSE: Audit logging for security-related events.
# @LAYER: Core
# @RELATION: USES -> backend.src.core.logger.belief_scope
#
# @INVARIANT: Must not log sensitive data like passwords or full tokens.
# [SECTION: IMPORTS]
from ..logger import logger, belief_scope
from datetime import datetime
# [/SECTION]
# [DEF:log_security_event:Function]
# @PURPOSE: Logs a security-related event for audit trails.
# @PRE: event_type and username are strings.
# @POST: Security event is written to the application log.
# @PARAM: event_type (str) - Type of event (e.g., LOGIN_SUCCESS, PERMISSION_DENIED).
# @PARAM: username (str) - The user involved in the event.
# @PARAM: details (dict) - Additional non-sensitive metadata.
def log_security_event(event_type: str, username: str, details: dict = None):
with belief_scope("log_security_event", f"{event_type}:{username}"):
timestamp = datetime.utcnow().isoformat()
msg = f"[AUDIT][{timestamp}][{event_type}] User: {username}"
if details:
msg += f" Details: {details}"
logger.info(msg)
# [/DEF:log_security_event:Function]
# [/DEF:backend.src.core.auth.logger:Module]

View File

@@ -0,0 +1,41 @@
# [DEF:backend.src.core.auth.oauth:Module]
#
# @SEMANTICS: auth, oauth, oidc, adfs
# @PURPOSE: ADFS OIDC configuration and client using Authlib.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> authlib
# @RELATION: USES -> backend.src.core.auth.config.auth_config
#
# @INVARIANT: Must use secure OIDC flows.
# [SECTION: IMPORTS]
from authlib.integrations.starlette_client import OAuth
from .config import auth_config
# [/SECTION]
# [DEF:oauth:Variable]
# @PURPOSE: Global Authlib OAuth registry.
oauth = OAuth()
# [/DEF:oauth:Variable]
# [DEF:register_adfs:Function]
# @PURPOSE: Registers the ADFS OIDC client.
# @PRE: ADFS configuration is provided in auth_config.
# @POST: ADFS client is registered in oauth registry.
def register_adfs():
if auth_config.ADFS_CLIENT_ID:
oauth.register(
name='adfs',
client_id=auth_config.ADFS_CLIENT_ID,
client_secret=auth_config.ADFS_CLIENT_SECRET,
server_metadata_url=auth_config.ADFS_METADATA_URL,
client_kwargs={
'scope': 'openid email profile groups'
}
)
# [/DEF:register_adfs:Function]
# Initial registration
register_adfs()
# [/DEF:backend.src.core.auth.oauth:Module]

View File

@@ -0,0 +1,123 @@
# [DEF:backend.src.core.auth.repository:Module]
#
# @SEMANTICS: auth, repository, database, user, role
# @PURPOSE: Data access layer for authentication-related entities.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> sqlalchemy
# @RELATION: USES -> backend.src.models.auth
#
# @INVARIANT: All database operations must be performed within a session.
# [SECTION: IMPORTS]
from typing import Optional, List
from sqlalchemy.orm import Session
from ...models.auth import User, Role, Permission, ADGroupMapping
from ..logger import belief_scope
# [/SECTION]
# [DEF:AuthRepository:Class]
# @PURPOSE: Encapsulates database operations for authentication.
class AuthRepository:
# [DEF:__init__:Function]
# @PURPOSE: Initializes the repository with a database session.
# @PARAM: db (Session) - SQLAlchemy session.
def __init__(self, db: Session):
self.db = db
# [/DEF:__init__:Function]
# [DEF:get_user_by_username:Function]
# @PURPOSE: Retrieves a user by their username.
# @PRE: username is a string.
# @POST: Returns User object if found, else None.
# @PARAM: username (str) - The username to search for.
# @RETURN: Optional[User] - The found user or None.
def get_user_by_username(self, username: str) -> Optional[User]:
with belief_scope("AuthRepository.get_user_by_username"):
return self.db.query(User).filter(User.username == username).first()
# [/DEF:get_user_by_username:Function]
# [DEF:get_user_by_id:Function]
# @PURPOSE: Retrieves a user by their unique ID.
# @PRE: user_id is a valid UUID string.
# @POST: Returns User object if found, else None.
# @PARAM: user_id (str) - The user's unique identifier.
# @RETURN: Optional[User] - The found user or None.
def get_user_by_id(self, user_id: str) -> Optional[User]:
with belief_scope("AuthRepository.get_user_by_id"):
return self.db.query(User).filter(User.id == user_id).first()
# [/DEF:get_user_by_id:Function]
# [DEF:get_role_by_name:Function]
# @PURPOSE: Retrieves a role by its name.
# @PRE: name is a string.
# @POST: Returns Role object if found, else None.
# @PARAM: name (str) - The role name to search for.
# @RETURN: Optional[Role] - The found role or None.
def get_role_by_name(self, name: str) -> Optional[Role]:
with belief_scope("AuthRepository.get_role_by_name"):
return self.db.query(Role).filter(Role.name == name).first()
# [/DEF:get_role_by_name:Function]
# [DEF:update_last_login:Function]
# @PURPOSE: Updates the last_login timestamp for a user.
# @PRE: user object is a valid User instance.
# @POST: User's last_login is updated in the database.
# @SIDE_EFFECT: Commits the transaction.
# @PARAM: user (User) - The user to update.
def update_last_login(self, user: User):
with belief_scope("AuthRepository.update_last_login"):
from datetime import datetime
user.last_login = datetime.utcnow()
self.db.add(user)
self.db.commit()
# [/DEF:update_last_login:Function]
# [DEF:get_role_by_id:Function]
# @PURPOSE: Retrieves a role by its unique ID.
# @PRE: role_id is a string.
# @POST: Returns Role object if found, else None.
# @PARAM: role_id (str) - The role's unique identifier.
# @RETURN: Optional[Role] - The found role or None.
def get_role_by_id(self, role_id: str) -> Optional[Role]:
with belief_scope("AuthRepository.get_role_by_id"):
return self.db.query(Role).filter(Role.id == role_id).first()
# [/DEF:get_role_by_id:Function]
# [DEF:get_permission_by_id:Function]
# @PURPOSE: Retrieves a permission by its unique ID.
# @PRE: perm_id is a string.
# @POST: Returns Permission object if found, else None.
# @PARAM: perm_id (str) - The permission's unique identifier.
# @RETURN: Optional[Permission] - The found permission or None.
def get_permission_by_id(self, perm_id: str) -> Optional[Permission]:
with belief_scope("AuthRepository.get_permission_by_id"):
return self.db.query(Permission).filter(Permission.id == perm_id).first()
# [/DEF:get_permission_by_id:Function]
# [DEF:get_permission_by_resource_action:Function]
# @PURPOSE: Retrieves a permission by resource and action.
# @PRE: resource and action are strings.
# @POST: Returns Permission object if found, else None.
# @PARAM: resource (str) - The resource name.
# @PARAM: action (str) - The action name.
# @RETURN: Optional[Permission] - The found permission or None.
def get_permission_by_resource_action(self, resource: str, action: str) -> Optional[Permission]:
with belief_scope("AuthRepository.get_permission_by_resource_action"):
return self.db.query(Permission).filter(
Permission.resource == resource,
Permission.action == action
).first()
# [/DEF:get_permission_by_resource_action:Function]
# [DEF:list_permissions:Function]
# @PURPOSE: Lists all available permissions.
# @POST: Returns a list of all Permission objects.
# @RETURN: List[Permission] - List of permissions.
def list_permissions(self) -> List[Permission]:
with belief_scope("AuthRepository.list_permissions"):
return self.db.query(Permission).all()
# [/DEF:list_permissions:Function]
# [/DEF:AuthRepository:Class]
# [/DEF:backend.src.core.auth.repository:Module]

View File

@@ -0,0 +1,42 @@
# [DEF:backend.src.core.auth.security:Module]
#
# @SEMANTICS: security, password, hashing, bcrypt
# @PURPOSE: Utility for password hashing and verification using Passlib.
# @LAYER: Core
# @RELATION: DEPENDS_ON -> passlib
#
# @INVARIANT: Uses bcrypt for hashing with standard work factor.
# [SECTION: IMPORTS]
from passlib.context import CryptContext
# [/SECTION]
# [DEF:pwd_context:Variable]
# @PURPOSE: Passlib CryptContext for password management.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# [/DEF:pwd_context:Variable]
# [DEF:verify_password:Function]
# @PURPOSE: Verifies a plain password against a hashed password.
# @PRE: plain_password is a string, hashed_password is a bcrypt hash.
# @POST: Returns True if password matches, False otherwise.
#
# @PARAM: plain_password (str) - The unhashed password.
# @PARAM: hashed_password (str) - The stored hash.
# @RETURN: bool - Verification result.
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
# [/DEF:verify_password:Function]
# [DEF:get_password_hash:Function]
# @PURPOSE: Generates a bcrypt hash for a plain password.
# @PRE: password is a string.
# @POST: Returns a secure bcrypt hash string.
#
# @PARAM: password (str) - The password to hash.
# @RETURN: str - The generated hash.
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
# [/DEF:get_password_hash:Function]
# [/DEF:backend.src.core.auth.security:Module]

View File

@@ -5,6 +5,7 @@
# @LAYER: Core
# @RELATION: DEPENDS_ON -> sqlalchemy
# @RELATION: USES -> backend.src.models.mapping
# @RELATION: USES -> backend.src.core.auth.config
#
# @INVARIANT: A single engine instance is used for the entire application.
@@ -16,44 +17,70 @@ from ..models.mapping import Base
from ..models.task import TaskRecord
from ..models.connection import ConnectionConfig
from ..models.git import GitServerConfig, GitRepository, DeploymentEnvironment
from ..models.auth import User, Role, Permission, ADGroupMapping
from .logger import belief_scope
from .auth.config import auth_config
import os
# [/SECTION]
# [DEF:DATABASE_URL:Constant]
# @PURPOSE: URL for the main mappings database.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./mappings.db")
# [/DEF:DATABASE_URL:Constant]
# [DEF:TASKS_DATABASE_URL:Constant]
# @PURPOSE: URL for the tasks execution database.
TASKS_DATABASE_URL = os.getenv("TASKS_DATABASE_URL", "sqlite:///./tasks.db")
# [/DEF:TASKS_DATABASE_URL:Constant]
# [DEF:AUTH_DATABASE_URL:Constant]
# @PURPOSE: URL for the authentication database.
AUTH_DATABASE_URL = auth_config.AUTH_DATABASE_URL
# [/DEF:AUTH_DATABASE_URL:Constant]
# [DEF:engine:Variable]
# @PURPOSE: SQLAlchemy engine for mappings database.
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
# [/DEF:engine:Variable]
# [DEF:tasks_engine:Variable]
# @PURPOSE: SQLAlchemy engine for tasks database.
tasks_engine = create_engine(TASKS_DATABASE_URL, connect_args={"check_same_thread": False})
# [/DEF:tasks_engine:Variable]
# [DEF:auth_engine:Variable]
# @PURPOSE: SQLAlchemy engine for authentication database.
auth_engine = create_engine(AUTH_DATABASE_URL, connect_args={"check_same_thread": False})
# [/DEF:auth_engine:Variable]
# [DEF:SessionLocal:Class]
# @PURPOSE: A session factory for the main mappings database.
# @PRE: engine is initialized.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# [/DEF:SessionLocal:Class]
# [DEF:TasksSessionLocal:Class]
# @PURPOSE: A session factory for the tasks execution database.
# @PRE: tasks_engine is initialized.
TasksSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=tasks_engine)
# [/DEF:TasksSessionLocal:Class]
# [DEF:AuthSessionLocal:Class]
# @PURPOSE: A session factory for the authentication database.
# @PRE: auth_engine is initialized.
AuthSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=auth_engine)
# [/DEF:AuthSessionLocal:Class]
# [DEF:init_db:Function]
# @PURPOSE: Initializes the database by creating all tables.
# @PRE: engine and tasks_engine are initialized.
# @POST: Database tables created.
# @PRE: engine, tasks_engine and auth_engine are initialized.
# @POST: Database tables created in all databases.
# @SIDE_EFFECT: Creates physical database files if they don't exist.
def init_db():
with belief_scope("init_db"):
Base.metadata.create_all(bind=engine)
Base.metadata.create_all(bind=tasks_engine)
Base.metadata.create_all(bind=auth_engine)
# [/DEF:init_db:Function]
# [DEF:get_db:Function]
@@ -84,4 +111,18 @@ def get_tasks_db():
db.close()
# [/DEF:get_tasks_db:Function]
# [DEF:get_auth_db:Function]
# @PURPOSE: Dependency for getting an authentication database session.
# @PRE: AuthSessionLocal is initialized.
# @POST: Session is closed after use.
# @RETURN: Generator[Session, None, None]
def get_auth_db():
with belief_scope("get_auth_db"):
db = AuthSessionLocal()
try:
yield db
finally:
db.close()
# [/DEF:get_auth_db:Function]
# [/DEF:backend.src.core.database:Module]

View File

@@ -68,6 +68,18 @@ class PluginBase(ABC):
pass
# [/DEF:version:Function]
@property
# [DEF:required_permission:Function]
# @PURPOSE: Returns the required permission string to execute this plugin.
# @PRE: Plugin instance exists.
# @POST: Returns string permission.
# @RETURN: str - Required permission (e.g., "plugin:backup:execute").
def required_permission(self) -> str:
"""The permission string required to execute this plugin."""
with belief_scope("required_permission"):
return f"plugin:{self.id}:execute"
# [/DEF:required_permission:Function]
@property
# [DEF:ui_route:Function]
# @PURPOSE: Returns the frontend route for the plugin's UI, if applicable.