feat(assistant): add conversations list, infinite history scroll, and archived tab

This commit is contained in:
2026-02-23 20:27:51 +03:00
parent 40e6d8cd4c
commit ab1c87ffba
4 changed files with 559 additions and 7 deletions

View File

@@ -9,6 +9,7 @@
import os
import asyncio
from types import SimpleNamespace
from datetime import datetime, timedelta
# Force isolated sqlite databases for test module before dependencies import.
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/ss_tools_assistant_api.db")
@@ -391,4 +392,144 @@ def test_llm_validation_missing_dashboard_returns_needs_clarification():
# [/DEF:test_llm_validation_missing_dashboard_returns_needs_clarification:Function]
# [DEF:test_list_conversations_groups_by_conversation_and_marks_archived:Function]
# @PURPOSE: Conversations endpoint must group messages and compute archived marker by inactivity threshold.
# @PRE: Fake DB contains two conversations with different update timestamps.
# @POST: Response includes both conversations with archived flag set for stale one.
def test_list_conversations_groups_by_conversation_and_marks_archived():
_clear_assistant_state()
db = _FakeDb()
now = datetime.utcnow()
db.add(
AssistantMessageRecord(
id="m-1",
user_id="u-admin",
conversation_id="conv-active",
role="user",
text="active chat",
created_at=now,
)
)
db.add(
AssistantMessageRecord(
id="m-2",
user_id="u-admin",
conversation_id="conv-old",
role="user",
text="old chat",
created_at=now - timedelta(days=assistant_module.ASSISTANT_ARCHIVE_AFTER_DAYS + 2),
)
)
result = _run_async(
assistant_module.list_conversations(
page=1,
page_size=20,
include_archived=True,
search=None,
current_user=_admin_user(),
db=db,
)
)
assert result["total"] == 2
by_id = {item["conversation_id"]: item for item in result["items"]}
assert by_id["conv-active"]["archived"] is False
assert by_id["conv-old"]["archived"] is True
# [/DEF:test_list_conversations_groups_by_conversation_and_marks_archived:Function]
# [DEF:test_history_from_latest_returns_recent_page_first:Function]
# @PURPOSE: History endpoint from_latest mode must return newest page while preserving chronological order in chunk.
# @PRE: Conversation has more messages than single page size.
# @POST: First page returns latest messages and has_next indicates older pages exist.
def test_history_from_latest_returns_recent_page_first():
_clear_assistant_state()
db = _FakeDb()
base_time = datetime.utcnow() - timedelta(minutes=10)
conv_id = "conv-paginated"
for i in range(4, -1, -1):
db.add(
AssistantMessageRecord(
id=f"msg-{i}",
user_id="u-admin",
conversation_id=conv_id,
role="user" if i % 2 == 0 else "assistant",
text=f"message-{i}",
created_at=base_time + timedelta(minutes=i),
)
)
result = _run_async(
assistant_module.get_history(
page=1,
page_size=2,
conversation_id=conv_id,
from_latest=True,
current_user=_admin_user(),
db=db,
)
)
assert result["from_latest"] is True
assert result["has_next"] is True
# Chunk is chronological while representing latest page.
assert [item["text"] for item in result["items"]] == ["message-3", "message-4"]
# [/DEF:test_history_from_latest_returns_recent_page_first:Function]
# [DEF:test_list_conversations_archived_only_filters_active:Function]
# @PURPOSE: archived_only mode must return only archived conversations.
# @PRE: Dataset includes one active and one archived conversation.
# @POST: Only archived conversation remains in response payload.
def test_list_conversations_archived_only_filters_active():
_clear_assistant_state()
db = _FakeDb()
now = datetime.utcnow()
db.add(
AssistantMessageRecord(
id="m-active",
user_id="u-admin",
conversation_id="conv-active-2",
role="user",
text="active",
created_at=now,
)
)
db.add(
AssistantMessageRecord(
id="m-archived",
user_id="u-admin",
conversation_id="conv-archived-2",
role="user",
text="archived",
created_at=now - timedelta(days=assistant_module.ASSISTANT_ARCHIVE_AFTER_DAYS + 3),
)
)
result = _run_async(
assistant_module.list_conversations(
page=1,
page_size=20,
include_archived=True,
archived_only=True,
search=None,
current_user=_admin_user(),
db=db,
)
)
assert result["total"] == 1
assert result["items"][0]["conversation_id"] == "conv-archived-2"
assert result["items"][0]["archived"] is True
# [/DEF:test_list_conversations_archived_only_filters_active:Function]
# [/DEF:backend.src.api.routes.__tests__.test_assistant_api:Module]

View File

@@ -103,6 +103,8 @@ CONVERSATIONS: Dict[Tuple[str, str], List[Dict[str, Any]]] = {}
USER_ACTIVE_CONVERSATION: Dict[str, str] = {}
CONFIRMATIONS: Dict[str, ConfirmationRecord] = {}
ASSISTANT_AUDIT: Dict[str, List[Dict[str, Any]]] = {}
ASSISTANT_ARCHIVE_AFTER_DAYS = 14
ASSISTANT_MESSAGE_TTL_DAYS = 90
INTENT_PERMISSION_CHECKS: Dict[str, List[Tuple[str, str]]] = {
"get_task_status": [("tasks", "READ")],
@@ -334,6 +336,68 @@ def _resolve_or_create_conversation(user_id: str, conversation_id: Optional[str]
# [/DEF:_resolve_or_create_conversation:Function]
# [DEF:_cleanup_history_ttl:Function]
# @PURPOSE: Enforce assistant message retention window by deleting expired rows and in-memory records.
# @PRE: db session is available and user_id references current actor scope.
# @POST: Messages older than ASSISTANT_MESSAGE_TTL_DAYS are removed from persistence and memory mirrors.
def _cleanup_history_ttl(db: Session, user_id: str):
cutoff = datetime.utcnow() - timedelta(days=ASSISTANT_MESSAGE_TTL_DAYS)
try:
query = db.query(AssistantMessageRecord).filter(
AssistantMessageRecord.user_id == user_id,
AssistantMessageRecord.created_at < cutoff,
)
if hasattr(query, "delete"):
query.delete(synchronize_session=False)
db.commit()
except Exception as exc:
db.rollback()
logger.warning(f"[assistant.history][ttl_cleanup_failed] user={user_id} error={exc}")
stale_keys: List[Tuple[str, str]] = []
for key, items in CONVERSATIONS.items():
if key[0] != user_id:
continue
kept = []
for item in items:
created_at = item.get("created_at")
if isinstance(created_at, datetime) and created_at < cutoff:
continue
kept.append(item)
if kept:
CONVERSATIONS[key] = kept
else:
stale_keys.append(key)
for key in stale_keys:
CONVERSATIONS.pop(key, None)
# [/DEF:_cleanup_history_ttl:Function]
# [DEF:_is_conversation_archived:Function]
# @PURPOSE: Determine archived state for a conversation based on last update timestamp.
# @PRE: updated_at can be null for empty conversations.
# @POST: Returns True when conversation inactivity exceeds archive threshold.
def _is_conversation_archived(updated_at: Optional[datetime]) -> bool:
if not updated_at:
return False
cutoff = datetime.utcnow() - timedelta(days=ASSISTANT_ARCHIVE_AFTER_DAYS)
return updated_at < cutoff
# [/DEF:_is_conversation_archived:Function]
# [DEF:_coerce_query_bool:Function]
# @PURPOSE: Normalize bool-like query values for compatibility in direct handler invocations/tests.
# @PRE: value may be bool, string, or FastAPI Query metadata object.
# @POST: Returns deterministic boolean flag.
def _coerce_query_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "on"}
return False
# [/DEF:_coerce_query_bool:Function]
# [DEF:_extract_id:Function]
# @PURPOSE: Extract first regex match group from text by ordered pattern list.
# @PRE: patterns contain at least one capture group.
@@ -1385,6 +1449,93 @@ async def cancel_operation(
# [/DEF:cancel_operation:Function]
# [DEF:list_conversations:Function]
# @PURPOSE: Return paginated conversation list for current user with archived flag and last message preview.
# @PRE: Authenticated user context and valid pagination params.
# @POST: Conversations are grouped by conversation_id sorted by latest activity descending.
# @RETURN: Dict with items, paging metadata, and archive segmentation counts.
@router.get("/conversations")
async def list_conversations(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
include_archived: bool = Query(False),
archived_only: bool = Query(False),
search: Optional[str] = Query(None),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
with belief_scope("assistant.conversations"):
user_id = current_user.id
include_archived = _coerce_query_bool(include_archived)
archived_only = _coerce_query_bool(archived_only)
_cleanup_history_ttl(db, user_id)
rows = (
db.query(AssistantMessageRecord)
.filter(AssistantMessageRecord.user_id == user_id)
.order_by(desc(AssistantMessageRecord.created_at))
.all()
)
summary: Dict[str, Dict[str, Any]] = {}
for row in rows:
conv_id = row.conversation_id
if not conv_id:
continue
created_at = row.created_at or datetime.utcnow()
if conv_id not in summary:
summary[conv_id] = {
"conversation_id": conv_id,
"title": "",
"updated_at": created_at,
"last_message": row.text,
"last_role": row.role,
"last_state": row.state,
"last_task_id": row.task_id,
"message_count": 0,
}
item = summary[conv_id]
item["message_count"] += 1
if row.role == "user" and row.text and not item["title"]:
item["title"] = row.text.strip()[:80]
items = []
search_term = search.lower().strip() if search else ""
archived_total = sum(1 for c in summary.values() if _is_conversation_archived(c.get("updated_at")))
active_total = len(summary) - archived_total
for conv in summary.values():
conv["archived"] = _is_conversation_archived(conv.get("updated_at"))
if not conv.get("title"):
conv["title"] = f"Conversation {conv['conversation_id'][:8]}"
if search_term:
haystack = f"{conv.get('title', '')} {conv.get('last_message', '')}".lower()
if search_term not in haystack:
continue
if archived_only and not conv["archived"]:
continue
if not archived_only and not include_archived and conv["archived"]:
continue
updated = conv.get("updated_at")
conv["updated_at"] = updated.isoformat() if isinstance(updated, datetime) else None
items.append(conv)
items.sort(key=lambda x: x.get("updated_at") or "", reverse=True)
total = len(items)
start = (page - 1) * page_size
page_items = items[start : start + page_size]
return {
"items": page_items,
"total": total,
"page": page,
"page_size": page_size,
"has_next": start + page_size < total,
"active_total": active_total,
"archived_total": archived_total,
}
# [/DEF:list_conversations:Function]
@router.get("/history")
# [DEF:get_history:Function]
# @PURPOSE: Retrieve paginated assistant conversation history for current user.
@@ -1395,24 +1546,41 @@ async def get_history(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
conversation_id: Optional[str] = Query(None),
from_latest: bool = Query(False),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
with belief_scope("assistant.history"):
user_id = current_user.id
_cleanup_history_ttl(db, user_id)
conv_id = _resolve_or_create_conversation(user_id, conversation_id, db)
query = (
base_query = (
db.query(AssistantMessageRecord)
.filter(
AssistantMessageRecord.user_id == user_id,
AssistantMessageRecord.conversation_id == conv_id,
)
.order_by(AssistantMessageRecord.created_at.asc())
)
total = query.count()
total = base_query.count()
start = (page - 1) * page_size
rows = query.offset(start).limit(page_size).all()
if from_latest:
rows = (
base_query
.order_by(desc(AssistantMessageRecord.created_at))
.offset(start)
.limit(page_size)
.all()
)
rows = list(reversed(rows))
else:
rows = (
base_query
.order_by(AssistantMessageRecord.created_at.asc())
.offset(start)
.limit(page_size)
.all()
)
persistent_items = [
{
@@ -1437,6 +1605,7 @@ async def get_history(
"page": page,
"page_size": page_size,
"has_next": start + page_size < total,
"from_latest": from_latest,
"conversation_id": conv_id,
}
# [/DEF:get_history:Function]