python-progressbar
Question: multiprocessing, logging and progressbar
Description
I am trying to work out a way of combining multiprocessing (with several processes), logging, and your progress bar. The idea is that there will be multiple processes, each working on separate tasks, and I'd like a progress bar that shows overall progress across all of them while still letting logs (and stdout) appear on screen.
However, I have not managed to find a way to keep a nice progress bar at the bottom of the console while all the processes send their logging information. I was hoping that a QueueHandler feeding a dedicated listener process, plus a second dedicated process that receives "status" information and manages the progress bar, would work, but it does not...
I started from the example given at https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes and modified it into the following code.
It's fair to say the question is probably not strictly related to the use of your progressbar, but I'm wondering if you'd have any suggestions...
import logging
import logging.handlers
import multiprocessing
from random import choice, random
import time

import progressbar

progressbar.streams.wrap_stderr()
progressbar.streams.wrap_stdout()


def listener_configurer():
    root = logging.getLogger()
    # h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 3)
    # f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    # h.setFormatter(f)
    # root.addHandler(h)


def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)


# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]


def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)


def worker_process(queue, configurer, status_queue):
    configurer(queue)
    name = multiprocessing.current_process().name
    # print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
        status_queue.put(i)
    # print('Worker finished: %s' % name)


def status_updater_process(queue):
    cpt = 0
    with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as bar:
        while True:
            next = queue.get()
            if next is None:  # We send this as a sentinel to tell the listener to quit.
                break
            cpt += 1
            bar.update(cpt)


def main():
    status_queue = multiprocessing.Queue()
    status_worker = multiprocessing.Process(target=status_updater_process,
                                            args=(status_queue,))
    status_worker.start()

    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()

    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer, status_queue))
        workers.append(worker)
        worker.start()

    for w in workers:
        w.join()
    queue.put_nowait(None)
    status_queue.put_nowait(None)
    listener.join()
    status_worker.join()


if __name__ == '__main__':
    main()
I'm guessing that the issue is that you are using separate processes for the logging and the progress bar. The progress bar only redirects the stderr output while it's running, and it cannot access the stderr of another process.
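Here is a minimal, hypothetical demo of that diagnosis (not from your code): the bar lives in the parent process, the writes happen in a child process, so the bar cannot intercept them and the log lines collide with the bar's redraws.

import multiprocessing
import sys
import time

import progressbar


def noisy_child():
    for i in range(5):
        # These writes happen in the child process; the bar in the parent
        # cannot coordinate with them, so they garble the bar's line.
        print('log line %d from child' % i, file=sys.stderr)
        time.sleep(0.3)


def main():
    progressbar.streams.wrap_stderr()  # wraps stderr in *this* process only
    child = multiprocessing.Process(target=noisy_child)
    child.start()
    with progressbar.ProgressBar(max_value=20) as bar:
        for i in range(20):
            time.sleep(0.1)
            bar.update(i + 1)
    child.join()


if __name__ == '__main__':
    main()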
So, if we combine the status_updater_process with the listener_process, it should work. To do this we have a few options:
- Send more advanced messages through the queue which contain either a progressbar update or a log message
- Listen to both queues simultaneously by swapping between them (a rough sketch of this appears after the full example below)
I think option 1 is the best, so let's give that a try :)
import dataclasses

import progressbar

# Wrap the streams before the logging machinery is set up, so everything
# printed while the bar is running goes through the wrapped streams.
progressbar.streams.wrap_stderr()
progressbar.streams.wrap_stdout()

import logging
import logging.handlers
import multiprocessing
from random import choice, random
import time


@dataclasses.dataclass
class ProgressBarUpdate:
    status: int


def listener_configurer():
    root = logging.getLogger()
    # h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 3)
    # f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    # h.setFormatter(f)
    # root.addHandler(h)


def listener_process(queue, configurer):
    configurer()
    cpt = 0
    with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as bar:
        while True:
            try:
                record = queue.get()
                if record is None:  # We send this as a sentinel to tell the listener to quit.
                    break
                if isinstance(record, ProgressBarUpdate):
                    cpt += 1
                    bar.update(cpt)
                    continue
                logger = logging.getLogger(record.name)
                logger.handle(record)  # No level or filter logic applied - just do it!
            except Exception:
                import sys, traceback
                print('Whoops! Problem:', file=sys.stderr)
                traceback.print_exc(file=sys.stderr)


# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]


def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)


def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    # print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
        queue.put(ProgressBarUpdate(i))
    # print('Worker finished: %s' % name)


def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()

    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()

    for w in workers:
        w.join()
    # A single sentinel on the shared queue now stops both the logging and
    # the progress bar, since one process handles both.
    queue.put_nowait(None)
    listener.join()


if __name__ == '__main__':
    main()
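For completeness, option 2 could look roughly like this. It is only a sketch under some assumptions: it keeps the two separate queues from your original version, combined_listener and handle_record are hypothetical names, and the timeout value is arbitrary. The idea is to poll each queue with a short timeout and alternate between them, so neither queue blocks the loop while the other has items waiting.

import queue as queue_module  # for the Empty exception raised on timeout


def combined_listener(log_queue, status_queue, bar, handle_record):
    done = False
    while not done:
        # Alternate between the two queues; the short timeout keeps one
        # empty queue from blocking the other.
        for q in (log_queue, status_queue):
            try:
                item = q.get(timeout=0.05)
            except queue_module.Empty:
                continue
            if item is None:  # sentinel from either queue stops the loop
                done = True
                break
            if q is status_queue:
                bar.update()
            else:
                handle_record(item)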
Your explanation makes sense, and I had actually thought of doing something like that. It doesn't seem "clean" though: one process in charge of two radically different things goes against the separation of concerns a bit...
Still, I think it's got legs and I will try to implement it in the real project. Thanks for your help!
Think of it this way: you'll have a single process responsible for all user interaction.
That is actually how most GUI libraries work as well: a single process for all user interaction, which should always remain responsive, and background processes for taking care of all heavy and/or blocking tasks.
You could still split the output process into multiple threads or asyncio tasks, of course, as long as they can share the same stderr stream.
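A rough sketch of that thread split, with hypothetical names and assuming the two-queue setup from the original question. Both threads run inside the single output process, so they share the same wrapped stderr; a real implementation may want a lock if bar updates and log writes turn out to race.

import logging
import threading

import progressbar


def output_process(log_queue, status_queue):
    # One process owns the bar and the wrapped stderr; one thread per queue.
    with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as bar:
        def drain_logs():
            while True:
                record = log_queue.get()
                if record is None:  # sentinel
                    break
                logging.getLogger(record.name).handle(record)

        def drain_status():
            count = 0
            while True:
                if status_queue.get() is None:  # sentinel
                    break
                count += 1
                bar.update(count)

        threads = [threading.Thread(target=drain_logs),
                   threading.Thread(target=drain_status)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()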