streamz
Script does not finish with dask distributed
I'm using a pipeline that reads text from files via Apache Tika, performs some pre-processing, and writes the results into a MongoDB database. The following is a truncated version of my script.
import dask.distributed
from pymongo import MongoClient
from streamz import Stream

# add_filesize, add_text, add_text_lengths and write_file are defined above (omitted here)

if __name__ == "__main__":
    mongo_client = MongoClient("mongodb://localhost:27017/")
    dask_client = dask.distributed.Client()

    file_stream_source = Stream()
    file_stream = (
        file_stream_source.scatter()
        .map(add_filesize)
        .map(add_text)
        .map(add_text_lengths)
        .buffer(16)
        .gather()
    )
    file_stream.sink(write_file)

    # file_stream_source emit loop
Everything works well, but the last few documents are missing. It seems like the dask processes are killed before all tasks have finished; the warnings/errors below support this. Is this behavior expected and am I using the interface wrong, or is this a bug?
Update: this does not happen when the same code is run in a Jupyter notebook. Could this be related to the event loop?
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-2, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-3, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-4, started daemon)>
distributed.nanny - WARNING - Worker process 15143 was killed by signal 15
distributed.nanny - WARNING - Worker process 15141 was killed by signal 15
Traceback (most recent call last):
File "/home/dario/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
send_bytes(obj)
File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Traceback (most recent call last):
File "/home/dario/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
send_bytes(obj)
File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
distributed.nanny - WARNING - Worker process 15139 was killed by signal 15
distributed.nanny - WARNING - Worker process 15145 was killed by signal 15
Relevant package versions:
streamz 0.5.1 py_0 conda-forge
dask 1.2.2 py_0
dask-core 1.2.2 py_0
tornado 6.0.2 py37h7b6447c_0
Could it be that your main process is ending immediately after you send the last file for processing, so that the cluster gets shut down at that point and the remaining in-flight tasks are lost? In that case, you need your script to wait until all processing is done, perhaps by awaiting on the futures coming back from the pipeline, or sleeping, or polling the cluster to see if it's still working.
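For example, something along these lines might work as a minimal sketch, reusing the pipeline from the script above (file_paths is a hypothetical list of inputs and the polling interval is arbitrary, not part of streamz):

import time

results = file_stream.sink_to_list()   # collect gathered items locally as they complete

for path in file_paths:                # hypothetical list of input files
    file_stream_source.emit(path)

# Keep the main process alive until everything emitted has come back through
# gather(), so the cluster is not torn down with tasks still in flight.
while len(results) < len(file_paths):
    time.sleep(0.1)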
That is probably the issue I'm facing.
In that case, you need your script to wait until all processing is done, perhaps by awaiting on the futures coming back from the pipeline
How would I do this? I hope that doesn't sound demanding, but I guess that is what I expected the default behavior to be.
Start emitting to the source inside a daemon thread. In the main thread, use a long-running while loop to hold the program open until the work concludes. Example:
from threading import Thread
from time import sleep

from streamz import Source

source = Source()
source.sink(print)

def start_emitting(s):
    for i in range(1000):
        s.emit(i)

if __name__ == "__main__":
    # daemon=True: the emitting thread will not keep the process alive on its own
    activated_thread = Thread(daemon=True, target=start_emitting, args=(source,))
    activated_thread.start()
    # Hold the main process open so the pipeline has time to finish
    while True:
        sleep(5)
@kivo360: your situation does not seem to involve distributed at all; perhaps a different issue? Please also describe how you are stalling the main thread.
In any case, could it be that the print is simply being blocked? If you replace source.sink(print) with l = source.sink_to_list(), I believe the list will be populated with values.
Nah, it is involved with distributed. You need to leave the program time to finish. Oftentimes I default to giving the program unlimited time by keeping the main process open indefinitely.
What I mean is, your code doesn't invoke distributed. But I now understand that you were providing a solution, not a new issue :)
You should be able to achieve something similar with event loops, but your way may be simpler when none of the source nodes need an event loop anyway (though distributed always has one!). There may perhaps be a way to say "run until done" on a source (i.e., stop when all of the events have been processed), which, in the case with no timing or backpressure, would be immediate.
Does emit report the dask futures? Can those be awaited?
In the original case of working with distributed, yes, you can wait on futures from emit to be processed, but not in the simpler case.
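A rough, self-contained sketch of that distributed case, assuming streamz's asynchronous usage pattern (the process stage and results list are placeholders, not part of either library):

from dask.distributed import Client
from streamz import Stream
from tornado.ioloop import IOLoop

def process(x):
    # placeholder for the real processing stages (add_text etc.)
    return x * 2

results = []

async def run_pipeline():
    # asynchronous=True tells dask and streamz that we are running inside the event loop
    client = await Client(asynchronous=True)
    source = Stream(asynchronous=True)
    source.scatter().map(process).gather().sink(results.append)

    for x in range(10):
        # awaiting emit waits until the item has been accepted downstream
        await source.emit(x)

    await client.close()

IOLoop().run_sync(run_pipeline)
print(results)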
In the simpler case, can the thread be joined? Does that thread respect backpressure?
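For reference, joining in the simpler case might look like the sketch below, building on the earlier daemon-thread example (whether it respects backpressure is the open question):

if __name__ == "__main__":
    # A non-daemon thread: join() blocks until start_emitting has returned,
    # i.e. every emit call has completed, instead of sleeping forever.
    emit_thread = Thread(target=start_emitting, args=(source,))
    emit_thread.start()
    emit_thread.join()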