
Script does not finish with dask distributed

Open drrmmng opened this issue 5 years ago • 9 comments

I'm using a pipeline that extracts text from files via Apache Tika, performs some pre-processing, and writes the results to MongoDB. The following is a truncated version of my script.

from pymongo import MongoClient
from streamz import Stream
import dask.distributed

if __name__ == "__main__":
    mongo_client = MongoClient("mongodb://localhost:27017/")
    dask_client = dask.distributed.Client()
    file_stream_source = Stream()

    # Scatter each file onto the Dask cluster, process it there,
    # then gather the results back into the local process.
    file_stream = (
        file_stream_source.scatter()
        .map(add_filesize)
        .map(add_text)
        .map(add_text_lengths)
        .buffer(16)
        .gather()
    )

    file_stream.sink(write_file)

    # file_stream_source emit loop

Everything works well, but the last few documents are missing. It seems like the dask worker processes are killed before all tasks have finished; the warnings/errors below support this. Is this expected behavior and am I using the interface wrong, or is this a bug?

Update: this does not happen when the script is run in a Jupyter notebook. Could this be related to the event loop?

distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-2, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-3, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-4, started daemon)>
distributed.nanny - WARNING - Worker process 15143 was killed by signal 15
distributed.nanny - WARNING - Worker process 15141 was killed by signal 15
Traceback (most recent call last):
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
distributed.nanny - WARNING - Worker process 15139 was killed by signal 15
distributed.nanny - WARNING - Worker process 15145 was killed by signal 15

Relevant package versions:

streamz                   0.5.1                      py_0    conda-forge
dask                      1.2.2                      py_0  
dask-core                 1.2.2                      py_0  
tornado                   6.0.2            py37h7b6447c_0 

drrmmng avatar Jun 11 '19 20:06 drrmmng

Could it be that your main process ends immediately after you send the last file for processing, so the cluster gets shut down at that point and the remaining in-flight tasks are lost? In that case, you need your script to wait until all processing is done, perhaps by waiting on the futures coming back from the pipeline, or sleeping, or polling the cluster to see whether it's still working.
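
For illustration, a minimal sketch of the polling approach against the pipeline from the report above (`paths` stands in for the real input and is not from this thread):

import time

results = file_stream.sink_to_list()  # collects everything that comes back through gather()

n_emitted = 0
for path in paths:  # hypothetical emit loop
    file_stream_source.emit(path)
    n_emitted += 1

# Block until every emitted file has made it through the pipeline,
# so the script (and with it the Dask cluster) doesn't exit early.
while len(results) < n_emitted:
    time.sleep(0.1)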

martindurant avatar Jun 12 '19 13:06 martindurant

That is probably the issue I'm facing.

In that case, you need your script to wait until all processing is done, perhaps by waiting on the futures coming back from the pipeline

How would I do this? I hope that doesn't sound demanding, but this is what I expected the default behavior to be.

drrmmng avatar Jun 12 '19 17:06 drrmmng

Start emitting to the source inside a daemon thread. On the main thread, use a long-running while loop to hold the program open until processing concludes. Example:

from threading import Thread
from time import sleep

from streamz import Source

source = Source()
source.sink(print)

def start_emitting(s):
    # Feed events into the stream from a background thread.
    for i in range(1000):
        s.emit(i)

if __name__ == "__main__":
    activated_thread = Thread(daemon=True, target=start_emitting, args=(source,))
    activated_thread.start()
    # Keep the main thread alive so the pipeline has time to drain.
    while True:
        sleep(5)
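
(The daemon flag matters here: a daemon thread cannot keep the interpreter alive by itself, so the `while True: sleep(5)` loop on the main thread is what holds the process open while the stream drains; without it, the script would exit and the daemon thread would be killed immediately.)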

kivo360 avatar Oct 06 '19 21:10 kivo360

@kivo360: your situation does not seem to involve distributed at all; perhaps a different issue? Please also describe how you are stalling the main thread.

In any case, could it be that the print is simply being blocked? If you replace

source.sink(print)
->
l = source.sink_to_list()

I believe the list will be populated with values.
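
For reference, a minimal self-contained version of that suggestion (no distributed involved):

from streamz import Stream

source = Stream()
l = source.sink_to_list()  # every emitted event is appended to this list

for i in range(5):
    source.emit(i)

print(l)  # [0, 1, 2, 3, 4]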

martindurant avatar Oct 07 '19 12:10 martindurant

Nah, it does involve distributed. You need to leave the program time to finish. Oftentimes I default to giving the program unlimited time by keeping the main process open indefinitely.

kivo360 avatar Oct 09 '19 04:10 kivo360

What I mean is, your code doesn't invoke distributed. But I now understand that you were providing a solution, not a new issue :)

You should be able to achieve something similar with event loops, but your way may be simpler when none of the source nodes need an event loop anyway (distributed always has one!). There may be a way to say "run until done" on a source (i.e., stop when all of the events have been processed), which in the case with no timing or backpressure would return immediately.

martindurant avatar Oct 09 '19 14:10 martindurant

Does emit report the dask futures? Can those be awaited?

CJ-Wright avatar Oct 09 '19 15:10 CJ-Wright

In the original case of working with distributed, yes, you can wait on futures from emit to be processed, but not in the simpler case.
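
A hedged sketch of what that might look like, assuming (as stated above) that `emit` returns the Dask futures in the distributed case; `paths` is again a hypothetical stand-in:

from dask.distributed import wait

futures = []
for path in paths:
    # emit() reportedly returns futures for the scattered work in the distributed case
    futures.extend(file_stream_source.emit(path))

wait(futures)  # block until the cluster has finished all in-flight work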

martindurant avatar Oct 09 '19 15:10 martindurant

In the simpler case can the thread be joined? Does that thread respect backpressure?

CJ-Wright avatar Oct 17 '19 18:10 CJ-Wright