# [DEF:LoggerModule:Module]
# @SEMANTICS: logging, websocket, streaming, handler
# @PURPOSE: Configures the application's logging system, including a custom handler for buffering logs and streaming them over WebSockets.
# @LAYER: Core
# @RELATION: Used by the main application and other modules to log events. The WebSocketLogHandler is used by the WebSocket endpoint in app.py.
import logging
import threading
from datetime import datetime
from typing import Dict, Any, List, Optional
from collections import deque
from contextlib import contextmanager
from logging.handlers import RotatingFileHandler

from pydantic import BaseModel, Field

# Thread-local storage for belief state
_belief_state = threading.local()

# Global flag for belief state logging
_enable_belief_state = True


# [DEF:BeliefFormatter:Class]
# @PURPOSE: Custom logging formatter that adds belief state prefixes to log messages.
class BeliefFormatter(logging.Formatter):
    """Formatter that prefixes messages with the current thread's belief anchor."""

    # [DEF:format:Function]
    # @PURPOSE: Formats the log record, adding belief state context if available.
    # @PRE: record is a logging.LogRecord.
    # @POST: Returns formatted string; record.msg is left exactly as it was on entry.
    # @PARAM: record (logging.LogRecord) - The log record to format.
    # @RETURN: str - The formatted log message.
    # @SEMANTICS: logging, formatter, context
    def format(self, record):
        """Return the formatted record, prefixed with ``[anchor_id][Action]``.

        The anchor is read from the thread-local ``_belief_state``. When no
        anchor is set (or it is falsy) the record is formatted unchanged.
        """
        anchor_id = getattr(_belief_state, 'anchor_id', None)
        if not anchor_id:
            return super().format(record)

        # A single LogRecord instance is passed to every handler attached to
        # the logger. If the prefix were left on record.msg, each subsequent
        # handler using this formatter would prepend it again, so the
        # original message is restored once formatting is done.
        original_msg = record.msg
        record.msg = f"[{anchor_id}][Action] {original_msg}"
        try:
            return super().format(record)
        finally:
            record.msg = original_msg
    # [/DEF:format:Function]
# [/DEF:BeliefFormatter:Class]


# Re-using LogEntry from task_manager for consistency
# [DEF:LogEntry:Class]
# @SEMANTICS: log, entry, record, pydantic
# @PURPOSE: A Pydantic model representing a single, structured log entry. This is a re-definition for consistency, as it's also defined in task_manager.py.
class LogEntry(BaseModel):
    # Creation time of the entry; defaults to datetime.utcnow() at instantiation.
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    # Level name and message text of the entry.
    level: str
    message: str
    # Optional free-form metadata attached to the entry.
    context: Optional[Dict[str, Any]] = None
# [/DEF:LogEntry:Class]


# [DEF:belief_scope:Function]
# @PURPOSE: Context manager for structured Belief State logging.
# @PARAM: anchor_id (str) - The identifier for the current semantic block.
# @PARAM: message (str) - Optional entry message.
# @PRE: anchor_id must be provided.
# @POST: Thread-local belief state is updated and entry/exit logs are generated.
# @SEMANTICS: logging, context, belief_state
@contextmanager
def belief_scope(anchor_id: str, message: str = ""):
    # Entry line is only emitted while belief state logging is enabled; it is
    # logged before the anchor is switched, so it is still formatted under the
    # enclosing scope's anchor (if any).
    if _enable_belief_state:
        suffix = f" {message}" if message else ""
        logger.info(f"[{anchor_id}][Entry]" + suffix)

    # Remember the enclosing anchor so nested scopes unwind correctly.
    previous_anchor = getattr(_belief_state, 'anchor_id', None)
    _belief_state.anchor_id = anchor_id
    try:
        yield
        # Reached only when the wrapped block finished without raising.
        # NOTE(review): the Coherence lines are emitted regardless of
        # _enable_belief_state; only Entry/Exit are gated by the flag.
        logger.info(f"[{anchor_id}][Coherence:OK]")
        if _enable_belief_state:
            logger.info(f"[{anchor_id}][Exit]")
    except Exception as exc:
        # Record the failure, then let the exception propagate unchanged.
        logger.info(f"[{anchor_id}][Coherence:Failed] {exc!s}")
        raise
    finally:
        # Always hand the thread back to the enclosing scope's anchor.
        _belief_state.anchor_id = previous_anchor
# [/DEF:belief_scope:Function]


# [DEF:configure_logger:Function]
# @PURPOSE: Configures the logger with the provided logging settings.
# @PRE: config is a valid LoggingConfig instance.
# @POST: Logger level, handlers, and belief state flag are updated.
# @PARAM: config (LoggingConfig) - The logging configuration.
# @SEMANTICS: logging, configuration, initialization
def configure_logger(config):
    """Apply ``config`` to the module-level ``logger``.

    Reads ``config.enable_belief_state``, ``config.level``,
    ``config.file_path``, ``config.max_bytes`` and ``config.backup_count``.
    Any previously attached RotatingFileHandler is removed and closed; a new
    one is attached when ``config.file_path`` is truthy. Every other handler
    on the logger gets a fresh BeliefFormatter.
    """
    global _enable_belief_state
    _enable_belief_state = config.enable_belief_state

    # Resolve the level name. Unknown names fall back to INFO; so do names
    # that resolve to a non-level attribute of the logging module (e.g.
    # "basic_format" -> logging.BASIC_FORMAT), which setLevel would reject.
    level = getattr(logging, config.level.upper(), logging.INFO)
    if not isinstance(level, int):
        level = logging.INFO
    logger.setLevel(level)

    # Drop file handlers from a previous configuration. The list is built
    # first so logger.handlers is not mutated while being iterated.
    stale_handlers = [h for h in logger.handlers if isinstance(h, RotatingFileHandler)]
    for stale in stale_handlers:
        logger.removeHandler(stale)
        stale.close()

    log_format = '[%(asctime)s][%(levelname)s][%(name)s] %(message)s'

    # Add file handler if file_path is set
    if config.file_path:
        from pathlib import Path

        # Make sure the target directory exists before the handler opens the file.
        log_file = Path(config.file_path)
        log_file.parent.mkdir(parents=True, exist_ok=True)

        file_handler = RotatingFileHandler(
            config.file_path,
            maxBytes=config.max_bytes,
            backupCount=config.backup_count
        )
        file_handler.setFormatter(BeliefFormatter(log_format))
        logger.addHandler(file_handler)

    # Update existing (non-file) handlers' formatters to BeliefFormatter
    for handler in logger.handlers:
        if not isinstance(handler, RotatingFileHandler):
            handler.setFormatter(BeliefFormatter(log_format))
# [/DEF:configure_logger:Function]


# [DEF:WebSocketLogHandler:Class]
# @SEMANTICS: logging, handler, websocket, buffer
# @PURPOSE: A custom logging handler that captures log records into a buffer. It is designed to be extended for real-time log streaming over WebSockets.
class WebSocketLogHandler(logging.Handler):
    """
    A logging handler that stores log records and can be extended
    to send them over WebSockets.
    """

    # [DEF:__init__:Function]
    # @PURPOSE: Initializes the handler with a fixed-capacity buffer.
    # @PRE: capacity is an integer.
    # @POST: Instance initialized with empty deque.
    # @PARAM: capacity (int) - Maximum number of logs to keep in memory.
    # @SEMANTICS: logging, initialization, buffer
    def __init__(self, capacity: int = 1000):
        super().__init__()
        # Bounded deque: once full, appending discards the oldest entry.
        self.log_buffer: deque[LogEntry] = deque(maxlen=capacity)
        # In a real implementation, you'd have a way to manage active WebSocket connections
        # e.g., self.active_connections: Set[WebSocket] = set()
    # [/DEF:__init__:Function]

    # [DEF:emit:Function]
    # @PURPOSE: Captures a log record, formats it, and stores it in the buffer.
    # @PRE: record is a logging.LogRecord.
    # @POST: Log is added to the log_buffer.
    # @PARAM: record (logging.LogRecord) - The log record to emit.
    # @SEMANTICS: logging, handler, buffer
    def emit(self, record: logging.LogRecord):
        try:
            log_entry = LogEntry(
                level=record.levelname,
                message=self.format(record),
                context={
                    "name": record.name,
                    "pathname": record.pathname,
                    "lineno": record.lineno,
                    "funcName": record.funcName,
                    "process": record.process,
                    "thread": record.thread,
                }
            )
            self.log_buffer.append(log_entry)
            # Here you would typically send the log_entry to all active WebSocket connections
            # for real-time streaming to the frontend.
            # Example: for ws in self.active_connections: await ws.send_json(log_entry.dict())
        except Exception:
            # Standard logging.Handler contract: report the failure through
            # handleError instead of letting it escape into the logging caller.
            self.handleError(record)
    # [/DEF:emit:Function]

    # [DEF:get_recent_logs:Function]
    # @PURPOSE: Returns a list of recent log entries from the buffer.
    # @PRE: None.
    # @POST: Returns list of LogEntry objects.
    # @RETURN: List[LogEntry] - List of buffered log entries.
    # @SEMANTICS: logging, buffer, retrieval
    def get_recent_logs(self) -> List[LogEntry]:
        """
        Returns a list of recent log entries from the buffer.
        """
        # logging.Handler.handle() holds the handler lock around emit();
        # taking the same lock here keeps another thread from appending
        # while the deque is being copied.
        self.acquire()
        try:
            return list(self.log_buffer)
        finally:
            self.release()
    # [/DEF:get_recent_logs:Function]
# [/DEF:WebSocketLogHandler:Class]


# [DEF:Logger:Global]
# @SEMANTICS: logger, global, instance
# @PURPOSE: The global logger instance for the application, configured with both a console handler and the custom WebSocket handler.
logger = logging.getLogger("superset_tools_app")


# [DEF:believed:Function]
# @PURPOSE: A decorator that wraps a function in a belief scope.
# @PARAM: anchor_id (str) - The identifier for the semantic block.
# @PRE: anchor_id must be a string.
# @POST: Returns a decorator function.
def believed(anchor_id: str):
    """Return a decorator that runs the decorated callable inside ``belief_scope(anchor_id)``."""
    # Function-scope import: functools is not among the module's top-level imports.
    from functools import wraps

    # [DEF:decorator:Function]
    # @PURPOSE: Internal decorator for belief scope.
    # @PRE: func must be a callable.
    # @POST: Returns the wrapped function.
    def decorator(func):
        # [DEF:wrapper:Function]
        # @PURPOSE: Internal wrapper that enters belief scope.
        # @PRE: None.
        # @POST: Executes the function within a belief scope.
        # wraps() copies __name__, __doc__, __module__ etc. from func so the
        # decorated callable keeps its identity for introspection and tooling.
        @wraps(func)
        def wrapper(*args, **kwargs):
            with belief_scope(anchor_id):
                return func(*args, **kwargs)
        # [/DEF:wrapper:Function]
        return wrapper
    # [/DEF:decorator:Function]
    return decorator
# [/DEF:believed:Function]


logger.setLevel(logging.INFO)

# Create a formatter
formatter = BeliefFormatter(
    '[%(asctime)s][%(levelname)s][%(name)s] %(message)s'
)

# Add console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# Add WebSocket log handler
websocket_log_handler = WebSocketLogHandler()
websocket_log_handler.setFormatter(formatter)
logger.addHandler(websocket_log_handler)

# Example usage:
# logger.info("Application started", extra={"context_key": "context_value"})
# logger.error("An error occurred", exc_info=True)
# [/DEF:Logger:Global]
# [/DEF:LoggerModule:Module]