Using logging asynchronously
If you are used to Python’s logging module and are now starting to create async applications, you may have realized that all the default…
If you are used to Python’s logging module and are now starting to create async applications, you may have realized that all the default handlers are blocking, that is, they block the CPU to perform the logging I/O (whether it is to disk or to network).
Starting at Python 3.2, logging.handlers.QueueHandler
was added to address this issue. Basically, it makes use of a Queue to send log messages, which doesn’t involve I/O. Separately, you must start a QueueListener
to pick up those messages and send them to the actual handlers (such as a FileHandler
or similar). The difference is that QueueListener does that in a separate thread, which prevents the execution from blocking and waiting for I/O. Its implementation is quite simple, actually. The usage looks something like this:
from queue import SimpleQueue
import asyncio
import logging
import logging.handlers
queue = SimpleQueue()
queue_handler = logging.handlers.QueueHandler(queue)
listener = logging.handlers.QueueListener(
queue,
logging.StreamHandler(),
logging.FileHandler('app.log'),
)
logger = logging.getLogger('main')
logger.addHandler(queue_handler)
async def main():
logger.info('App started')
...
logger.info('App finished')
listener.start()
try:
asyncio.run(main())
finally:
listener.stop()
In this example, we are creating a QueueHandler
with a SimpleQueue
(a simple yet efficient implementation of a queue) to send the log messages to a separate thread (controlled internally by QueueListener
). When logger.info()
is called, the message is sent to the queue and does not block the execution. You can specify multiple handlers to receive the message. In this case, the message is sent both to stderr (by StreamHandler) and to a file called app.log
.
Note: you must start and stop the listener. If you fail to do either you will lose messages. An alternative to the try/finally approach is to use atexit:
import atexit
listener.start()
atexit.register(listener.stop)
# ... Go on with your program
Using dictConfig
In order to use dictConfig
(which is standard in frameworks like Django) with QueueHandler, you will need Python 3.12+ as they just recently dropped the changes. Python’s documentation has a section on it (although only with YAML). This is what the setup above looks like (as JSON, also adding formatters):
LOGGING = {
"version": 1,
"formatters": {
"default": {"format": "%(asctime)s %(levelname)-8s %(message)s"}
},
"handlers": {
"qhandler": {
"class": "logging.handlers.QueueHandler",
"handlers": ["stderr", "app"]
},
"stderr": {
"class": "logging.StreamHandler",
"formatter": "default"
},
"app": {
"class": "logging.FileHandler",
"filename": "app.log",
"formatter": "default"
}
},
"loggers": {
"main": {
"handlers": ["qhandler"],
"level": "INFO"
}
}
}
import logging.config
logging.config.dictConfig(LOGGING)
But if you’re on Python 3.11 or less, you will get the following message:
Traceback (most recent call last):
File "/usr/lib/python3.10/logging/config.py", line 565, in configure
handler = self.configure_handler(handlers[name])
File "/usr/lib/python3.10/logging/config.py", line 746, in configure_handler
result = factory(**kwargs)
TypeError: QueueHandler.__init__() got an unexpected keyword argument 'handlers'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python3.10/logging/config.py", line 811, in dictConfig
dictConfigClass(config).configure()
File "/usr/lib/python3.10/logging/config.py", line 572, in configure
raise ValueError('Unable to configure handler '
ValueError: Unable to configure handler 'qhandler'
Another problem
The solution above works fine for a script or small application. However, if you have a bigger application and/or use multiple log files, you will end up with a different queue and a separate thread for each log file, even if you never log anything to that specific file. This seems like a waste of resources.
The solution
It would make much more sense to have a single thread processing messages from all loggers, and redirecting them to the appropriate handlers, as defined either manually or with your dictConfig.
We can achieve that by subclassing QueueListener and make it start only one thread per process. The target of this thread must check all listeners’ queues for new messages, therefore it cannot block the execution while waiting for new messages. This is what it looks like:
import time
import logging
import logging.handlers
import threading
from queue import Empty
class SingleThreadQueueListener(logging.handlers.QueueListener):
"""A subclass of QueueListener that uses a single thread for all queues.
See https://github.com/python/cpython/blob/main/Lib/logging/handlers.py
for the implementation of QueueListener.
"""
monitor_thread = None
listeners = []
sleep_time = 0.1
@classmethod
def _start(cls):
"""Start a single thread, only if none is started."""
if cls.monitor_thread is None or not cls.monitor_thread.is_alive():
cls.monitor_thread = t = threading.Thread(
target=cls._monitor_all, name='logging_monitor')
t.daemon = True
t.start()
return cls.monitor_thread
@classmethod
def _join(cls):
"""Waits for the thread to stop.
Only call this after stopping all listeners.
"""
if cls.monitor_thread is not None and cls.monitor_thread.is_alive():
cls.monitor_thread.join()
cls.monitor_thread = None
@classmethod
def _monitor_all(cls):
"""A monitor function for all the registered listeners.
Does not block when obtaining messages from the queue to give all
listeners a chance to get an item from the queue. That's why we
must sleep at every cycle.
If a sentinel is sent, the listener is unregistered.
When all listeners are unregistered, the thread stops.
"""
noop = lambda: None
while cls.listeners:
time.sleep(cls.sleep_time) # does not block all threads
for listener in cls.listeners:
try:
# Gets all messages in this queue without blocking
task_done = getattr(listener.queue, 'task_done', noop)
while True:
record = listener.dequeue(False)
if record is listener._sentinel:
cls.listeners.remove(listener)
else:
listener.handle(record)
task_done()
except Empty:
continue
def start(self):
"""Override default implementation.
Register this listener and call class' _start() instead.
"""
SingleThreadQueueListener.listeners.append(self)
# Start if not already
SingleThreadQueueListener._start()
def stop(self):
"""Enqueues the sentinel but does not stop the thread."""
self.enqueue_sentinel()
Explanation: This subclass overrides start and stop to prevent a new thread from being started for every QueueListener object. Instead, it has one class method that starts the thread that will process all listeners. When stopping a listener, the thread cannot immediately stop. Rather, it’s the monitor that must remove the listener from the list. When there are no more registered listeners, the thread stops.
The disadvantage of this approach is that since the calls to queue.get()
are non-blocking, we must put a timer in each cycle to prevent this thread from using 100% CPU. That pause may delay your log from being written to the file, or may cause wrong log order, particularly if you mix threaded and non-threaded log handlers.
Still, that doesn’t solve the problem of using dictConfig discussed above. You still need to create a Queue, QueueHandler and QueueListener for every logger in your application. To do that, we will create another class that makes use of the one above.
import logging
import logging.handlers
from queue import SimpleQueue
class LogContext:
def __init__(self):
self.listeners = []
def iter_loggers(self):
"""Iterates through all registered loggers."""
for name in logging.root.manager.loggerDict:
yield logging.getLogger(name)
yield logging.getLogger() # don't forget the root logger
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def open(self):
"""Replace all loggers' handlers with a new listener."""
for logger in self.iter_loggers():
if handlers := logger.handlers:
queue = SimpleQueue()
listener = SingleThreadQueueListener(queue, *handlers)
logger.handlers = [logging.handlers.QueueHandler(queue)]
self.listeners.append((listener, logger))
listener.start()
def close(self):
"""Stops the listener and restores all original handlers."""
while self.listeners:
listener, logger = self.listeners.pop()
logger.handlers = listener.handlers
listener.stop()
Explanation: This class iterates through all handlers in the logging system, and replaces them with new QueueHandlers, while keeping the original handlers in a new listener. That way, you can use dictConfig, then enter into this object’s context and they will all be processed in a separated thread. Leave the context, and they will be restored.
For convenience, this class is also a context manager, so you can use it like this:
LOGGING = {
"version": 1,
"formatters": {
"default": {"format": "%(asctime)s %(name)s %(levelname)-8s %(message)s"}
},
"handlers": {
"app": {
"class": "logging.FileHandler",
"filename": "app.log",
"formatter": "default"
},
"db": {
"class": "logging.FileHandler",
"filename": "db.log",
"formatter": "default"
},
"stderr": {
"class": "logging.StreamHandler",
"formatter": "default"
}
},
"loggers": {
"app": {
"handlers": ["app", "stderr"],
"level": "INFO"
},
"app.database": {
"handlers": ["db"],
"level": "INFO"
}
}
}
async def main():
logging.getLogger('app.database').warning('Nothing to do.')
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.getLogger('app').info('App started.')
with LogContext():
asyncio.run(main()) # Your main function
logging.getLogger('app').info('App stopped.')
Here we use dictConfig()
to set up our loggers and handlers. When outside the LogContext, log records are sent directly to the configured handlers, and they will be executed in the main thread, blocking execution. When we enter the LogContext, the handlers are replaced with QueueHandlers (meaning the execution is not blocked) and I/O is performed in a separate thread. When the block exits, the handlers are restored and block CPU once again.
Alternatively, you can use it with atexit
:
import atexit
context = LogContext()
context.open()
atexit.register(context.close)
Note: Hopefully the process to write the logs to file (remove them from the queue) is faster than generating them (putting them into the queue), otherwise the queue will end up consuming too much memory and may make your application crash. If that’s the case, you should think of using a queue with a limit and waiting for it to become available.
Update 2024.02.06: In cases where the application generates too many logs in a short amount of time, the sleep between the writes can be very detrimental. I have reduce that sleep time considerably and modified the method _monitor_all()
to obtain all items in a queue before moving on to the next queue. That allows for a rapid consumption of the queue in case there is a burst of entries sent to the logger.