NeMo-Curator
Fix noisy Dask shutdown
When scripts finish successfully, Dask prints "errors" like the following at shutdown, with the number of messages proportional to the number of workers:
```
Writing to disk complete for 3 partitions
2024-03-20 10:31:01,593 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/tcp.py", line 224, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/worker.py", line 1253, in heartbeat
    response = await retry_operation(
  File "/usr/local/lib/python3.10/dist-packages/distributed/utils_comm.py", line 454, in retry_operation
    return await retry(
  File "/usr/local/lib/python3.10/dist-packages/distributed/utils_comm.py", line 433, in retry
    return await coro()
  File "/usr/local/lib/python3.10/dist-packages/distributed/core.py", line 1347, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/distributed/core.py", line 1106, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/tcp.py", line 240, in read
    convert_stream_closed_error(self, e)
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/tcp.py", line 143, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:39070 remote=tcp://127.0.0.1:35637>: Stream is closed
2024-03-20 10:31:01,637 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/tcp.py", line 224, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/worker.py", line 1253, in heartbeat
    response = await retry_operation(
  File "/usr/local/lib/python3.10/dist-packages/distributed/utils_comm.py", line 454, in retry_operation
    return await retry(
  File "/usr/local/lib/python3.10/dist-packages/distributed/utils_comm.py", line 433, in retry
    return await coro()
  File "/usr/local/lib/python3.10/dist-packages/distributed/core.py", line 1347, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/distributed/core.py", line 1106, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/tcp.py", line 240, in read
    convert_stream_closed_error(self, e)
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/tcp.py", line 143, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:39082 remote=tcp://127.0.0.1:35637>: Stream is closed
2024-03-20 10:31:01,779 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/tcp.py", line 224, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
```
We should find a way to reduce or remove these errors. It might be an underlying issue with Dask itself, in which case we should put together a minimal reproducible example.
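For reference, here is a minimal sketch of the kind of reproducer we could try (the cluster settings and the toy workload are assumptions, not taken from any NeMo-Curator script): start a LocalCluster, run a trivial computation, and exit without closing the client, so the worker heartbeats can race the scheduler teardown at interpreter shutdown.

```python
# Hypothetical reproducer sketch, not a confirmed trigger for the issue.
import dask.bag as db
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # Several workers, so any shutdown noise shows up once per worker.
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)

    # Trivial work so the workers have actually connected and started heartbeating.
    result = db.from_sequence(range(1000), npartitions=4).map(lambda x: x * 2).sum().compute()
    print(f"Result: {result}")

    # Intentionally skip client.close() / cluster.close(): teardown then happens in
    # atexit handlers, which is where the heartbeat CommClosedError noise appears.
```

If something like this reliably prints the heartbeat tracebacks, it would make a good basis for an upstream Dask report.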
Is there any progress on this?
No progress yet. The behavior seems to be random from run to run, so making a reproducible example has been difficult. I can take a closer look in the coming weeks, or if the community has any ideas I'd be interested in hearing them.
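One thing that might be worth trying in the meantime (just a sketch, not something the scripts do today): close the client and cluster explicitly before the interpreter starts tearing things down, and, as a fallback, raise the log level of the `distributed.worker` logger so the shutdown tracebacks are not printed.

```python
# Sketch of a possible mitigation; the structure here is an assumption about how a
# script sets up its cluster, not the current NeMo-Curator code.
import logging

from dask.distributed import Client, LocalCluster


def main() -> None:
    cluster = LocalCluster(n_workers=4)
    client = Client(cluster)
    try:
        ...  # run the curation pipeline here (placeholder)
    finally:
        # Orderly shutdown: workers stop heartbeating before the scheduler goes away.
        client.close()
        cluster.close()


if __name__ == "__main__":
    # Fallback: hide the noisy heartbeat tracebacks if they still appear at exit.
    logging.getLogger("distributed.worker").setLevel(logging.CRITICAL)
    main()
```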
CC: @ayushdg, any input on this?
No longer relevant after the Ray refactor.