
Hangs when using even simple example

Open · das-intensity opened this issue on Mar 10 '21 · 7 comments

After having issues in a fairly complex program, I was able to cut it down to just this code:

#!/usr/bin/env python3

from multiprocessing import Pool
import logging
import sys
import time

from multiprocessing_logging import install_mp_handler

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
install_mp_handler()

logger = logging.getLogger()

def func():
    time.sleep(0.5)

if __name__ == '__main__':
    while True:
        logger.info('a message')

        pool = Pool(2)

        for i in range(2):
            pool.apply_async(func)

        pool.close()
        pool.join()

The issue happens:

  • On Python 3.5, 3.7, and 3.9 (Amazon Linux and Arch Linux)
  • Regardless of whether using multiprocessing.Pool or concurrent.futures.ProcessPoolExecutor.

Am I missing something really obvious here?

das-intensity commented Mar 10 '21

Replacing:

self._receive_thread = threading.Thread(target=self._receive, name=name)

with

self._receive_thread = multiprocessing.Process(target=self._receive, name=name)

does seem to solve the problem, but I'm not sure about the side-effects of that.

das-intensity commented Mar 10 '21

Which start method are you using?

I've never used it in this fashion, only creating new Process instances.

The proposed solution makes no sense to me. Do tests pass?

jruere commented Mar 10 '21

It's using fork. You can manually force it by changing the pool creation like so:

pool = multiprocessing.get_context('fork').Pool(2)

As for the "proposed solution", I'm not proposing it as a solution, just noting an oddity in case it helps others debug this.

das-intensity commented Mar 10 '21

@das-intensity Thanks a lot! Indeed you saved me quite some headaches.

After googling a bit I think this explains why the deadlock happens: https://pythonspeed.com/articles/python-multiprocessing/

It's not safe to fork a multithreaded process.

I think your "proposed solution" sounds reasonable.

heiner0815 commented Jun 05 '21
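The hazard heiner0815 points to can be shown without multiprocessing-logging at all. Below is a minimal illustrative sketch (POSIX only, since it uses os.fork; all names are made up for the example); depending on timing, the child may hang forever in its final acquire:

# Illustrative sketch (not from this library): forking while another thread
# holds a lock copies the lock into the child in its "acquired" state, and the
# thread that would release it does not exist in the child.
import os
import threading
import time

lock = threading.Lock()

def background():
    # Stands in for the log-receiving thread, which periodically holds a lock
    # (for example an I/O buffer lock) while doing its work.
    while True:
        with lock:
            time.sleep(0.01)

threading.Thread(target=background, daemon=True).start()
time.sleep(0.1)

pid = os.fork()
if pid == 0:
    # Child: if the fork landed while `background` held the lock, this acquire
    # blocks forever -- only the forking thread survives in the child.
    with lock:
        print('child got the lock')
    os._exit(0)
else:
    os.waitpid(pid, 0)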


According to https://pythonspeed.com/articles/python-multiprocessing/, it's solved by using the "spawn" start method:

from multiprocessing import get_context
from multiprocessing import Pool
import logging
import sys
import time
from os import getpid

from multiprocessing_logging import install_mp_handler


logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
install_mp_handler()

logger = logging.getLogger()


def func():
    time.sleep(0.50)

if __name__ == '__main__':

    while True:
        logger.info('a message')

        pool = get_context("spawn").Pool(10)
        # pool = Pool(2)

        for i in range(2):
            pool.apply_async(func)

        pool.close()
        pool.join()

dataaug commented Dec 06 '21

@dataaug the issue is with fork; spawn has always worked. The root of the issue is, as @heiner0815 said, that you should not fork a multithreaded process. Spawn is a different mechanism that works differently. If you change your

pool = get_context("spawn").Pool(10)

line to

pool = get_context("fork").Pool(10)

you'll see the issue still exists.

I think there are 3 potential solutions:

  1. Switch the _receive_thread to a process like I showed (again, I don't necessarily want to suggest this, as I'm not sure of the knock-on effects). Maybe there could be an option for it, so that people using fork get a process whereas people using spawn get a thread. NOTE: if I recall correctly, the tests failed when I switched _receive_thread to a process, but that was a while ago.
  2. Mark the library as not safe to use with fork.
  3. Someone who's an expert on the subject comes in and tells us all what the correct solution is ;)

das-intensity commented Dec 09 '21

@jruere Seconding @das-intensity: it would be nice either to state in the documentation that you have to call multiprocessing.set_start_method("spawn") for this module to work reliably, or to fix the module so it does not deadlock; otherwise it is easy to waste time on this. I think dedicating a process (rather than a thread) to writing out the log messages could be a good fix.

The deadlock occurs like this:

When self.sub_handler.emit(record) is called from the receiving thread, it flushes the stderr buffer: https://github.com/python/cpython/blob/main/Lib/logging/__init__.py#L1164

The flush involves taking a lock in the Python interpreter: https://github.com/python/cpython/blob/main/Modules/_io/bufferedio.c#L811 https://github.com/python/cpython/blob/main/Modules/_io/bufferedio.c#L268

The fork()ing of a new process can happen while the lock is held, which causes the lock to also be in the "acquired" state in the child process; but the receiving thread is gone in the child, so there is nothing left to release the lock. multiprocessing.Process then tries to acquire the same lock when cleaning up the forked process, which causes the child process to stay stuck forever in the cleanup code: https://github.com/python/cpython/blob/main/Lib/multiprocessing/process.py#L334

spawn fixes this because the spawned process is not a copy of the parent; the Python interpreter is started from scratch.

jaroslawr commented Jul 05 '22

The approach taken by this module only works with "fork". "Spawn" completely sidesteps the functionality of this module.

I am aware of the problem of threading locks and fork. This problem cannot be addressed without changing the approach of the module.

jruere commented Dec 19 '22

I've added an explanation of the problem on the README.

There's a way to implement this functionality supporting the "spawn" and "forkserver" start methods, using multiprocessing queues for communication. In that implementation there would be a process which would handle gathering the logs and dealing with them. It would be easier to implement when targeting only Python 3 (without also supporting Python 2.7), because of QueueListener.

This other approach has some nice properties such as allowing the logging process to live longer than the parent in order to finish handling the logs. Also, there would be more multiprocessing in general.

I don't know what other problems will pop up. Surely there will be something.

I don't plan to create that module. It would be a nice little module though. :)

jruere commented Dec 20 '22
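For reference, a rough sketch of the queue-based approach described above, built only on the standard library's logging.handlers.QueueHandler and QueueListener; this is not multiprocessing-logging's implementation, and the worker_init and func names are made up for the example:

import logging
import logging.handlers
import multiprocessing
import sys


def worker_init(queue):
    # Runs in every worker: route all records into the shared queue.
    root = logging.getLogger()
    root.handlers[:] = [logging.handlers.QueueHandler(queue)]
    root.setLevel(logging.DEBUG)


def func(i):
    logging.getLogger(__name__).info('message from task %d', i)


if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')
    queue = ctx.Queue()

    # The listener drains the queue and hands records to the real handler.
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter('%(processName)s %(levelname)s %(message)s'))
    listener = logging.handlers.QueueListener(queue, handler)
    listener.start()

    with ctx.Pool(2, initializer=worker_init, initargs=(queue,)) as pool:
        pool.map(func, range(4))

    listener.stop()

Note that QueueListener drains the queue in a background thread of the parent rather than in a separate process, so letting the logging side outlive the parent, as suggested above, would still require a dedicated gathering process.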