multiprocessing-logging
Allow underlying queue to be customized
I've got a use case where I'd like to inject a custom queue type that behaves like multiprocessing.Queue. This change would allow the user to supply a factory function that creates new queue instances.
Interesting. What is the use case for this?
So, like I said, the core use case is being able to use a custom queue.
The long story is that I'm using some CUDA code with multiprocessing (and hopefully logging too!). CUDA will not work with the default (on Unix-based systems) fork context of multiprocessing, so I am constrained to using the spawn context. Since the child process does not get a copy of the parent process's memory space under spawn, the current implementation of multiprocessing-logging does not work. I believe the reason it does not work on Windows/macOS is the same.
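To illustrate what I mean, here's a minimal toy example of my own (not library code): under spawn, the child re-imports the module instead of inheriting the parent's memory, so a handler installed inside the parent's `__main__` guard simply does not exist in the child.

```python
import logging
import multiprocessing

def child():
    # Under spawn this process re-imports the module rather than forking,
    # so the StreamHandler added below (inside the __main__ guard) is absent.
    print("child handlers:", logging.getLogger().handlers)  # -> []

if __name__ == "__main__":
    logging.getLogger().addHandler(logging.StreamHandler())
    ctx = multiprocessing.get_context("spawn")
    p = ctx.Process(target=child)
    p.start()
    p.join()
```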
I've written some fairly repository-specific code that lets me log from multiprocessing with a spawn context. It overlaps heavily with the code in this repository, with the largest deviation being that I need to use a multiprocessing.Manager().Queue() instead of a multiprocessing.Queue(). There is also some set-up that needs to run in the child processes, but I think that is out of scope for multiprocessing-logging.
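To show why the manager queue matters, here's a small sketch of my own (a toy example, not library code): a Manager().Queue() hands back a picklable proxy, so it can travel through a pool's initargs, whereas a plain multiprocessing.Queue() raises a RuntimeError when pickled that way, because it may only be shared through inheritance.

```python
import multiprocessing

log_queue = None  # set per worker by the initializer

def init_worker(q):
    global log_queue
    log_queue = q

def work(x):
    log_queue.put(f"processed {x}")
    return x

if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    q = ctx.Manager().Queue()  # a proxy: pickles cleanly through initargs
    # Passing a plain multiprocessing.Queue() here would raise RuntimeError.
    with ctx.Pool(2, initializer=init_worker, initargs=(q,)) as pool:
        pool.map(work, range(4))
    while not q.empty():
        print(q.get())
```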
So, in the end, I'd really like to take advantage of the MultiProcessingHandler class you've written, but I need to be able to use multiprocessing.Manager().Queue(). I figured the best approach might be to allow the user to inject any multiprocessing.Queue()-like object. If there is another approach that you would prefer, I'd be happy to explore it.
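Concretely, the constructor change I'm imagining would look roughly like this (a hypothetical sketch of the proposed API, not a patch against the actual internals):

```python
import logging
import multiprocessing

class MultiProcessingHandler(logging.Handler):
    # Hypothetical: queue_factory is the new, optional third parameter.
    def __init__(self, name, sub_handler=None, queue_factory=None):
        super().__init__()
        self.name = name
        self.sub_handler = sub_handler if sub_handler is not None else logging.StreamHandler()
        # Defaulting to multiprocessing.Queue(-1) preserves current behaviour;
        # a caller can inject any multiprocessing.Queue()-like object instead.
        factory = queue_factory or (lambda: multiprocessing.Queue(-1))
        self.queue = factory()
```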
This is very interesting! Let me read a little and play with this a little. Ideally, the library should choose the right queue automatically, but I don't know how feasible that is.
> Ideally, the library should choose the right queue automatically, but I don't know how feasible that is.
Yeah, in a perfect world, I agree. Unfortunately, to get logging to work in my spawn context with a multiprocessing pool, I actually needed a lot more code, and I'm not sure all of it belongs in this library:
```python
import logging
import logging.handlers
import multiprocessing
import multiprocessing.pool

# Assumes a MultiProcessingHandler whose constructor accepts a queue
# factory, as proposed above.
from multiprocessing_logging import MultiProcessingHandler

logger = logging.getLogger(__name__)


class QueueProxy:
    """
    Exposes a multiprocessing.Queue interface on top of a queue.Queue
    (e.g. the proxy returned by multiprocessing.Manager().Queue()).
    """

    def __init__(self, q):
        self.q = q

    def qsize(self):
        return self.q.qsize()

    def empty(self):
        return self.q.empty()

    def full(self):
        return self.q.full()

    def put(self, *args, **kwargs):
        return self.q.put(*args, **kwargs)

    def put_nowait(self, *args, **kwargs):
        return self.q.put_nowait(*args, **kwargs)

    def get(self, *args, **kwargs):
        return self.q.get(*args, **kwargs)

    def get_nowait(self, *args, **kwargs):
        return self.q.get_nowait(*args, **kwargs)

    # Manager queues have no close/join_thread semantics, so these are no-ops.
    def close(self):
        pass

    def join_thread(self):
        pass

    def cancel_join_thread(self):
        pass


class ManagedQueueMultiProcessingHandler(MultiProcessingHandler):
    def __init__(self, name, sub_handler=None):
        super().__init__(
            name,
            sub_handler,
            lambda: QueueProxy(multiprocessing.Manager().Queue(-1)),
        )


def _initwrap(queues, levels, initfunc, initargs):
    # Configure the root logger for this (spawned) process: replace any
    # existing handlers with QueueHandlers feeding the parent's queues.
    root_logger = logging.getLogger("")
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)
    root_logger.setLevel(0)
    for queue, level in zip(queues, levels):
        handler = logging.handlers.QueueHandler(queue)
        handler.setLevel(level)
        root_logger.addHandler(handler)
    # Call the originally provided initfunc.
    if initfunc is not None and initargs is not None:
        initfunc(*initargs)
    elif initfunc is not None:
        initfunc()


class LoggedSpawnerPool(multiprocessing.pool.Pool):
    def __init__(self, processes=None, initializer=None, initargs=None,
                 maxtasksperchild=None, context=None):
        # Collect the queues (and levels) of the parent's handlers so the
        # spawned workers can forward their log records to them.
        queues = []
        levels = []
        for handler in logger.parent.handlers:
            if isinstance(handler, MultiProcessingHandler):
                queues.append(handler.queue)
                levels.append(handler.level)
            else:
                logger.warning(
                    "Log handler %s is not a MultiProcessingHandler, "
                    "so it will not get log messages during multiprocessing",
                    handler,
                )
        ctx = multiprocessing.get_context("spawn") if context is None else context
        super().__init__(processes, _initwrap,
                         [queues, levels, initializer, initargs],
                         maxtasksperchild, ctx)
```
which can then be used like this:
```python
import logging

logger = logging.getLogger(__name__)


def worker(args):
    x = args
    logger.info(x)


if __name__ == "__main__":
    config_logging()  # sets handlers and wraps them in ManagedQueueMultiProcessingHandler
    with LoggedSpawnerPool(3) as pool:
        pool.map(worker, range(10))
```
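Here config_logging is my own setup helper; a hypothetical version, just to make the example concrete, might be:

```python
import logging

def config_logging():
    # Hypothetical helper: install a concrete handler on the root logger,
    # wrapped so records travel through the manager-backed queue.
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    concrete = logging.StreamHandler()
    concrete.setFormatter(logging.Formatter("%(processName)s %(levelname)s %(message)s"))
    root.addHandler(ManagedQueueMultiProcessingHandler("mp-handler", sub_handler=concrete))
```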
Hopefully that can help give you a little more context.