semantic update
This commit is contained in:
@@ -8,23 +8,33 @@
|
||||
|
||||
# [SECTION: IMPORTS]
|
||||
import asyncio
|
||||
import threading
|
||||
import inspect
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from datetime import datetime
|
||||
from typing import Dict, Any, List, Optional
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from .models import Task, TaskStatus, LogEntry
|
||||
from .persistence import TaskPersistenceService
|
||||
from ..logger import logger, belief_scope
|
||||
from .models import Task, TaskStatus, LogEntry, LogFilter, LogStats, TaskLog
|
||||
from .persistence import TaskPersistenceService, TaskLogPersistenceService
|
||||
from .context import TaskContext
|
||||
from ..logger import logger, belief_scope, should_log_task_level
|
||||
# [/SECTION]
|
||||
|
||||
# [DEF:TaskManager:Class]
|
||||
# @SEMANTICS: task, manager, lifecycle, execution, state
|
||||
# @PURPOSE: Manages the lifecycle of tasks, including their creation, execution, and state tracking.
|
||||
# @TIER: CRITICAL
|
||||
# @INVARIANT: Task IDs are unique within the registry.
|
||||
# @INVARIANT: Each task has exactly one status at any time.
|
||||
# @INVARIANT: Log entries are never deleted after being added to a task.
|
||||
class TaskManager:
|
||||
"""
|
||||
Manages the lifecycle of tasks, including their creation, execution, and state tracking.
|
||||
"""
|
||||
|
||||
# Log flush interval in seconds
|
||||
LOG_FLUSH_INTERVAL = 2.0
|
||||
|
||||
# [DEF:__init__:Function]
|
||||
# @PURPOSE: Initialize the TaskManager with dependencies.
|
||||
# @PRE: plugin_loader is initialized.
|
||||
@@ -35,8 +45,18 @@ class TaskManager:
|
||||
self.plugin_loader = plugin_loader
|
||||
self.tasks: Dict[str, Task] = {}
|
||||
self.subscribers: Dict[str, List[asyncio.Queue]] = {}
|
||||
self.executor = ThreadPoolExecutor(max_workers=5) # For CPU-bound plugin execution
|
||||
self.executor = ThreadPoolExecutor(max_workers=5) # For CPU-bound plugin execution
|
||||
self.persistence_service = TaskPersistenceService()
|
||||
self.log_persistence_service = TaskLogPersistenceService()
|
||||
|
||||
# Log buffer: task_id -> List[LogEntry]
|
||||
self._log_buffer: Dict[str, List[LogEntry]] = {}
|
||||
self._log_buffer_lock = threading.Lock()
|
||||
|
||||
# Flusher thread for batch writing logs
|
||||
self._flusher_stop_event = threading.Event()
|
||||
self._flusher_thread = threading.Thread(target=self._flusher_loop, daemon=True)
|
||||
self._flusher_thread.start()
|
||||
|
||||
try:
|
||||
self.loop = asyncio.get_running_loop()
|
||||
@@ -47,6 +67,59 @@ class TaskManager:
|
||||
# Load persisted tasks on startup
|
||||
self.load_persisted_tasks()
|
||||
# [/DEF:__init__:Function]
|
||||
|
||||
# [DEF:_flusher_loop:Function]
|
||||
# @PURPOSE: Background thread that periodically flushes log buffer to database.
|
||||
# @PRE: TaskManager is initialized.
|
||||
# @POST: Logs are batch-written to database every LOG_FLUSH_INTERVAL seconds.
|
||||
def _flusher_loop(self):
    """Run the periodic flush cycle until the stop event is set.

    Each iteration writes all buffered logs via ``_flush_logs`` and then
    waits up to ``LOG_FLUSH_INTERVAL`` seconds on ``_flusher_stop_event``;
    the wait returns early as soon as the event is set. After the loop
    exits, one more flush is performed so entries buffered since the last
    cycle are not left unwritten when the flusher is stopped.
    """
    stop_event = self._flusher_stop_event
    while not stop_event.is_set():
        self._flush_logs()
        stop_event.wait(self.LOG_FLUSH_INTERVAL)

    # Final drain: entries added during the last wait would otherwise stay
    # in the in-memory buffer once this thread has exited.
    self._flush_logs()
|
||||
# [/DEF:_flusher_loop:Function]
|
||||
|
||||
# [DEF:_flush_logs:Function]
|
||||
# @PURPOSE: Flush all buffered logs to the database.
|
||||
# @PRE: None.
|
||||
# @POST: All buffered logs are written to task_logs table; batches that fail to persist are returned to the buffer for retry.
|
||||
def _flush_logs(self):
    """Flush all buffered logs to the database, one task at a time.

    The buffered task IDs are snapshotted under ``_log_buffer_lock``. Each
    task's entries are then popped under the lock and written with
    ``log_persistence_service.add_logs``; the lock is not held during the
    write itself.

    If a write raises, the error is logged and the batch is put back at
    the FRONT of that task's buffer, ahead of any entries that were
    appended while the write was in flight, so a later retry persists the
    entries in the order they were produced.
    """
    with self._log_buffer_lock:
        pending_ids = list(self._log_buffer)

    for task_id in pending_ids:
        with self._log_buffer_lock:
            batch = self._log_buffer.pop(task_id, [])

        if not batch:
            continue

        try:
            self.log_persistence_service.add_logs(task_id, batch)
        except Exception as e:
            logger.error(f"Failed to flush logs for task {task_id}: {e}")
            # Re-queue for retry. The failed batch is older than anything
            # buffered in the meantime, so it must go first.
            with self._log_buffer_lock:
                newer = self._log_buffer.get(task_id, [])
                self._log_buffer[task_id] = batch + newer
|
||||
# [/DEF:_flush_logs:Function]
|
||||
|
||||
# [DEF:_flush_task_logs:Function]
|
||||
# @PURPOSE: Flush logs for a specific task immediately.
|
||||
# @PRE: task_id exists.
|
||||
# @POST: Task's buffered logs are written to database.
|
||||
# @PARAM: task_id (str) - The task ID.
|
||||
def _flush_task_logs(self, task_id: str):
    """Flush the buffered logs of a single task immediately.

    The task's entries are popped from the buffer under
    ``_log_buffer_lock`` and written with
    ``log_persistence_service.add_logs`` outside the lock. Nothing is
    written when the task has no buffered entries.

    If the write raises, the error is logged and the entries are put back
    at the front of the task's buffer instead of being discarded, so a
    later flush can retry them in their original order.

    Args:
        task_id: ID of the task whose buffered logs should be written.
    """
    with self._log_buffer_lock:
        batch = self._log_buffer.pop(task_id, [])

    if not batch:
        return

    try:
        self.log_persistence_service.add_logs(task_id, batch)
    except Exception as e:
        logger.error(f"Failed to flush logs for task {task_id}: {e}")
        # Keep the entries for a later retry; they predate anything that
        # was buffered while the write was in flight, so they go first.
        with self._log_buffer_lock:
            newer = self._log_buffer.get(task_id, [])
            self._log_buffer[task_id] = batch + newer
|
||||
# [/DEF:_flush_task_logs:Function]
|
||||
|
||||
# [DEF:create_task:Function]
|
||||
# @PURPOSE: Creates and queues a new task for execution.
|
||||
@@ -78,7 +151,7 @@ class TaskManager:
|
||||
# [/DEF:create_task:Function]
|
||||
|
||||
# [DEF:_run_task:Function]
|
||||
# @PURPOSE: Internal method to execute a task.
|
||||
# @PURPOSE: Internal method to execute a task with TaskContext support.
|
||||
# @PRE: Task exists in registry.
|
||||
# @POST: Task is executed, status updated to SUCCESS or FAILED.
|
||||
# @PARAM: task_id (str) - The ID of the task to run.
|
||||
@@ -91,30 +164,54 @@ class TaskManager:
|
||||
task.status = TaskStatus.RUNNING
|
||||
task.started_at = datetime.utcnow()
|
||||
self.persistence_service.persist_task(task)
|
||||
self._add_log(task_id, "INFO", f"Task started for plugin '{plugin.name}'")
|
||||
self._add_log(task_id, "INFO", f"Task started for plugin '{plugin.name}'", source="system")
|
||||
|
||||
try:
|
||||
# Execute plugin
|
||||
# Prepare params and check if plugin supports new TaskContext
|
||||
params = {**task.params, "_task_id": task_id}
|
||||
|
||||
if asyncio.iscoroutinefunction(plugin.execute):
|
||||
task.result = await plugin.execute(params)
|
||||
else:
|
||||
task.result = await self.loop.run_in_executor(
|
||||
self.executor,
|
||||
plugin.execute,
|
||||
params
|
||||
# Check if plugin's execute method accepts 'context' parameter
|
||||
sig = inspect.signature(plugin.execute)
|
||||
accepts_context = 'context' in sig.parameters
|
||||
|
||||
if accepts_context:
|
||||
# Create TaskContext for new-style plugins
|
||||
context = TaskContext(
|
||||
task_id=task_id,
|
||||
add_log_fn=self._add_log,
|
||||
params=params,
|
||||
default_source="plugin"
|
||||
)
|
||||
|
||||
if asyncio.iscoroutinefunction(plugin.execute):
|
||||
task.result = await plugin.execute(params, context=context)
|
||||
else:
|
||||
task.result = await self.loop.run_in_executor(
|
||||
self.executor,
|
||||
lambda: plugin.execute(params, context=context)
|
||||
)
|
||||
else:
|
||||
# Backward compatibility: old-style plugins without context
|
||||
if asyncio.iscoroutinefunction(plugin.execute):
|
||||
task.result = await plugin.execute(params)
|
||||
else:
|
||||
task.result = await self.loop.run_in_executor(
|
||||
self.executor,
|
||||
plugin.execute,
|
||||
params
|
||||
)
|
||||
|
||||
logger.info(f"Task {task_id} completed successfully")
|
||||
task.status = TaskStatus.SUCCESS
|
||||
self._add_log(task_id, "INFO", f"Task completed successfully for plugin '{plugin.name}'")
|
||||
self._add_log(task_id, "INFO", f"Task completed successfully for plugin '{plugin.name}'", source="system")
|
||||
except Exception as e:
|
||||
logger.error(f"Task {task_id} failed: {e}")
|
||||
task.status = TaskStatus.FAILED
|
||||
self._add_log(task_id, "ERROR", f"Task failed: {e}", {"error_type": type(e).__name__})
|
||||
self._add_log(task_id, "ERROR", f"Task failed: {e}", source="system", metadata={"error_type": type(e).__name__})
|
||||
finally:
|
||||
task.finished_at = datetime.utcnow()
|
||||
# Flush any remaining buffered logs before persisting task
|
||||
self._flush_task_logs(task_id)
|
||||
self.persistence_service.persist_task(task)
|
||||
logger.info(f"Task {task_id} execution finished with status: {task.status}")
|
||||
# [/DEF:_run_task:Function]
|
||||
@@ -224,36 +321,106 @@ class TaskManager:
|
||||
# [/DEF:get_tasks:Function]
|
||||
|
||||
# [DEF:get_task_logs:Function]
|
||||
# @PURPOSE: Retrieves logs for a specific task.
|
||||
# @PURPOSE: Retrieves logs for a specific task (from memory for running, persistence for completed).
|
||||
# @PRE: task_id is a string.
|
||||
# @POST: Returns list of LogEntry objects.
|
||||
# @POST: Returns list of LogEntry or TaskLog objects.
|
||||
# @PARAM: task_id (str) - ID of the task.
|
||||
# @PARAM: log_filter (Optional[LogFilter]) - Filter parameters.
|
||||
# @RETURN: List[LogEntry] - List of log entries.
|
||||
def get_task_logs(self, task_id: str) -> List[LogEntry]:
|
||||
def get_task_logs(self, task_id: str, log_filter: Optional[LogFilter] = None) -> List[LogEntry]:
    """Return the log entries of a task.

    Tasks whose status is SUCCESS or FAILED are read from the log
    persistence service (using ``log_filter``, or a default ``LogFilter()``
    when none is given) and each stored record is converted to a
    ``LogEntry``. For any other status the task's in-memory ``logs`` list
    is returned as-is; ``log_filter`` is not applied on that path. An
    unknown ``task_id`` yields an empty list.

    Args:
        task_id: ID of the task.
        log_filter: Optional filter passed to the persistence service.

    Returns:
        The task's log entries.
    """
    with belief_scope("TaskManager.get_task_logs", f"task_id={task_id}"):
        task = self.tasks.get(task_id)
        if not task:
            return []

        # Running/pending tasks are served straight from memory.
        is_finished = task.status in [TaskStatus.SUCCESS, TaskStatus.FAILED]
        if not is_finished:
            return task.logs

        # Completed tasks are served from persistence.
        effective_filter = LogFilter() if log_filter is None else log_filter
        stored = self.log_persistence_service.get_logs(task_id, effective_filter)

        # Convert stored records to LogEntry for backward compatibility.
        entries = []
        for record in stored:
            entries.append(
                LogEntry(
                    timestamp=record.timestamp,
                    level=record.level,
                    message=record.message,
                    source=record.source,
                    metadata=record.metadata,
                )
            )
        return entries
|
||||
# [/DEF:get_task_logs:Function]
|
||||
|
||||
# [DEF:get_task_log_stats:Function]
|
||||
# @PURPOSE: Get statistics about logs for a task.
|
||||
# @PRE: task_id is a valid task ID.
|
||||
# @POST: Returns LogStats with counts by level and source.
|
||||
# @PARAM: task_id (str) - The task ID.
|
||||
# @RETURN: LogStats - Statistics about task logs.
|
||||
def get_task_log_stats(self, task_id: str) -> LogStats:
    """Return log statistics for a task.

    Delegates to ``log_persistence_service.get_log_stats`` inside a
    ``belief_scope`` for this call.

    Args:
        task_id: ID of the task.

    Returns:
        Whatever the persistence service reports for ``task_id``.
    """
    scope_name = "TaskManager.get_task_log_stats"
    with belief_scope(scope_name, f"task_id={task_id}"):
        stats = self.log_persistence_service.get_log_stats(task_id)
        return stats
|
||||
# [/DEF:get_task_log_stats:Function]
|
||||
|
||||
# [DEF:get_task_log_sources:Function]
|
||||
# @PURPOSE: Get unique sources for a task's logs.
|
||||
# @PRE: task_id is a valid task ID.
|
||||
# @POST: Returns list of unique source strings.
|
||||
# @PARAM: task_id (str) - The task ID.
|
||||
# @RETURN: List[str] - Unique source names.
|
||||
def get_task_log_sources(self, task_id: str) -> List[str]:
    """Return the log sources recorded for a task.

    Delegates to ``log_persistence_service.get_sources`` inside a
    ``belief_scope`` for this call.

    Args:
        task_id: ID of the task.

    Returns:
        Whatever the persistence service reports for ``task_id``.
    """
    scope_name = "TaskManager.get_task_log_sources"
    with belief_scope(scope_name, f"task_id={task_id}"):
        sources = self.log_persistence_service.get_sources(task_id)
        return sources
|
||||
# [/DEF:get_task_log_sources:Function]
|
||||
|
||||
# [DEF:_add_log:Function]
|
||||
# @PURPOSE: Adds a log entry to a task and notifies subscribers.
|
||||
# @PURPOSE: Adds a log entry to a task buffer and notifies subscribers.
|
||||
# @PRE: Task exists.
|
||||
# @POST: Log added to task and pushed to queues.
|
||||
# @POST: Log added to buffer and pushed to queues (if level meets task_log_level filter).
|
||||
# @PARAM: task_id (str) - ID of the task.
|
||||
# @PARAM: level (str) - Log level.
|
||||
# @PARAM: message (str) - Log message.
|
||||
# @PARAM: context (Optional[Dict]) - Log context.
|
||||
def _add_log(self, task_id: str, level: str, message: str, context: Optional[Dict[str, Any]] = None):
|
||||
# @PARAM: source (str) - Source component (default: "system").
|
||||
# @PARAM: metadata (Optional[Dict]) - Additional structured data.
|
||||
# @PARAM: context (Optional[Dict]) - Legacy context (for backward compatibility).
|
||||
def _add_log(
|
||||
self,
|
||||
task_id: str,
|
||||
level: str,
|
||||
message: str,
|
||||
source: str = "system",
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
context: Optional[Dict[str, Any]] = None
|
||||
):
|
||||
with belief_scope("TaskManager._add_log", f"task_id={task_id}"):
|
||||
task = self.tasks.get(task_id)
|
||||
if not task:
|
||||
return
|
||||
|
||||
log_entry = LogEntry(level=level, message=message, context=context)
|
||||
task.logs.append(log_entry)
|
||||
self.persistence_service.persist_task(task)
|
||||
# Filter logs based on task_log_level configuration
|
||||
if not should_log_task_level(level):
|
||||
return
|
||||
|
||||
# Notify subscribers
|
||||
# Create log entry with new fields
|
||||
log_entry = LogEntry(
|
||||
level=level,
|
||||
message=message,
|
||||
source=source,
|
||||
metadata=metadata,
|
||||
context=context # Keep for backward compatibility
|
||||
)
|
||||
|
||||
# Add to in-memory logs (for backward compatibility with legacy JSON field)
|
||||
task.logs.append(log_entry)
|
||||
|
||||
# Add to buffer for batch persistence
|
||||
with self._log_buffer_lock:
|
||||
if task_id not in self._log_buffer:
|
||||
self._log_buffer[task_id] = []
|
||||
self._log_buffer[task_id].append(log_entry)
|
||||
|
||||
# Notify subscribers (for real-time WebSocket updates)
|
||||
if task_id in self.subscribers:
|
||||
for queue in self.subscribers[task_id]:
|
||||
self.loop.call_soon_threadsafe(queue.put_nowait, log_entry)
|
||||
@@ -353,7 +520,7 @@ class TaskManager:
|
||||
# [/DEF:resume_task_with_password:Function]
|
||||
|
||||
# [DEF:clear_tasks:Function]
|
||||
# @PURPOSE: Clears tasks based on status filter.
|
||||
# @PURPOSE: Clears tasks based on status filter (also deletes associated logs).
|
||||
# @PRE: status is Optional[TaskStatus].
|
||||
# @POST: Tasks matching filter (or all non-active) cleared from registry and database.
|
||||
# @PARAM: status (Optional[TaskStatus]) - Filter by task status.
|
||||
@@ -387,9 +554,13 @@ class TaskManager:
|
||||
|
||||
del self.tasks[tid]
|
||||
|
||||
# Remove from persistence
|
||||
# Remove from persistence (task_records and task_logs via CASCADE)
|
||||
self.persistence_service.delete_tasks(tasks_to_remove)
|
||||
|
||||
# Also explicitly delete logs (in case CASCADE is not set up)
|
||||
if tasks_to_remove:
|
||||
self.log_persistence_service.delete_logs_for_tasks(tasks_to_remove)
|
||||
|
||||
logger.info(f"Cleared {len(tasks_to_remove)} tasks.")
|
||||
return len(tasks_to_remove)
|
||||
# [/DEF:clear_tasks:Function]
|
||||
|
||||
Reference in New Issue
Block a user