Module timebased_logger
timebased_logger.py: A production-grade logger that rate-limits messages by time interval, filters them by log level, applies configurable formatting, and supports async and thread-safe modes as well as flexible output (IO).
Classes
class TimeBasedLogger (interval_seconds=1,
log_fn=<built-in function print>,
max_logs_per_interval=None,
time_fn=None,
async_mode=False,
batch_size=10,
thread_safe=False,
level='INFO',
fmt='[{level}] {asctime} {message}')-
Expand source code
class TimeBasedLogger:
    """Rate-limited logger with levels, formatting and optional async output.

    Within each window of ``interval_seconds`` the logger emits a single
    record, or up to ``max_logs_per_interval`` records when that limit is set;
    any further records arriving in the same window are dropped.

    Args:
        interval_seconds (float): Length of the rate-limiting window in seconds.
        log_fn (callable): Sink called with each formatted record (default: print).
        max_logs_per_interval (int, optional): Maximum number of records emitted
            per window. If None, one record per window is emitted.
        time_fn (callable, optional): Clock returning the current time in
            seconds (default: time.time).
        async_mode (bool): If True, records are queued and emitted by a
            background daemon thread.
        batch_size (int): Number of queued records the worker collects before
            emitting them (async mode only).
        thread_safe (bool): If True, emission is serialised with a lock.
        level (str|int): Minimum level to emit (default: 'INFO').
        fmt (str): ``str.format`` template; receives ``level``, ``asctime``,
            ``message`` and any keys passed through ``extra``.

    Usage:
        logger = TimeBasedLogger(level='INFO', fmt='[{level}] {message}')
        logger.info('Hello world')
    """

    def __init__(self, interval_seconds=1, log_fn=print, max_logs_per_interval=None,
                 time_fn=None, async_mode=False, batch_size=10, thread_safe=False,
                 level='INFO', fmt='[{level}] {asctime} {message}'):
        self.interval_seconds = interval_seconds
        self.log_fn = log_fn
        self.max_logs_per_interval = max_logs_per_interval
        self.time_fn = time_fn or time.time
        self._last_log_time = None
        self._logs_this_interval = 0
        self._interval_start = None
        self._paused = False
        self.async_mode = async_mode
        self.batch_size = batch_size
        self.thread_safe = thread_safe
        self._lock = threading.Lock() if thread_safe else None
        self.level = self._level_to_int(level)
        self.fmt = fmt
        if async_mode:
            self._queue = queue.Queue()
            self._stop_event = threading.Event()
            self._worker = threading.Thread(target=self._worker_fn, daemon=True)
            self._worker.start()

    def _level_to_int(self, level: Any) -> int:
        """Return the numeric value of *level*; unknown names map to 20."""
        if isinstance(level, int):
            return level
        return LOG_LEVELS.get(str(level).upper(), 20)

    def setLevel(self, level):
        """Set the minimum log level for the logger."""
        self.level = self._level_to_int(level)

    def log(self, message, level='INFO', exc_info=None, extra=None):
        """Log a message with the specified level.

        Args:
            message (str): The message to log.
            level (str|int): The log level (default: 'INFO').
            exc_info (tuple|BaseException|bool, optional): Exception information
                to append to the record.
            extra (dict, optional): Extra fields made available to ``fmt``.
        """
        # Cheap rejections first: a paused or filtered record must not pay for
        # strftime / traceback formatting, nor raise formatting errors.
        if self._paused:
            return
        if self._level_to_int(level) < self.level:
            return
        record = self._format_record(message, level, exc_info, extra)
        if self.async_mode:
            self._queue.put(record)
            return
        self._log_internal(record)

    def _log_internal(self, record):
        """Apply rate limiting to *record*, holding the lock when one exists.

        Args:
            record (str): The formatted log record.
        """
        lock = self._lock
        if lock is None:
            self._emit_if_allowed(record)
            return
        with lock:
            self._emit_if_allowed(record)

    def _emit_if_allowed(self, record):
        """Emit *record* through ``log_fn`` if the current window permits it."""
        # The clock is read here (inside the lock in thread-safe mode) so that
        # timestamps are consistent with the order in which threads emit.
        now = self.time_fn()
        if self._interval_start is None or now - self._interval_start >= self.interval_seconds:
            # A new window starts: reset the per-window bookkeeping.
            self._interval_start = now
            self._logs_this_interval = 0
            self._last_log_time = None
        limit = self.max_logs_per_interval
        if limit is not None and self._logs_this_interval >= limit:
            return
        if self._last_log_time is None or now - self._last_log_time >= self.interval_seconds:
            self.log_fn(record)
            self._last_log_time = now
            self._logs_this_interval += 1
        elif limit is not None:
            # Additional records inside the window are allowed up to the limit.
            self.log_fn(record)
            self._logs_this_interval += 1

    def _worker_fn(self):
        """Background worker for async mode.

        Collects queued records into batches and emits a batch when it reaches
        ``batch_size`` or when the queue stays empty for 0.1 seconds. Runs until
        the stop event is set and the queue has been emptied.
        """
        batch = []
        while not self._stop_event.is_set() or not self._queue.empty():
            try:
                msg = self._queue.get(timeout=0.1)
            except queue.Empty:
                if batch:
                    self._drain(batch)
                continue
            batch.append(msg)
            if len(batch) >= self.batch_size:
                self._drain(batch)
        # Final flush
        if batch:
            self._drain(batch)

    def _drain(self, batch):
        """Emit *batch*, mark its items done on the queue, and empty it."""
        try:
            self._flush_batch(batch)
        finally:
            # task_done() is what lets flush() wait for records that have left
            # the queue but have not been emitted yet.
            for _ in batch:
                self._queue.task_done()
            batch.clear()

    def _flush_batch(self, batch):
        """Emit every record of a batch, continuing past sink failures.

        Args:
            batch (list): A list of formatted log records.
        """
        for msg in batch:
            try:
                self._log_internal(msg)
            except Exception:
                # Keep the worker alive, but do not hide the failure.
                self._report_failure()

    @staticmethod
    def _report_failure():
        """Write the exception currently being handled to stderr, best effort."""
        import sys
        import traceback
        stream = sys.stderr
        if stream is None:
            return
        try:
            traceback.print_exc(file=stream)
        except Exception:
            # stderr itself is unusable; there is nowhere left to report to.
            pass

    def _format_record(self, message, level, exc_info, extra):
        """Render a record with ``fmt`` and append a traceback if requested."""
        asctime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.time_fn()))
        fields = {
            'level': str(level).upper(),
            'asctime': asctime,
            'message': message,
        }
        if extra:
            fields.update(extra)
        formatted = self.fmt.format(**fields)
        if exc_info:
            import sys
            import traceback
            if isinstance(exc_info, BaseException):
                # Use the exception that was passed in, not whatever happens to
                # be the exception currently being handled.
                exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
            elif not isinstance(exc_info, tuple):
                exc_info = sys.exc_info()
            formatted += '\n' + ''.join(traceback.format_exception(*exc_info))
        return formatted

    def debug(self, message, **kwargs):
        """Logs a message with level DEBUG."""
        self.log(message, level='DEBUG', **kwargs)

    def info(self, message, **kwargs):
        """Logs a message with level INFO."""
        self.log(message, level='INFO', **kwargs)

    def warning(self, message, **kwargs):
        """Logs a message with level WARNING."""
        self.log(message, level='WARNING', **kwargs)

    def error(self, message, exc_info=None, **kwargs):
        """Logs a message with level ERROR."""
        self.log(message, level='ERROR', exc_info=exc_info, **kwargs)

    def critical(self, message, exc_info=None, **kwargs):
        """Logs a message with level CRITICAL."""
        self.log(message, level='CRITICAL', exc_info=exc_info, **kwargs)

    def flush(self):
        """Block until every queued record has been handled (async mode).

        Waits for the worker to finish the records it has already taken off
        the queue as well, not merely for the queue to become empty. Returns
        immediately in sync mode or when the worker thread is no longer running.
        """
        if not self.async_mode:
            return
        if self._worker.is_alive():
            self._queue.join()

    def close(self):
        """Closes the logger, stopping the background worker (for async mode)."""
        if self.async_mode:
            self._stop_event.set()
            self._worker.join()

    def pause(self):
        """Pauses the logger, preventing new messages from being logged."""
        self._paused = True

    def resume(self):
        """Resumes the logger after being paused.

        Clears the last-log timestamp so the next record can be emitted
        immediately.
        """
        self._paused = False
        self._last_log_time = None
TimeBasedLogger(interval_seconds=1, log_fn=print, max_logs_per_interval=None, time_fn=None, async_mode=False, batch_size=10, thread_safe=False, level='INFO', fmt='[{level}] {asctime} {message}')
A logger that emits messages at a specified interval, with support for log levels, formatting, exception logging, async and thread-safe operation, and flexible output (IO).
Args
interval_seconds : float - Minimum time in seconds between logs.
log_fn : callable - Function to handle log output (default: print).
max_logs_per_interval : int, optional - Maximum number of logs allowed per interval. If None, unlimited.
time_fn : callable, optional - Custom function to get the current time (default: time.time).
async_mode : bool - If True, logs are queued and processed in a background thread.
batch_size : int - Number of logs to batch before flushing in async mode.
thread_safe : bool - If True, uses a lock for thread safety (default: False for max speed).
level : str|int - Minimum log level to emit (default: 'INFO').
fmt : str - Log message format (default: '[{level}] {asctime} {message}').
Usage
logger = TimeBasedLogger(level='INFO', fmt='[{level}] {message}') logger.info('Hello world')
Methods
def close(self)
-
Expand source code
def close(self):
    """Shut the logger down.

    In async mode this sets the stop event and blocks until the background
    worker thread has exited; in sync mode it does nothing.
    """
    if not self.async_mode:
        return
    self._stop_event.set()
    self._worker.join()
Closes the logger, stopping the background worker (for async mode).
def critical(self, message, exc_info=None, **kwargs)
-
Expand source code
def critical(self, message, exc_info=None, **kwargs):
    """Emit *message* at CRITICAL level.

    Delegates to ``self.log``, forwarding ``exc_info`` and any additional
    keyword arguments unchanged.
    """
    self.log(message, exc_info=exc_info, level='CRITICAL', **kwargs)
Logs a message with level CRITICAL.
def debug(self, message, **kwargs)
-
Expand source code
def debug(self, message, **kwargs):
    """Emit *message* at DEBUG level.

    Delegates to ``self.log``; extra keyword arguments are forwarded as-is.
    """
    self.log(message, **kwargs, level='DEBUG')
Logs a message with level DEBUG.
def error(self, message, exc_info=None, **kwargs)
-
Expand source code
def error(self, message, exc_info=None, **kwargs):
    """Emit *message* at ERROR level.

    Delegates to ``self.log``, forwarding ``exc_info`` and any additional
    keyword arguments unchanged.
    """
    self.log(message, exc_info=exc_info, level='ERROR', **kwargs)
Logs a message with level ERROR.
def flush(self)
-
Expand source code
def flush(self):
    """Wait for the async queue to become empty.

    Polls the queue every 10 ms until it reports empty; does nothing in sync
    mode. Only the queue itself is checked here.
    """
    if not self.async_mode:
        return
    while True:
        if self._queue.empty():
            break
        time.sleep(0.01)
Flushes any remaining messages in the queue (for async mode).
def info(self, message, **kwargs)
-
Expand source code
def info(self, message, **kwargs):
    """Emit *message* at INFO level.

    Delegates to ``self.log``; extra keyword arguments are forwarded as-is.
    """
    self.log(message, **kwargs, level='INFO')
Logs a message with level INFO.
def log(self, message, level='INFO', exc_info=None, extra=None)
-
Expand source code
def log(self, message, level='INFO', exc_info=None, extra=None):
    """Log a message with the specified level.

    The record is dropped without being formatted when the logger is paused or
    when *level* is below the logger's minimum level. Otherwise it is formatted
    and either queued (async mode) or handed to ``_log_internal``.

    Args:
        message (str): The message to log.
        level (str): The log level (default: 'INFO').
        exc_info (tuple, optional): Exception information to log.
        extra (dict, optional): Extra context to add to the log message.
    """
    # Reject before formatting: a record that is going to be dropped should
    # not pay for formatting, nor be able to raise a formatting error.
    if self._paused:
        return
    if self._level_to_int(level) < self.level:
        return
    record = self._format_record(message, level, exc_info, extra)
    if self.async_mode:
        self._queue.put(record)
        return
    self._log_internal(record)
Logs a message with the specified level.
Args
message : str - The message to log.
level : str - The log level (default: 'INFO').
exc_info : tuple, optional - Exception information to log.
extra : dict, optional - Extra context to add to the log message.
def pause(self)
-
Expand source code
def pause(self):
    """Mark the logger as paused by setting its ``_paused`` flag to True."""
    self._paused = True
Pauses the logger, preventing new messages from being logged.
def resume(self)
-
Expand source code
def resume(self):
    """Un-pause the logger.

    Also clears the last-log timestamp, so that a message can be logged
    immediately after resuming.
    """
    # Forget the previous emission time first, then lift the pause.
    self._last_log_time = None
    self._paused = False
Resumes the logger after being paused. Allows immediate logging after resume.
def setLevel(self, level)
-
Expand source code
def setLevel(self, level):
    """Change the logger's minimum level.

    *level* is converted with ``self._level_to_int`` and stored in
    ``self.level``.
    """
    numeric_level = self._level_to_int(level)
    self.level = numeric_level
Set the minimum log level for the logger.
def warning(self, message, **kwargs)
-
Expand source code
def warning(self, message, **kwargs):
    """Emit *message* at WARNING level.

    Delegates to ``self.log``; extra keyword arguments are forwarded as-is.
    """
    self.log(message, **kwargs, level='WARNING')
Logs a message with level WARNING.