
Please explain if/how one can avoid putting too much data into a pipeline

Open johann-petrak opened this issue 6 years ago • 4 comments

My understanding is that whenever pipe.put(something) is executed, the item is put into a queue or similar. If we only want to process the somethings with workers, without caring at all about any return value, we can use disable_result, but there seems to be no limit to how much data can be put into the pipeline. If a large number of large items is put into the pipeline, will this cause problems? Is it possible to have only a certain maximum number of items waiting for processing before put(something) blocks?
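To illustrate, this is roughly the pattern I have in mind (huge_task_source is a placeholder for my real input, and I'm assuming disable_result is a keyword on the stage constructor):

from mpipe import Pipeline, UnorderedStage

def work(task):
    pass  # heavy processing; the return value is not needed

# One stage with several workers; results are discarded.
stage = UnorderedStage(work, 4, disable_result=True)
pipe = Pipeline(stage)

for task in huge_task_source:
    pipe.put(task)   # nothing here limits how fast the loop feeds the pipeline
pipe.put(None)       # signal end of input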

johann-petrak avatar Oct 30 '18 20:10 johann-petrak

You're right: when you put data on the pipeline, it is either put on a multiprocessing.Queue (in the case of an UnorderedStage) or sent through a multiprocessing.Pipe (in the case of an OrderedStage). Both of those inter-process communication (IPC) mechanisms use sockets, and socket buffers are ultimately limited by system memory.

Each task occupies socket buffer space until it is taken off by a worker process. Once you use up all the buffer space, the next put should block, and you will likely start having other out-of-memory problems, such as the program crashing. There is an example in the documentation that hopefully illustrates this.
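You can see this blocking behavior at the IPC level, independent of MPipe. Here's an untested sketch using a raw multiprocessing.Pipe: with nobody reading the other end, send() blocks once the OS buffer fills. Exact buffer sizes are platform-dependent, so the number of successful sends will vary:

import multiprocessing as mp

def fill_pipe():
    receiver, sender = mp.Pipe(duplex=False)
    payload = b'x' * 65536              # 64 KiB per send
    # `receiver` is deliberately never read from, so send()
    # blocks as soon as the OS pipe/socket buffer is full.
    for chunk in range(1000):
        sender.send(payload)
        print('sent chunk', chunk + 1)

if __name__ == '__main__':
    proc = mp.Process(target=fill_pipe)
    proc.start()
    proc.join(timeout=5)
    if proc.is_alive():
        print('send() blocked after filling the buffer')
        proc.terminate()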

Unfortunately there's no safety built into MPipe, so it's up to the user to monitor and manage how much they're feeding the pipeline.

vmlaker avatar Nov 01 '18 15:11 vmlaker

Thank you for this answer! As far as I understand, multiprocessing.Queue has a maxsize parameter, so would it not be possible to use this to easily limit the maximum amount of data that can be put into the Queue before put blocks? But unfortunately the API does not seem to allow passing this on when the Queue instance is created...
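For comparison, this is the standard-library behavior I mean (plain multiprocessing, not MPipe):

import multiprocessing as mp

def consumer(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        # ... process task ...

if __name__ == '__main__':
    # put() blocks once 8 items are waiting -- exactly the
    # back-pressure behavior I am asking about.
    queue = mp.Queue(maxsize=8)
    worker = mp.Process(target=consumer, args=(queue,))
    worker.start()
    for task in range(1000):
        queue.put(task)     # blocks while the queue is full
    queue.put(None)
    worker.join()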

If the above is not possible for some reason, how exactly would it be possible to actually monitor and manage how much gets fed into the pipeline? For this it would be necessary to keep track of how much has been put in already versus how much has been processed. Are these numbers easily available somewhere?

johann-petrak avatar Nov 01 '18 16:11 johann-petrak

The maxsize parameter is not implemented. This is a good idea; I'm noting it as a requested enhancement.

Also, there is no facility to retrieve the total count/size of tasks submitted and processed. I'm noting this as a requested feature for a future release. For now the user would have to do some kind of bookkeeping. Counting submitted tasks is straightforward since submission is under user control. As for counting how many tasks have been processed, something like this might work:

from threading import Thread

class Counter:
    """Counts results as they come off the pipeline."""

    def __init__(self, pipe):
        self._pipe = pipe
        self._count = 0
        # Daemon thread so it doesn't keep the program alive on exit.
        self._thread = Thread(target=self._get_results, daemon=True)
        self._thread.start()

    def get_count(self):
        return self._count

    def _get_results(self):
        # Continuously drain results and bump the count.
        while True:
            self._pipe.get()
            self._count += 1

I haven't fully tested this, but the basic concept is to have a background thread continuously retrieving results and bumping the count. But make sure your stages are created with disable_result=False, otherwise there will be no results to retrieve.
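A variation on the same idea, if you want back-pressure rather than just a count: have the reader thread release a semaphore slot per finished task, so that put() blocks once a chosen number of tasks are in flight. Again an untested sketch (BoundedFeeder is just an illustrative name), and it likewise requires disable_result=False:

from threading import Semaphore, Thread

class BoundedFeeder:
    """Blocks put() while `limit` tasks are still unfinished."""

    def __init__(self, pipe, limit):
        self._pipe = pipe
        self._slots = Semaphore(limit)
        # Daemon thread: drain results forever without blocking exit.
        self._thread = Thread(target=self._drain, daemon=True)
        self._thread.start()

    def put(self, task):
        self._slots.acquire()   # blocks when `limit` tasks are in flight
        self._pipe.put(task)

    def _drain(self):
        while True:
            self._pipe.get()    # one result per finished task
            self._slots.release()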

vmlaker avatar Nov 01 '18 18:11 vmlaker

Thank you so much, also for accepting this as a feature request!

johann-petrak avatar Nov 01 '18 19:11 johann-petrak