feat(assistant): implement spec 021 chat assistant flow with semantic contracts

This commit is contained in:
2026-02-23 19:37:56 +03:00
parent 83e4875097
commit 18e96a58bc
27 changed files with 4029 additions and 20 deletions

View File

@@ -1,10 +1,23 @@
# Lazy loading of route modules to avoid import issues in tests
# This allows tests to import routes without triggering all module imports
__all__ = ['plugins', 'tasks', 'settings', 'connections', 'environments', 'mappings', 'migration', 'git', 'storage', 'admin', 'reports']
def __getattr__(name):
if name in __all__:
import importlib
return importlib.import_module(f".{name}", __name__)
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
# [DEF:backend.src.api.routes.__init__:Module]
# @TIER: STANDARD
# @SEMANTICS: routes, lazy-import, module-registry
# @PURPOSE: Provide lazy route module loading to avoid heavyweight imports during tests.
# @LAYER: API
# @RELATION: DEPENDS_ON -> importlib
# @INVARIANT: Only names listed in __all__ are importable via __getattr__.
__all__ = ['plugins', 'tasks', 'settings', 'connections', 'environments', 'mappings', 'migration', 'git', 'storage', 'admin', 'reports', 'assistant']
# [DEF:__getattr__:Function]
# @TIER: TRIVIAL
# @PURPOSE: Lazily import route module by attribute name.
# @PRE: name is module candidate exposed in __all__.
# @POST: Returns imported submodule or raises AttributeError.
def __getattr__(name):
if name in __all__:
import importlib
return importlib.import_module(f".{name}", __name__)
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
# [/DEF:__getattr__:Function]
# [/DEF:backend.src.api.routes.__init__:Module]

View File

@@ -0,0 +1,371 @@
# [DEF:backend.src.api.routes.__tests__.test_assistant_api:Module]
# @TIER: STANDARD
# @SEMANTICS: tests, assistant, api, confirmation, status
# @PURPOSE: Validate assistant API endpoint logic via direct async handler invocation.
# @LAYER: UI (API Tests)
# @RELATION: DEPENDS_ON -> backend.src.api.routes.assistant
# @INVARIANT: Every test clears assistant in-memory state before execution.
import os
import asyncio
from types import SimpleNamespace
# Force isolated sqlite databases for test module before dependencies import.
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/ss_tools_assistant_api.db")
os.environ.setdefault("TASKS_DATABASE_URL", "sqlite:////tmp/ss_tools_assistant_tasks.db")
os.environ.setdefault("AUTH_DATABASE_URL", "sqlite:////tmp/ss_tools_assistant_auth.db")
from src.api.routes import assistant as assistant_module
from src.models.assistant import (
AssistantAuditRecord,
AssistantConfirmationRecord,
AssistantMessageRecord,
)
# [DEF:_run_async:Function]
# @TIER: TRIVIAL
# @PURPOSE: Execute async endpoint handler in synchronous test context.
# @PRE: coroutine is awaitable endpoint invocation.
# @POST: Returns coroutine result or raises propagated exception.
def _run_async(coroutine):
return asyncio.run(coroutine)
# [/DEF:_run_async:Function]
# [DEF:_FakeTask:Class]
# @TIER: TRIVIAL
# @PURPOSE: Lightweight task stub used by assistant API tests.
class _FakeTask:
def __init__(self, task_id: str, status: str = "RUNNING", user_id: str = "u-admin"):
self.id = task_id
self.status = status
self.user_id = user_id
# [/DEF:_FakeTask:Class]
# [DEF:_FakeTaskManager:Class]
# @TIER: TRIVIAL
# @PURPOSE: Minimal async-compatible TaskManager fixture for deterministic test flows.
class _FakeTaskManager:
def __init__(self):
self._created = []
async def create_task(self, plugin_id, params, user_id=None):
task_id = f"task-{len(self._created) + 1}"
task = _FakeTask(task_id=task_id, status="RUNNING", user_id=user_id)
self._created.append((plugin_id, params, user_id, task))
return task
def get_task(self, task_id):
for _, _, _, task in self._created:
if task.id == task_id:
return task
return None
def get_tasks(self, limit=20, offset=0):
return [x[3] for x in self._created][offset : offset + limit]
# [/DEF:_FakeTaskManager:Class]
# [DEF:_FakeConfigManager:Class]
# @TIER: TRIVIAL
# @PURPOSE: Environment config fixture with dev/prod aliases for parser tests.
class _FakeConfigManager:
def get_environments(self):
return [
SimpleNamespace(id="dev", name="Development"),
SimpleNamespace(id="prod", name="Production"),
]
# [/DEF:_FakeConfigManager:Class]
# [DEF:_admin_user:Function]
# @TIER: TRIVIAL
# @PURPOSE: Build admin principal fixture.
# @PRE: Test harness requires authenticated admin-like principal object.
# @POST: Returns user stub with Admin role.
def _admin_user():
role = SimpleNamespace(name="Admin", permissions=[])
return SimpleNamespace(id="u-admin", username="admin", roles=[role])
# [/DEF:_admin_user:Function]
# [DEF:_limited_user:Function]
# @TIER: TRIVIAL
# @PURPOSE: Build non-admin principal fixture.
# @PRE: Test harness requires restricted principal for deny scenarios.
# @POST: Returns user stub without admin privileges.
def _limited_user():
role = SimpleNamespace(name="Operator", permissions=[])
return SimpleNamespace(id="u-limited", username="limited", roles=[role])
# [/DEF:_limited_user:Function]
# [DEF:_FakeQuery:Class]
# @TIER: TRIVIAL
# @PURPOSE: Minimal chainable query object for fake SQLAlchemy-like DB behavior in tests.
class _FakeQuery:
def __init__(self, rows):
self._rows = list(rows)
def filter(self, *args, **kwargs):
return self
def order_by(self, *args, **kwargs):
return self
def first(self):
return self._rows[0] if self._rows else None
def all(self):
return list(self._rows)
def count(self):
return len(self._rows)
def offset(self, offset):
self._rows = self._rows[offset:]
return self
def limit(self, limit):
self._rows = self._rows[:limit]
return self
# [/DEF:_FakeQuery:Class]
# [DEF:_FakeDb:Class]
# @TIER: TRIVIAL
# @PURPOSE: In-memory fake database implementing subset of Session interface used by assistant routes.
class _FakeDb:
def __init__(self):
self._messages = []
self._confirmations = []
self._audit = []
def add(self, row):
table = getattr(row, "__tablename__", "")
if table == "assistant_messages":
self._messages.append(row)
return
if table == "assistant_confirmations":
self._confirmations.append(row)
return
if table == "assistant_audit":
self._audit.append(row)
def merge(self, row):
table = getattr(row, "__tablename__", "")
if table != "assistant_confirmations":
self.add(row)
return row
for i, existing in enumerate(self._confirmations):
if getattr(existing, "id", None) == getattr(row, "id", None):
self._confirmations[i] = row
return row
self._confirmations.append(row)
return row
def query(self, model):
if model is AssistantMessageRecord:
return _FakeQuery(self._messages)
if model is AssistantConfirmationRecord:
return _FakeQuery(self._confirmations)
if model is AssistantAuditRecord:
return _FakeQuery(self._audit)
return _FakeQuery([])
def commit(self):
return None
def rollback(self):
return None
# [/DEF:_FakeDb:Class]
# [DEF:_clear_assistant_state:Function]
# @TIER: TRIVIAL
# @PURPOSE: Reset in-memory assistant registries for isolation between tests.
# @PRE: Assistant module globals may contain residues from previous test runs.
# @POST: In-memory conversation/confirmation/audit dictionaries are empty.
def _clear_assistant_state():
assistant_module.CONVERSATIONS.clear()
assistant_module.USER_ACTIVE_CONVERSATION.clear()
assistant_module.CONFIRMATIONS.clear()
assistant_module.ASSISTANT_AUDIT.clear()
# [/DEF:_clear_assistant_state:Function]
# [DEF:test_unknown_command_returns_needs_clarification:Function]
# @PURPOSE: Unknown command should return clarification state and unknown intent.
# @PRE: Fake dependencies provide admin user and deterministic task/config/db services.
# @POST: Response state is needs_clarification and no execution side-effect occurs.
def test_unknown_command_returns_needs_clarification():
_clear_assistant_state()
response = _run_async(
assistant_module.send_message(
request=assistant_module.AssistantMessageRequest(message="сделай что-нибудь"),
current_user=_admin_user(),
task_manager=_FakeTaskManager(),
config_manager=_FakeConfigManager(),
db=_FakeDb(),
)
)
assert response.state == "needs_clarification"
assert response.intent["domain"] == "unknown"
# [/DEF:test_unknown_command_returns_needs_clarification:Function]
# [DEF:test_non_admin_command_returns_denied:Function]
# @PURPOSE: Non-admin user must receive denied state for privileged command.
# @PRE: Limited principal executes privileged git branch command.
# @POST: Response state is denied and operation is not executed.
def test_non_admin_command_returns_denied():
_clear_assistant_state()
response = _run_async(
assistant_module.send_message(
request=assistant_module.AssistantMessageRequest(
message="создай ветку feature/test для дашборда 12"
),
current_user=_limited_user(),
task_manager=_FakeTaskManager(),
config_manager=_FakeConfigManager(),
db=_FakeDb(),
)
)
assert response.state == "denied"
# [/DEF:test_non_admin_command_returns_denied:Function]
# [DEF:test_migration_to_prod_requires_confirmation_and_can_be_confirmed:Function]
# @PURPOSE: Migration to prod must require confirmation and then start task after explicit confirm.
# @PRE: Admin principal submits dangerous migration command.
# @POST: Confirmation endpoint transitions flow to started state with task id.
def test_migration_to_prod_requires_confirmation_and_can_be_confirmed():
_clear_assistant_state()
task_manager = _FakeTaskManager()
db = _FakeDb()
first = _run_async(
assistant_module.send_message(
request=assistant_module.AssistantMessageRequest(
message="запусти миграцию с dev на prod для дашборда 12"
),
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
assert first.state == "needs_confirmation"
assert first.confirmation_id
second = _run_async(
assistant_module.confirm_operation(
confirmation_id=first.confirmation_id,
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
assert second.state == "started"
assert second.task_id.startswith("task-")
# [/DEF:test_migration_to_prod_requires_confirmation_and_can_be_confirmed:Function]
# [DEF:test_status_query_returns_task_status:Function]
# @PURPOSE: Task status command must surface current status text for existing task id.
# @PRE: At least one task exists after confirmed operation.
# @POST: Status query returns started/success and includes referenced task id.
def test_status_query_returns_task_status():
_clear_assistant_state()
task_manager = _FakeTaskManager()
db = _FakeDb()
start = _run_async(
assistant_module.send_message(
request=assistant_module.AssistantMessageRequest(
message="запусти миграцию с dev на prod для дашборда 10"
),
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
confirm = _run_async(
assistant_module.confirm_operation(
confirmation_id=start.confirmation_id,
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
task_id = confirm.task_id
status_resp = _run_async(
assistant_module.send_message(
request=assistant_module.AssistantMessageRequest(
message=f"проверь статус задачи {task_id}"
),
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
assert status_resp.state in {"started", "success"}
assert task_id in status_resp.text
# [/DEF:test_status_query_returns_task_status:Function]
# [DEF:test_status_query_without_task_id_returns_latest_user_task:Function]
# @PURPOSE: Status command without explicit task_id should resolve to latest task for current user.
# @PRE: User has at least one created task in task manager history.
# @POST: Response references latest task status without explicit task id in command.
def test_status_query_without_task_id_returns_latest_user_task():
_clear_assistant_state()
task_manager = _FakeTaskManager()
db = _FakeDb()
start = _run_async(
assistant_module.send_message(
request=assistant_module.AssistantMessageRequest(
message="запусти миграцию с dev на prod для дашборда 33"
),
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
_run_async(
assistant_module.confirm_operation(
confirmation_id=start.confirmation_id,
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
status_resp = _run_async(
assistant_module.send_message(
request=assistant_module.AssistantMessageRequest(
message="покажи статус последней задачи"
),
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
assert status_resp.state in {"started", "success"}
assert "Последняя задача:" in status_resp.text
# [/DEF:test_status_query_without_task_id_returns_latest_user_task:Function]
# [/DEF:backend.src.api.routes.__tests__.test_assistant_api:Module]

View File

@@ -0,0 +1,306 @@
# [DEF:backend.src.api.routes.__tests__.test_assistant_authz:Module]
# @TIER: STANDARD
# @SEMANTICS: tests, assistant, authz, confirmation, rbac
# @PURPOSE: Verify assistant confirmation ownership, expiration, and deny behavior for restricted users.
# @LAYER: UI (API Tests)
# @RELATION: DEPENDS_ON -> backend.src.api.routes.assistant
# @INVARIANT: Security-sensitive flows fail closed for unauthorized actors.
import os
import asyncio
from datetime import datetime, timedelta
from types import SimpleNamespace
import pytest
from fastapi import HTTPException
# Force isolated sqlite databases for test module before dependencies import.
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/ss_tools_assistant_authz.db")
os.environ.setdefault("TASKS_DATABASE_URL", "sqlite:////tmp/ss_tools_assistant_authz_tasks.db")
os.environ.setdefault("AUTH_DATABASE_URL", "sqlite:////tmp/ss_tools_assistant_authz_auth.db")
from src.api.routes import assistant as assistant_module
from src.models.assistant import (
AssistantAuditRecord,
AssistantConfirmationRecord,
AssistantMessageRecord,
)
# [DEF:_run_async:Function]
# @TIER: TRIVIAL
# @PURPOSE: Execute async endpoint handler in synchronous test context.
# @PRE: coroutine is awaitable endpoint invocation.
# @POST: Returns coroutine result or raises propagated exception.
def _run_async(coroutine):
return asyncio.run(coroutine)
# [/DEF:_run_async:Function]
# [DEF:_FakeTask:Class]
# @TIER: TRIVIAL
# @PURPOSE: Lightweight task model used for assistant authz tests.
class _FakeTask:
def __init__(self, task_id: str, status: str = "RUNNING", user_id: str = "u-admin"):
self.id = task_id
self.status = status
self.user_id = user_id
# [/DEF:_FakeTask:Class]
# [DEF:_FakeTaskManager:Class]
# @TIER: TRIVIAL
# @PURPOSE: Minimal task manager for deterministic operation creation and lookup.
class _FakeTaskManager:
def __init__(self):
self._created = []
async def create_task(self, plugin_id, params, user_id=None):
task_id = f"task-{len(self._created) + 1}"
task = _FakeTask(task_id=task_id, status="RUNNING", user_id=user_id)
self._created.append((plugin_id, params, user_id, task))
return task
def get_task(self, task_id):
for _, _, _, task in self._created:
if task.id == task_id:
return task
return None
def get_tasks(self, limit=20, offset=0):
return [x[3] for x in self._created][offset : offset + limit]
# [/DEF:_FakeTaskManager:Class]
# [DEF:_FakeConfigManager:Class]
# @TIER: TRIVIAL
# @PURPOSE: Provide deterministic environment aliases required by intent parsing.
class _FakeConfigManager:
def get_environments(self):
return [
SimpleNamespace(id="dev", name="Development"),
SimpleNamespace(id="prod", name="Production"),
]
# [/DEF:_FakeConfigManager:Class]
# [DEF:_admin_user:Function]
# @TIER: TRIVIAL
# @PURPOSE: Build admin principal fixture.
# @PRE: Test requires privileged principal for risky operations.
# @POST: Returns admin-like user stub with Admin role.
def _admin_user():
role = SimpleNamespace(name="Admin", permissions=[])
return SimpleNamespace(id="u-admin", username="admin", roles=[role])
# [/DEF:_admin_user:Function]
# [DEF:_other_admin_user:Function]
# @TIER: TRIVIAL
# @PURPOSE: Build second admin principal fixture for ownership tests.
# @PRE: Ownership mismatch scenario needs distinct authenticated actor.
# @POST: Returns alternate admin-like user stub.
def _other_admin_user():
role = SimpleNamespace(name="Admin", permissions=[])
return SimpleNamespace(id="u-admin-2", username="admin2", roles=[role])
# [/DEF:_other_admin_user:Function]
# [DEF:_limited_user:Function]
# @TIER: TRIVIAL
# @PURPOSE: Build limited principal without required assistant execution privileges.
# @PRE: Permission denial scenario needs non-admin actor.
# @POST: Returns restricted user stub.
def _limited_user():
role = SimpleNamespace(name="Operator", permissions=[])
return SimpleNamespace(id="u-limited", username="limited", roles=[role])
# [/DEF:_limited_user:Function]
# [DEF:_FakeQuery:Class]
# @TIER: TRIVIAL
# @PURPOSE: Minimal chainable query object for fake DB interactions.
class _FakeQuery:
def __init__(self, rows):
self._rows = list(rows)
def filter(self, *args, **kwargs):
return self
def order_by(self, *args, **kwargs):
return self
def first(self):
return self._rows[0] if self._rows else None
def all(self):
return list(self._rows)
def limit(self, limit):
self._rows = self._rows[:limit]
return self
def offset(self, offset):
self._rows = self._rows[offset:]
return self
def count(self):
return len(self._rows)
# [/DEF:_FakeQuery:Class]
# [DEF:_FakeDb:Class]
# @TIER: TRIVIAL
# @PURPOSE: In-memory session substitute for assistant route persistence calls.
class _FakeDb:
def __init__(self):
self._messages = []
self._confirmations = []
self._audit = []
def add(self, row):
table = getattr(row, "__tablename__", "")
if table == "assistant_messages":
self._messages.append(row)
elif table == "assistant_confirmations":
self._confirmations.append(row)
elif table == "assistant_audit":
self._audit.append(row)
def merge(self, row):
if getattr(row, "__tablename__", "") != "assistant_confirmations":
self.add(row)
return row
for i, existing in enumerate(self._confirmations):
if getattr(existing, "id", None) == getattr(row, "id", None):
self._confirmations[i] = row
return row
self._confirmations.append(row)
return row
def query(self, model):
if model is AssistantMessageRecord:
return _FakeQuery(self._messages)
if model is AssistantConfirmationRecord:
return _FakeQuery(self._confirmations)
if model is AssistantAuditRecord:
return _FakeQuery(self._audit)
return _FakeQuery([])
def commit(self):
return None
def rollback(self):
return None
# [/DEF:_FakeDb:Class]
# [DEF:_clear_assistant_state:Function]
# @TIER: TRIVIAL
# @PURPOSE: Reset assistant process-local state between test cases.
# @PRE: Assistant globals may contain state from prior tests.
# @POST: Assistant in-memory state dictionaries are cleared.
def _clear_assistant_state():
assistant_module.CONVERSATIONS.clear()
assistant_module.USER_ACTIVE_CONVERSATION.clear()
assistant_module.CONFIRMATIONS.clear()
assistant_module.ASSISTANT_AUDIT.clear()
# [/DEF:_clear_assistant_state:Function]
# [DEF:test_confirmation_owner_mismatch_returns_403:Function]
# @PURPOSE: Confirm endpoint should reject requests from user that does not own the confirmation token.
# @PRE: Confirmation token is created by first admin actor.
# @POST: Second actor receives 403 on confirm operation.
def test_confirmation_owner_mismatch_returns_403():
_clear_assistant_state()
task_manager = _FakeTaskManager()
db = _FakeDb()
create = _run_async(
assistant_module.send_message(
request=assistant_module.AssistantMessageRequest(
message="запусти миграцию с dev на prod для дашборда 18"
),
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
assert create.state == "needs_confirmation"
with pytest.raises(HTTPException) as exc:
_run_async(
assistant_module.confirm_operation(
confirmation_id=create.confirmation_id,
current_user=_other_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
assert exc.value.status_code == 403
# [/DEF:test_confirmation_owner_mismatch_returns_403:Function]
# [DEF:test_expired_confirmation_cannot_be_confirmed:Function]
# @PURPOSE: Expired confirmation token should be rejected and not create task.
# @PRE: Confirmation token exists and is manually expired before confirm request.
# @POST: Confirm endpoint raises 400 and no task is created.
def test_expired_confirmation_cannot_be_confirmed():
_clear_assistant_state()
task_manager = _FakeTaskManager()
db = _FakeDb()
create = _run_async(
assistant_module.send_message(
request=assistant_module.AssistantMessageRequest(
message="запусти миграцию с dev на prod для дашборда 19"
),
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
assistant_module.CONFIRMATIONS[create.confirmation_id].expires_at = datetime.utcnow() - timedelta(minutes=1)
with pytest.raises(HTTPException) as exc:
_run_async(
assistant_module.confirm_operation(
confirmation_id=create.confirmation_id,
current_user=_admin_user(),
task_manager=task_manager,
config_manager=_FakeConfigManager(),
db=db,
)
)
assert exc.value.status_code == 400
assert task_manager.get_tasks(limit=10, offset=0) == []
# [/DEF:test_expired_confirmation_cannot_be_confirmed:Function]
# [DEF:test_limited_user_cannot_launch_restricted_operation:Function]
# @PURPOSE: Limited user should receive denied state for privileged operation.
# @PRE: Restricted user attempts dangerous deploy command.
# @POST: Assistant returns denied state and does not execute operation.
def test_limited_user_cannot_launch_restricted_operation():
_clear_assistant_state()
response = _run_async(
assistant_module.send_message(
request=assistant_module.AssistantMessageRequest(
message="задеплой дашборд 88 в production"
),
current_user=_limited_user(),
task_manager=_FakeTaskManager(),
config_manager=_FakeConfigManager(),
db=_FakeDb(),
)
)
assert response.state == "denied"
# [/DEF:test_limited_user_cannot_launch_restricted_operation:Function]
# [/DEF:backend.src.api.routes.__tests__.test_assistant_authz:Module]

File diff suppressed because it is too large Load Diff