distributed
distributed copied to clipboard
Last few tasks stall while merging two large dataframes
What happened:
- read in two large dataframes via (both dfs are around 40Gb of parquet file data and we select a subset of the data so in memory is less)
COLUMNS = []
df1_path = 's3://some_path/df1'
df2_path = 's3://some_path/df2'
df1 = dd.read_parquet(df1_path,columns=COLUMNS)
df2 = dd.read_parquet(df2_path,columns=COLUMNS)
- set the index, global index column is sorted across all the parquet files before hand
df1 = df1.set_index("global_index", sorted=True)
df2 = df2.set_index("global_index", sorted=True)
- merge and persist df3 in memory
df3 = df2.merge(df1,suffixes=["_df1","_df2"] , left_index=True,right_index=True)
df3 = df3.persist()
- Dask dashboard just shows tasks stalled after a few of them are completed
no progress for over 30 mins (after which I just cancel the job)
workers seem idle -- no cpu usage (memory usage seems to be utilized, workers have a limit of 10Gb memory)

the graph page shows some tasks stuck "in-memory" at the set_index
stage ?

In the worker logs just see this about communication being lost while the workers are talking to each other (I'm assuming they are shuffling data for joins)
worker logs
distributed.worker - INFO - -------------------------------------------------
distributed.worker - ERROR - Worker stream died during communication: tls://123.12.123.123:37995 Traceback (most recent call last): File "internal_pip_dependency_tornado/pypi__tornado/tornado/iostream.py", line 971, in _handle_write num_bytes = self.write_to_fd(self._write_buffer.peek(size)) File "internal_pip_dependency_tornado/pypi__tornado/tornado/iostream.py", line 1568, in write_to_fd return self.socket.send(data) # type: ignore File "python3.6/usr/lib/python3.6/ssl.py", line 944, in send return self._sslobj.write(data) File "python3.6/usr/lib/python3.6/ssl.py", line 642, in write return self._sslobj.write(data) BrokenPipeError: [Errno 32] Broken pipe The above exception was the direct cause of the following exception: Traceback (most recent call last): File "distributed/worker.py", line 2064, in gather_dep self.rpc, deps, worker, who=self.address File "distributed/worker.py", line 3333, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "distributed/utils_comm.py", line 389, in retry_operation operation=operation, File "distributed/utils_comm.py", line 369, in retry return await coro() File "distributed/worker.py", line 3320, in _get_data max_connections=max_connections, File "distributed/core.py", line 644, in send_recv response = await comm.read(deserializers=deserializers) File "distributed/comm/tcp.py", line 205, in read convert_stream_closed_error(self, e) File "distributed/comm/tcp.py", line 126, in convert_stream_closed_error ) from exc distributed.comm.core.CommClosedError: in <closed TLS>: BrokenPipeError: [Errno 32] Broken pipe
distributed.worker - INFO - Can't find dependencies for key ('fix-overlap-0ca23fb934d93f7d6d24d7aba82d6b8e', 128)
distributed.worker - INFO - Dependent not found: ('set_index-893549a2ed1d56f483fd0c8b82d2e14c', 128) 0 . Asking scheduler
distributed.worker - ERROR - Handle missing dep failed, retrying Traceback (most recent call last): File "distributed/comm/core.py", line 288, in connect timeout=min(intermediate_cap, time_left()), File "python3.6/usr/lib/python3.6/asyncio/tasks.py", line 362, in wait_for raise futures.TimeoutError() concurrent.futures._base.TimeoutError The above exception was the direct cause of the following exception: Traceback (most recent call last): File "distributed/worker.py", line 2212, in handle_missing_dep self.scheduler.who_has, keys=list(dep.key for dep in deps) File "distributed/utils_comm.py", line 389, in retry_operation operation=operation, File "distributed/utils_comm.py", line 369, in retry return await coro() File "distributed/core.py", line 858, in send_recv_from_rpc comm = await self.pool.connect(self.addr) File "distributed/core.py", line 1013, in connect **self.connection_args, File "distributed/comm/core.py", line 310, in connect ) from active_exception OSError: Timed out trying to connect to tls://dask-8f0f8f8ffd3942fd8143478f1ffa95e2.daskgateway:8786 after 10 s
distributed.worker - INFO - Dependent not found: ('set_index-893549a2ed1d56f483fd0c8b82d2e14c', 128) 0 . Asking scheduler
distributed.worker - ERROR - Worker stream died during communication: tls://123.12.123.123:37995 Traceback (most recent call last): File "distributed/comm/core.py", line 319, in connect handshake = await asyncio.wait_for(comm.read(), time_left()) File "python3.6/usr/lib/python3.6/asyncio/tasks.py", line 362, in wait_for raise futures.TimeoutError() concurrent.futures._base.TimeoutError The above exception was the direct cause of the following exception: Traceback (most recent call last): File "distributed/worker.py", line 2064, in gather_dep self.rpc, deps, worker, who=self.address File "distributed/worker.py", line 3333, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "distributed/utils_comm.py", line 389, in retry_operation operation=operation, File "distributed/utils_comm.py", line 369, in retry return await coro() File "distributed/worker.py", line 3310, in _get_data comm = await rpc.connect(worker) File "distributed/core.py", line 1013, in connect **self.connection_args, File "distributed/comm/core.py", line 326, in connect ) from exc OSError: Timed out during handshake while connecting to tls://123.12.123.123:37995 after 10 s
distributed.worker - INFO - Can't find dependencies for key ('fix-overlap-0ca23fb934d93f7d6d24d7aba82d6b8e', 164)
distributed.worker - INFO - Dependent not found: ('set_index-893549a2ed1d56f483fd0c8b82d2e14c', 163) 0 . Asking scheduler
distributed.worker - ERROR - Worker stream died during communication: tls://123.12.123.123:42005 Traceback (most recent call last): File "distributed/comm/core.py", line 288, in connect timeout=min(intermediate_cap, time_left()), File "python3.6/usr/lib/python3.6/asyncio/tasks.py", line 362, in wait_for raise futures.TimeoutError() concurrent.futures._base.TimeoutError The above exception was the direct cause of the following exception: Traceback (most recent call last): File "distributed/worker.py", line 2064, in gather_dep self.rpc, deps, worker, who=self.address File "distributed/worker.py", line 3333, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "distributed/utils_comm.py", line 389, in retry_operation operation=operation, File "distributed/utils_comm.py", line 369, in retry return await coro() File "distributed/worker.py", line 3310, in _get_data comm = await rpc.connect(worker) File "distributed/core.py", line 1013, in connect **self.connection_args, File "distributed/comm/core.py", line 310, in connect ) from active_exception OSError: Timed out trying to connect to tls://123.12.123.123:42005 after 10 s
distributed.worker - INFO - Can't find dependencies for key ('join-indexed-rename-1b8b6d5e81d4af20b409b7152fd1f5e4', 112)
distributed.worker - ERROR - Worker stream died during communication: tls://123.12.123.123:39863 Traceback (most recent call last): File "distributed/comm/core.py", line 288, in connect timeout=min(intermediate_cap, time_left()), File "python3.6/usr/lib/python3.6/asyncio/tasks.py", line 362, in wait_for raise futures.TimeoutError() concurrent.futures._base.TimeoutError The above exception was the direct cause of the following exception: Traceback (most recent call last): File "distributed/worker.py", line 2064, in gather_dep self.rpc, deps, worker, who=self.address File "distributed/worker.py", line 3333, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "distributed/utils_comm.py", line 389, in retry_operation operation=operation, File "distributed/utils_comm.py", line 369, in retry return await coro() File "distributed/worker.py", line 3310, in _get_data comm = await rpc.connect(worker) File "distributed/core.py", line 1013, in connect **self.connection_args, File "distributed/comm/core.py", line 310, in connect ) from active_exception OSError: Timed out trying to connect to tls://123.12.123.123:39863 after 10 s
distributed.worker - INFO - Can't find dependencies for key ('join-indexed-rename-1b8b6d5e81d4af20b409b7152fd1f5e4', 262)
distributed.worker - INFO - Dependent not found: ('repartition-merge-771c2501dd21c164d0c6e29ee1493490', 112) 0 . Asking scheduler
distributed.worker - INFO - Dependent not found: ('repartition-merge-10c4905ff08d78501e2eaf92c56e1bf9', 262) 0 . Asking scheduler
distributed.worker - ERROR - Worker stream died during communication: tls://123.12.123.123:41695 Traceback (most recent call last): File "internal_pip_dependency_tornado/pypi__tornado/tornado/iostream.py", line 971, in _handle_write num_bytes = self.write_to_fd(self._write_buffer.peek(size)) File "internal_pip_dependency_tornado/pypi__tornado/tornado/iostream.py", line 1568, in write_to_fd return self.socket.send(data) # type: ignore File "python3.6/usr/lib/python3.6/ssl.py", line 944, in send return self._sslobj.write(data) File "python3.6/usr/lib/python3.6/ssl.py", line 642, in write return self._sslobj.write(data) BrokenPipeError: [Errno 32] Broken pipe The above exception was the direct cause of the following exception: Traceback (most recent call last): File "distributed/worker.py", line 2064, in gather_dep self.rpc, deps, worker, who=self.address File "distributed/worker.py", line 3333, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "distributed/utils_comm.py", line 389, in retry_operation operation=operation, File "distributed/utils_comm.py", line 369, in retry return await coro() File "distributed/worker.py", line 3320, in _get_data max_connections=max_connections, File "distributed/core.py", line 644, in send_recv response = await comm.read(deserializers=deserializers) File "distributed/comm/tcp.py", line 205, in read convert_stream_closed_error(self, e) File "distributed/comm/tcp.py", line 126, in convert_stream_closed_error ) from exc distributed.comm.core.CommClosedError: in <closed TLS>: BrokenPipeError: [Errno 32] Broken pipe
distributed.worker - INFO - Can't find dependencies for key ('join-indexed-rename-1b8b6d5e81d4af20b409b7152fd1f5e4', 358)
distributed.worker - INFO - Can't find dependencies for key ('join-indexed-rename-1b8b6d5e81d4af20b409b7152fd1f5e4', 361)
distributed.worker - INFO - Dependent not found: ('repartition-merge-10c4905ff08d78501e2eaf92c56e1bf9', 358) 0 . Asking scheduler
distributed.worker - INFO - Dependent not found: ('repartition-merge-10c4905ff08d78501e2eaf92c56e1bf9', 361) 0 . Asking scheduler
What you expected to happen:
This seems to be a fairly simple join operation of data not sure why the tasks are just stalling, I also tried not persisting the data and just doing len(df3)
and that too has the same result
Minimal Complete Verifiable Example:
Embedded in the description above
Anything else we need to know?:
I was using daskgateway to manage the cluster, I tried to use the approach in this video to launch subprocesses from the dask worker but I don't think that option is available in my dask version (I did try from a newer version but faced some issues as well but will leave that out)
Environment:
- Dask version: 2021.03.0
- Python version: 3.6
- Install method (conda, pip, source): pip
Cluster dump state seems to not be supported in this version
Cluster Dump State:
I did try with the latest dask version as well, 2022.05.2
and it again seemed to stall. Greatly appreciate any help with this. Looking at the worker logs the error was the same as the version above
@gjoseph92 does anything here sound familiar to you?
Hi @bkahloon
Looking at your worker logs shows a specific log message that was removed a long time ago
Specifically Handle missing dep failed, retrying
was removed in https://github.com/dask/distributed/pull/5046 / 2021.10.0
While the logs may look similar to you, I'm sure there are subtle differences and it would be helpful to have up to date logs. Can you please post the logs when running on 2022.05.2
?
It would also be ideal if you could provide a cluster dump
- Get your cluster in a state that is stuck
- Run https://distributed.dask.org/en/stable/api.html?highlight=dump_cluster_state#distributed.Client.dump_cluster_state
- Upload the json/msgpack file here such that we can investigate
Hi @fjetter , sorry for not adding the cluster dump with the new version attempt. I've attached it below dask.msgpack.gz
Hey. I am having a similar issue where I am trying to save the output as a NetCDF file using Xarray. The dask works perfectly fine till the last moment and then suddenly freezes. There is still ample memory in all the workers but they freeze and do not further process the tasks.
I am using Xarray version: 2022.3.0 Dask version: 2022.05.1 Distributed version: 2022.5.1
It doesn't show any error or anything just freezes over.
Thank you @bkahloon for the cluster dump you provided in https://github.com/dask/distributed/issues/6493#issuecomment-1146266521.
We can confirm that this is a deadlock and are not absolutely certain if it is fixed on the most recent version already. We could find out a couple of things, though. (The following is likely a bit cryptic to most users and is intended for documentation purposes mostly)
There appears to be some sort of network failure between worker and scheduler while the worker is trying to heartbeat. This pops up as an exception Timed out while trying to connect during heartbeat
. This is escalated by the worker by closing.
The interesting thing is that at this time the worker and scheduler already have an open connection which is unaffected (the batched stream).
During this close attempt, this worker is stuck for a reason we could not identify yet.
- https://github.com/dask/distributed/pull/6644 might be causing the event loop to block. This log should help us identify it
- We strongly suspect that something in our threadpool is off since we can see a (distributed) task being scheduled on a threadpool but it never actually finishes. This is a known problem with the shutdown of the executor that can cause a worker to lock up. However, we believe this should not be possible to block a
Worker.close
which is what is definitely happening - Something in our
comms
could block closing our ConnectionPool. this might be already fixed by https://github.com/dask/distributed/issues/6548. We can also see a couple ofTimed out during handshake
that pop up when this worker is trying to connect to a peer to fetch results which might support this theory. - It's not entirely clear why the scheduler does not remove the faulty worker. The config option
distributed.scheduler.worker-ttl
should remove this broken worker from the rotation and reschedule all stuck tasks, therefore self-healing. For some reason, this is not triggered and we need to investigate this.
Overall a lot happened lately to the code base and we might have fixed the problem already. For instance, https://github.com/dask/distributed/pull/6603 is helping out a lot with proper, graceful shutdown overall and we might've fixed this issue by chance already.
https://github.com/dask/distributed/pull/6644 is introducing an important log message for the threadpool case. once this is merged I would greatly appreciate a new run with a cluster dump if this is possible (Release starting 2022.07.0
)
For what it's worth, I encountered something that feels similar when working with masked arrays in dask array (dask/dask#9181). The issue was that dask lost the ability to deserialize masked arrays because something went wrong with the lazy registration of the deserializer. I think it's possible that a similar problem occurs with other objects that rely on lazy registration of deserializers.
I think I am being bitten by the same or similar. Oddly enough, the code works fine if I am running just a few test files (say 10 files), but gives me an error if I throw the whole enchilada at it (156,526 files). All in all I get the following warnigns and dump on he many files:
extracting CryoSat-2 granule start and end times.
100%|████████████████████████████████████████████████████████████████████| 156526/156526 [00:34<00:00, 4553.44it/s]
100%|█████████████████████████████████████████████████████████████████| 156526/156526 [00:00<00:00, 1045645.01it/s]
/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/distributed/client.py:3162: UserWarning: Sending large graph of size 26.28 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
warnings.warn(
2024-02-29 16:27:01,269 - distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
^^^^^^^^^^
File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/tornado/gen.py", line 767, in run
value = future.result()
^^^^^^^^^^^^^^^
File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/distributed/comm/tcp.py", line 265, in write
frames = await to_frames(
^^^^^^^^^^^^^^^^
File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/distributed/comm/utils.py", line 48, in to_frames
return await offload(_to_frames)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/site-packages/distributed/utils.py", line 1540, in run_in_executor_with_context
return await loop.run_in_executor(
^^^^^^^^^^^^^^^^^^^^^
File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/asyncio/base_events.py", line 829, in run_in_executor
executor.submit(func, *args), loop=self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/cooler/Cryosat-2/miniforge3/envs/MB_new/lib/python3.11/concurrent/futures/thread.py", line 167, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
2024-02-29 16:27:05,504 - distributed.nanny - WARNING - Worker process still alive after 3.1999989318847657 seconds, killing
2024-02-29 16:27:05,504 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2024-02-29 16:27:05,505 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing
2024-02-29 16:27:05,505 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
This seams to happen in a number of my scripts, so I may be doing something systematically wrong, but in this case I am automating a check for updated files and then downloading them before processing:
lock = SerializableLock()
# set up the clusters
cluster = LocalCluster(n_workers=workers, threads_per_worker=1)
client = Client(cluster)
# generate the futures for Dask to process
pair_group = []
for CS_fname in tqdm.tqdm(CS_df['Filename'].values, disable=not verbose):
pair_group.append((lock,CS_fname, indir, error_file, verbose))
futures = client.map(download_CryoSat_file, pair_group)
if verbose:
progress(futures)
client.gather(futures)
I have tried passing the results to a variable, but all the futures are processed (and files downloaded and validity checked) and the details on their status is written to a log file (the reason for passing the lock), but it would be nice to figure out why this is happening. As a note, I have tried processing this asynchronously and fire_and_forget, but have not gotten it to work consistently.