mpipe
Please explain if/how one can avoid putting too much data into a pipeline.
My understanding is that whenever pipe.put(something)
is executed, the item is placed into a queue or similar.
If we only want the somethings processed by workers, without caring at all about any return value, we can use disable_result
, but there seems to be no limit to how much data can be put into the pipeline. If a large number of large items is put into the pipeline, will this cause problems? Is it possible to have only a certain maximum number of items waiting for processing, so that put(something)
blocks beyond that?
You're right: when you put
data on the pipeline, it's either placed on a multiprocessing.Queue
in the case of an UnorderedStage, or sent over a multiprocessing.Pipe
if using an OrderedStage. Both of these inter-process communication (IPC) mechanisms use sockets, and socket buffers are ultimately limited by system memory.
Each task occupies socket buffer space until it is taken off by the worker process. When all the buffer space is used up, the next put
should block, and you will likely start having other out-of-memory problems, such as the program crashing. There is an example in the documentation that hopefully illustrates this.
Unfortunately there's no safety built into MPipe, so it's up to the user to monitor and manage how much they're feeding the pipeline.
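One way to do that user-side management is to cap the number of in-flight tasks with a semaphore. The sketch below is hypothetical and not part of the MPipe API; `BoundedFeeder` and `EchoPipe` are names invented here, and `EchoPipe` is just a stand-in for a real pipeline so the example is self-contained:

```python
import threading
from queue import Queue

class BoundedFeeder:
    """Hypothetical wrapper (not part of MPipe): allow at most
    `limit` tasks in flight between put() and get()."""

    def __init__(self, pipe, limit):
        self._pipe = pipe
        self._slots = threading.Semaphore(limit)  # free slots remaining

    def put(self, task):
        # Blocks once `limit` tasks are pending, providing back-pressure.
        self._slots.acquire()
        self._pipe.put(task)

    def get(self):
        result = self._pipe.get()
        self._slots.release()  # one slot freed for the next put()
        return result

class EchoPipe:
    """Stand-in for a pipeline (hypothetical): get() returns what put() sent."""
    def __init__(self):
        self._q = Queue()
    def put(self, task):
        self._q.put(task)
    def get(self):
        return self._q.get()
```

The producer then blocks in put() once the limit is reached, instead of filling the socket buffers, and each get() frees a slot.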
Thank you for this answer! As far as I understand, multiprocessing.Queue has a maxsize parameter, so would it not be possible to use that to easily limit how much data can be put into the queue before put blocks? Unfortunately the API does not seem to allow passing this on when the Queue instance is created...
If the above is not possible for some reason, how exactly would one monitor and manage how much gets fed into the pipeline? For this it would be necessary to keep track of how much has been put in already versus how much has been processed. Are these numbers easily available somewhere?
The maxsize
parameter is not implemented. This is a good idea; I'm noting it as a requested enhancement.
Also, there is no facility to retrieve total count/size of tasks submitted and processed. I'm noting this as a requested feature for a future release. For now the user would have to do some kind of bookkeeping. Counting submitted tasks is obvious since it's under user control. As for counting how many tasks have been processed, something like this might work:
from threading import Thread

class Counter:
    def __init__(self, pipe):
        self._pipe = pipe
        self._count = 0
        # Daemon thread so it doesn't keep the process alive on exit.
        self._thread = Thread(target=self._get_results, daemon=True)
        self._thread.start()

    def get_count(self):
        return self._count

    def _get_results(self):
        # Continuously drain results, bumping the count for each one.
        while True:
            self._pipe.get()
            self._count += 1
I haven't fully tested this, but the basic concept is to have a background thread continuously retrieving results and bumping the count. Make sure your stages have disable_result=False.
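An untested sketch of how that Counter could drive back-pressure in the submit loop. FakePipe is a hypothetical stand-in for a real MPipe pipeline (so the example is self-contained), and the backlog limit of 3 is arbitrary:

```python
import time
from queue import Queue
from threading import Thread

class FakePipe:
    """Hypothetical stand-in for an MPipe pipeline: put() enqueues a
    task and get() blocks until a 'result' is available."""
    def __init__(self):
        self._q = Queue()
    def put(self, task):
        self._q.put(task * 2)  # pretend a worker stage doubled the task
    def get(self):
        return self._q.get()

class Counter:
    """The Counter from above, with a daemon result-draining thread."""
    def __init__(self, pipe):
        self._pipe = pipe
        self._count = 0
        self._thread = Thread(target=self._get_results, daemon=True)
        self._thread.start()
    def get_count(self):
        return self._count
    def _get_results(self):
        while True:
            self._pipe.get()
            self._count += 1

pipe = FakePipe()
counter = Counter(pipe)
submitted = 0
for task in range(10):
    pipe.put(task)
    submitted += 1
    # Back-pressure: stall while more than 3 tasks are unprocessed.
    while submitted - counter.get_count() > 3:
        time.sleep(0.001)
```

The difference between tasks submitted (under user control) and counter.get_count() is exactly the backlog the earlier answer said the user must track.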
Thank you so much, also for accepting this as a feature request!