# [DEF:LoggerModule:Module]
# @SEMANTICS: logging, websocket, streaming, handler
# @PURPOSE: Configures the application's logging system, including a custom handler for buffering logs and streaming them over WebSockets.
# @LAYER: Core
# @RELATION: Used by the main application and other modules to log events. The WebSocketLogHandler is used by the WebSocket endpoint in app.py.
import logging
import threading
from datetime import datetime
from typing import Dict, Any, List, Optional
from collections import deque
from contextlib import contextmanager
from logging.handlers import RotatingFileHandler

from pydantic import BaseModel, Field

# Thread-local storage for belief state
_belief_state = threading.local()

# Global flag for belief state logging
_enable_belief_state = True

# Global task log level filter
_task_log_level = "INFO"


# [DEF:BeliefFormatter:Class]
# @PURPOSE: Custom logging formatter that adds belief state prefixes to log messages.
class BeliefFormatter(logging.Formatter):
    """Formatter that prefixes messages with the calling thread's belief anchor."""

    # [DEF:format:Function]
    # @PURPOSE: Formats the log record, adding belief state context if available.
    # @PRE: record is a logging.LogRecord.
    # @POST: Returns formatted string; record.msg and record.args are left unchanged.
    # @PARAM: record (logging.LogRecord) - The log record to format.
    # @RETURN: str - The formatted log message.
    # @SEMANTICS: logging, formatter, context
    def format(self, record):
        """Return the formatted record, prefixed with ``[<anchor_id>][Action]``.

        The prefix is only added when the current thread has a truthy
        ``anchor_id`` stored in the module's thread-local belief state.
        """
        anchor_id = getattr(_belief_state, 'anchor_id', None)
        if not anchor_id:
            return super().format(record)

        # One LogRecord instance is passed to every handler attached to the
        # logger. Rewriting record.msg permanently would make each subsequent
        # handler stack another prefix on top, so the prefix is applied only
        # for the duration of this call and the record is restored afterwards.
        saved_msg, saved_args = record.msg, record.args
        # Render the message first so that a '%' inside anchor_id can never be
        # interpreted as a format directive when the record carries args.
        record.msg = f"[{anchor_id}][Action] {record.getMessage()}"
        record.args = None
        try:
            return super().format(record)
        finally:
            record.msg, record.args = saved_msg, saved_args
    # [/DEF:format:Function]
# [/DEF:BeliefFormatter:Class]


# Re-using LogEntry from task_manager for consistency
# [DEF:LogEntry:Class]
# @SEMANTICS: log, entry, record, pydantic
# @PURPOSE: A Pydantic model representing a single, structured log entry. This is a re-definition for consistency, as it's also defined in task_manager.py.
class LogEntry(BaseModel):
    # Creation time; datetime.utcnow produces a naive datetime expressed in UTC.
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    # Name of the log level, e.g. "INFO".
    level: str
    # The formatted log message.
    message: str
    # Optional free-form metadata attached to the entry.
    context: Optional[Dict[str, Any]] = None
# [/DEF:LogEntry:Class]


# [DEF:belief_scope:Function]
# @PURPOSE: Context manager for structured Belief State logging.
# @PARAM: anchor_id (str) - The identifier for the current semantic block.
# @PARAM: message (str) - Optional entry message.
# @PRE: anchor_id must be provided.
# @POST: Thread-local belief state is updated and entry/exit logs are generated.
# @SEMANTICS: logging, context, belief_state
@contextmanager
def belief_scope(anchor_id: str, message: str = ""):
    """Run the enclosed block with ``anchor_id`` as the thread's belief anchor.

    Entry and exit markers are logged at DEBUG level when belief-state logging
    is enabled; a coherence marker (OK or Failed) is logged at DEBUG level in
    either case. Exceptions raised by the enclosed block are logged and then
    re-raised unchanged. The previously active anchor is always restored, so
    scopes can be nested.
    """
    # Log Entry if enabled (DEBUG level to reduce noise).
    # `logger` is a module-level global that is looked up when the scope is
    # entered, not when this function is defined.
    if _enable_belief_state:
        entry_msg = f"[{anchor_id}][Entry]"
        if message:
            entry_msg += f" {message}"
        logger.debug(entry_msg)

    # Set thread-local anchor_id, remembering the enclosing scope's anchor
    old_anchor = getattr(_belief_state, 'anchor_id', None)
    _belief_state.anchor_id = anchor_id

    try:
        yield
    except Exception as e:
        # Log Coherence Failed (DEBUG level to reduce noise)
        logger.debug(f"[{anchor_id}][Coherence:Failed] {e}")
        raise
    else:
        # Log Coherence OK and Exit (DEBUG level to reduce noise).
        # Kept out of the try body so that only failures of the wrapped block
        # are reported as coherence failures.
        logger.debug(f"[{anchor_id}][Coherence:OK]")
        if _enable_belief_state:
            logger.debug(f"[{anchor_id}][Exit]")
    finally:
        # Restore old anchor
        _belief_state.anchor_id = old_anchor
# [/DEF:belief_scope:Function]


# [DEF:configure_logger:Function]
# @PURPOSE: Configures the logger with the provided logging settings.
# @PRE: config is a valid LoggingConfig instance.
# @POST: Logger level, handlers, belief state flag, and task log level are updated.
# @PARAM: config (LoggingConfig) - The logging configuration.
# @SEMANTICS: logging, configuration, initialization
def configure_logger(config):
    """Apply ``config`` to the module-level logger.

    Reads ``enable_belief_state``, ``task_log_level``, ``level``, ``file_path``,
    ``max_bytes`` and ``backup_count`` from ``config``. Any previously attached
    rotating file handler is closed and replaced, and every other handler on
    the logger is given a BeliefFormatter.
    """
    global _enable_belief_state, _task_log_level
    _enable_belief_state = config.enable_belief_state
    _task_log_level = config.task_log_level.upper()

    # Set logger level. Unknown names fall back to INFO; so do upper-case
    # attributes of the logging module that are not numeric levels (for
    # example BASIC_FORMAT), which setLevel would otherwise reject.
    level = getattr(logging, config.level.upper(), logging.INFO)
    if not isinstance(level, int):
        level = logging.INFO
    logger.setLevel(level)

    log_format = '[%(asctime)s][%(levelname)s][%(name)s] %(message)s'

    # Remove existing file handlers so that reconfiguring does not leave
    # several handlers (and open file descriptors) behind.
    handlers_to_remove = [h for h in logger.handlers if isinstance(h, RotatingFileHandler)]
    for h in handlers_to_remove:
        logger.removeHandler(h)
        h.close()

    # Add file handler if file_path is set
    if config.file_path:
        from pathlib import Path
        log_file = Path(config.file_path)
        # The handler opens the file itself but does not create missing directories.
        log_file.parent.mkdir(parents=True, exist_ok=True)

        file_handler = RotatingFileHandler(
            config.file_path,
            maxBytes=config.max_bytes,
            backupCount=config.backup_count
        )
        file_handler.setFormatter(BeliefFormatter(log_format))
        logger.addHandler(file_handler)

    # Update existing handlers' formatters to BeliefFormatter
    for handler in logger.handlers:
        if not isinstance(handler, RotatingFileHandler):
            handler.setFormatter(BeliefFormatter(log_format))
# [/DEF:configure_logger:Function]


# [DEF:get_task_log_level:Function]
# @PURPOSE: Returns the current task log level filter.
# @PRE: None.
# @POST: Returns the task log level string.
# @RETURN: str - The current task log level (DEBUG, INFO, WARNING, ERROR).
# @SEMANTICS: logging, configuration, getter
def get_task_log_level() -> str:
    """Returns the current task log level filter."""
    return _task_log_level
# [/DEF:get_task_log_level:Function]


# [DEF:should_log_task_level:Function]
# @PURPOSE: Checks if a log level should be recorded based on task_log_level setting.
# @PRE: level is a valid log level string.
# @POST: Returns True if level meets or exceeds task_log_level threshold.
# @PARAM: level (str) - The log level to check.
# @RETURN: bool - True if the level should be logged.
# @SEMANTICS: logging, filter, level
def should_log_task_level(level: str) -> bool:
    """Checks if a log level should be recorded based on task_log_level setting.

    The comparison is case-insensitive. A level name that is not in the table
    below is treated as INFO, both for ``level`` and for the configured
    threshold.
    """
    # CRITICAL must rank above ERROR; without an entry it would fall back to
    # the INFO default and be dropped under a WARNING or ERROR threshold.
    # WARN and FATAL are the standard-library aliases of WARNING and CRITICAL.
    level_order = {
        "DEBUG": 0,
        "INFO": 1,
        "WARNING": 2,
        "WARN": 2,
        "ERROR": 3,
        "CRITICAL": 4,
        "FATAL": 4,
    }
    current_level = _task_log_level.upper()
    check_level = level.upper()

    current_order = level_order.get(current_level, 1)  # Default to INFO
    check_order = level_order.get(check_level, 1)

    return check_order >= current_order
# [/DEF:should_log_task_level:Function]


# [DEF:WebSocketLogHandler:Class]
# @SEMANTICS: logging, handler, websocket, buffer
# @PURPOSE: A custom logging handler that captures log records into a buffer. It is designed to be extended for real-time log streaming over WebSockets.
class WebSocketLogHandler(logging.Handler):
    """
    A logging handler that stores log records and can be extended to send them
    over WebSockets.
    """

    # [DEF:__init__:Function]
    # @PURPOSE: Initializes the handler with a fixed-capacity buffer.
    # @PRE: capacity is an integer.
    # @POST: Instance initialized with empty deque.
    # @PARAM: capacity (int) - Maximum number of logs to keep in memory.
    # @SEMANTICS: logging, initialization, buffer
    def __init__(self, capacity: int = 1000):
        super().__init__()
        # A bounded deque discards its oldest entry once `capacity` is reached.
        self.log_buffer: deque[LogEntry] = deque(maxlen=capacity)
        # In a real implementation, you'd have a way to manage active WebSocket connections
        # e.g., self.active_connections: Set[WebSocket] = set()
    # [/DEF:__init__:Function]

    # [DEF:emit:Function]
    # @PURPOSE: Captures a log record, formats it, and stores it in the buffer.
    # @PRE: record is a logging.LogRecord.
    # @POST: Log is added to the log_buffer.
    # @PARAM: record (logging.LogRecord) - The log record to emit.
    # @SEMANTICS: logging, handler, buffer
    def emit(self, record: logging.LogRecord):
        try:
            log_entry = LogEntry(
                level=record.levelname,
                message=self.format(record),
                context={
                    "name": record.name,
                    "pathname": record.pathname,
                    "lineno": record.lineno,
                    "funcName": record.funcName,
                    "process": record.process,
                    "thread": record.thread,
                }
            )
            self.log_buffer.append(log_entry)
            # Here you would typically send the log_entry to all active WebSocket connections
            # for real-time streaming to the frontend.
            # Example: for ws in self.active_connections: await ws.send_json(log_entry.dict())
        except Exception:
            # Delegate to the standard logging error hook instead of letting a
            # failure here propagate into the code that issued the log call.
            self.handleError(record)
    # [/DEF:emit:Function]

    # [DEF:get_recent_logs:Function]
    # @PURPOSE: Returns a list of recent log entries from the buffer.
    # @PRE: None.
    # @POST: Returns list of LogEntry objects.
    # @RETURN: List[LogEntry] - List of buffered log entries.
    # @SEMANTICS: logging, buffer, retrieval
    def get_recent_logs(self) -> List[LogEntry]:
        """
        Returns a list of recent log entries from the buffer.

        The result is a new list, so callers may modify it without affecting
        the handler's buffer.
        """
        return list(self.log_buffer)
    # [/DEF:get_recent_logs:Function]
# [/DEF:WebSocketLogHandler:Class]


# [DEF:Logger:Global]
# @SEMANTICS: logger, global, instance
# @PURPOSE: The global logger instance for the application, configured with both a console handler and the custom WebSocket handler.
logger = logging.getLogger("superset_tools_app")


# [DEF:believed:Function]
# @PURPOSE: A decorator that wraps a function in a belief scope.
# @PARAM: anchor_id (str) - The identifier for the semantic block.
# @PRE: anchor_id must be a string.
# @POST: Returns a decorator function.
def believed(anchor_id: str):
    """Return a decorator that runs the decorated callable inside belief_scope(anchor_id).

    NOTE: the wrapper is synchronous. When applied to a coroutine function the
    scope is left as soon as the coroutine object has been created, i.e. before
    the coroutine body runs.
    """
    # Local import keeps this decorator self-contained.
    from functools import wraps

    # [DEF:decorator:Function]
    # @PURPOSE: Internal decorator for belief scope.
    # @PRE: func must be a callable.
    # @POST: Returns the wrapped function.
    def decorator(func):
        # [DEF:wrapper:Function]
        # @PURPOSE: Internal wrapper that enters belief scope.
        # @PRE: None.
        # @POST: Executes the function within a belief scope.
        @wraps(func)  # keep func's __name__, __doc__ and other metadata on the wrapper
        def wrapper(*args, **kwargs):
            with belief_scope(anchor_id):
                return func(*args, **kwargs)
        # [/DEF:wrapper:Function]
        return wrapper
    # [/DEF:decorator:Function]
    return decorator
# [/DEF:believed:Function]


logger.setLevel(logging.INFO)

# Create a formatter
formatter = BeliefFormatter(
    '[%(asctime)s][%(levelname)s][%(name)s] %(message)s'
)

# Add console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# Add WebSocket log handler
websocket_log_handler = WebSocketLogHandler()
websocket_log_handler.setFormatter(formatter)
logger.addHandler(websocket_log_handler)

# Example usage:
# logger.info("Application started", extra={"context_key": "context_value"})
# logger.error("An error occurred", exc_info=True)
# [/DEF:Logger:Global]
# [/DEF:LoggerModule:Module]