dask unstable; multiple race condition errors: P2PConsistencyError, RuntimeError, FutureCancelledError ...
Hi,
I'm encountering repeated failures during xarray.Dataset.to_zarr() when using Dask Distributed.
The failures occur during the Dask shuffle/rechunk phase and appear to be caused by instability in the P2P shuffle, as well as several other Dask issues.
While converting batches of NetCDF files to Zarr, I've ended up writing some rather ugly code that catches every possible Dask error so that the cluster/client can be destroyed, recreated, and the failed batch of files reprocessed. Recreating the cluster does allow the batch to succeed on retry, but it shouldn't be required.
Below is the retry code with all the Dask errors I currently have to catch.
batch_is_processed = False
while not batch_is_processed:
    try:
        with self.lock:
            ds.to_zarr(
                self.store,
                mode="w",  # Overwrite mode for the first batch
                write_empty_chunks=self.write_empty_chunks,
                compute=True,  # Compute the result immediately
                consolidated=self.consolidated,
                safe_chunks=self.safe_chunks,
                align_chunks=self.align_chunks,
            )
        batch_is_processed = True
    except (FutureCancelledError, P2PConsistencyError, RuntimeError) as e:
        error_text = str(e)
        SHUFFLE_KEYWORDS = [
            "P2P",
            "failed during transfer phase",
            "failed during barrier phase",
            "failed during shuffle phase",
            "shuffle failure",
            "No active shuffle",
            "Unexpected error encountered during P2P",
        ]
        CONNECTION_KEYWORDS = [
            "Timed out trying to connect",
            "CancelledError",
            "Too many open files",
        ]
        DESERIALISATION_KEYWORDS = [
            "Error during deserialization",
            "different environments",
        ]
        # Determine if the error should trigger a retry (cluster reset)
        retryable = any(
            keyword in error_text
            for keyword in (
                SHUFFLE_KEYWORDS
                + CONNECTION_KEYWORDS
                + DESERIALISATION_KEYWORDS
            )
        )
        # RuntimeError that is NOT retryable -> treat as a normal exception
        if isinstance(e, RuntimeError) and not retryable:
            # Treat as a regular exception: fall back to individual processing
            # (code removed for simplification)
            ...
        else:
            self._reset_cluster()
            batch_is_processed = False
# ... the rest of the logic is in a while loop
I'd like to highlight that my workers/scheduler peak at roughly 50% CPU/memory usage. I've tried many different settings and ended up having to create workers with only one thread each to avoid running into even more race conditions; a sketch of that configuration is below.
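For reference, this is roughly how I end up constraining the workers. It is a minimal local sketch only: the worker count and memory limit are placeholders, and my real deployment goes through Coiled / dask-cloudprovider rather than `LocalCluster`.

```python
from dask.distributed import Client, LocalCluster

# Minimal sketch: scale out with processes, but keep each worker single-threaded
# to reduce concurrency inside a worker. Sizes below are placeholders.
cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=1,
    memory_limit="8GiB",
)
client = Client(cluster)
print(client.dashboard_link)
```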
Errors Observed
P2PConsistencyError: No active shuffle with id='868df3e600f2968e0cd678a103d355d2' found
RuntimeError: P2P 43b3a653ad1c988df817f013d7314141 failed during transfer phase
OSError: Timed out trying to connect to tls://<worker-ip>:8786 after 30 s
asyncio.exceptions.CancelledError
...
Minimal Complete Verifiable Example:
# Put your MCVE code here
Anything else we need to know?:
Environment:
- Dask version: 2025.11.0
- Python version:
- Operating System:
- Install method (conda, pip, source):
Sorry you're having a tough time. We did just merge some zarr improvements today which might help with a few things; these should be released next week.
To look into this further we will need a reproducible example so that we can see these problems ourselves. It seems like you're seeing a variety of different problems on your system, so it might be best to break each one down into a separate issue with its own reproducer.
cc @dcherian @d-v-b @melonora
also might be good to know which zarr version you are using
thanks @melonora
I've pinned a few packages to older versions because I've run into a lot of regressions with packages such as xarray. Happy to try updating my zarr package and see how it goes.
| Package | Version | Comment |
|---|---|---|
| zarr | 2.18.7 | |
| xarray | custom (June 2025) | Using custom version based on June 2025 due to regression (see issue) |
| s3fs | 2025.5.1 | |
| h5netcdf | 1.7.3 | |
| coiled | 1.129.3 | |
| dask | 2025.11.0 | |
| dask-cloudprovider | 2025.9.0 | |
| distributed | 2025.11.0 | |
| dask-expr | 2.0.0 | |
| netCDF4 | 1.6.5 | Pinned due to bug: https://github.com/Unidata/netcdf4-python/issues/1342 |
@jacobtomlinson, thanks. For background, I'm trying to convert all publicly available oceanographic data in Australia with a library I've been developing (https://github.com/aodn/aodn_cloud_optimised/)! Using the full stack of libraries mentioned above has been challenging. I'd love to create an MCVE, but I don't even know where to start when the behaviour is random. As mentioned, if I destroy the cluster when a bug occurs, recreate it, and reprocess the same batch of files, that batch works, and the cluster is then fine until the next failure. It could be linked to the NetCDF4 files I'm using, which are moderately big (50-100 MB), but I doubt it. The overall workload shape is roughly the sketch below.
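A stripped-down skeleton of what each batch does (file paths, dimension names, chunk sizes and the target bucket are placeholders; the actual code lives in the library linked above):

```python
import xarray as xr

# Hypothetical skeleton of one batch; everything below is a placeholder.
nc_files = ["file_001.nc", "file_002.nc"]  # ~50-100 MB NetCDF files each

ds = xr.open_mfdataset(
    nc_files,
    engine="h5netcdf",
    combine="by_coords",
    parallel=True,
)

# Rechunking here is what triggers the Dask (P2P) shuffle on the cluster.
ds = ds.chunk({"TIME": 1000})  # placeholder dimension name and chunk size

ds.to_zarr(
    "s3://my-bucket/dataset.zarr",  # placeholder target store
    mode="w",
    consolidated=True,
    compute=True,
)
```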
@jacobtomlinson
I did further testing with the latest zarr (3.1.5), and I now get errors I've never encountered before with open_zarr:
    with xr.open_zarr(
         ^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/xarray/backends/zarr.py", line 1505, in open_zarr
    ds = open_dataset(
         ^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/xarray/backends/api.py", line 687, in open_dataset
    backend_ds = backend.open_dataset(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/xarray/backends/zarr.py", line 1578, in open_dataset
    store = ZarrStore.open_group(
            ^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/xarray/backends/zarr.py", line 664, in open_group
    ) = _get_open_params(
        ^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/xarray/backends/zarr.py", line 1777, in _get_open_params
    zarr_root_group = zarr.open_consolidated(store, **open_kwargs)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/api/synchronous.py", line 238, in open_consolidated
    sync(async_api.open_consolidated(*args, use_consolidated=use_consolidated, **kwargs))
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/core/sync.py", line 159, in sync
    raise return_result
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/core/sync.py", line 119, in _runner
    return await coro
           ^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/api/asynchronous.py", line 415, in open_consolidated
    return await open_group(*args, use_consolidated=use_consolidated, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/api/asynchronous.py", line 860, in open_group
    store_path = await make_store_path(store, mode=mode, storage_options=storage_options, path=path)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/storage/_common.py", line 422, in make_store_path
    store = await make_store(store_like, mode=mode, storage_options=storage_options)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/storage/_common.py", line 354, in make_store
    return FsspecStore.from_mapper(store_like, read_only=_read_only)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/storage/_fsspec.py", line 197, in from_mapper
    fs = _make_async(fs_map.fs)
         ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/storage/_fsspec.py", line 55, in _make_async
    fs_dict = json.loads(fs.to_json())
              ^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/spec.py", line 1459, in to_json
    return json.dumps(
           ^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
          ^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/json/encoder.py", line 200, in encode
    chunks = self.iterencode(o, _one_shot=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/json/encoder.py", line 258, in iterencode
return _iterencode(o, 0)
^^^^^^^^^^^^^^^^^
File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/json.py", line 16, in default
return o.to_dict(include_password=self.include_password)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/spec.py", line 1530, in to_dict
**json_encoder.make_serializable(storage_options),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/json.py", line 32, in make_serializable
return {k: self.make_serializable(v) for k, v in obj.items()}
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/json.py", line 36, in make_serializable
return self.default(obj)
^^^^^^^^^^^^^^^^^
File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/json.py", line 21, in default
return super().default(o)
^^^^^^^^^^^^^^^^^^
File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/json/encoder.py", line 180, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type NoneType is not JSON serializable
Reverting to zarr == 2.18.7 removes this issue.
Unfortunately, this is a common pattern for me with the dask/xarray/s3fs/... ecosystem: I end up having to pin library versions and stay stuck on them because of recurring regressions. The read path that hits this is roughly the sketch below.
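I haven't isolated a standalone reproducer yet, but the open pattern looks roughly like this (bucket/prefix are placeholders; the exact call is in the library linked above):

```python
import s3fs
import xarray as xr

# Rough reconstruction of the read path; bucket and prefix are placeholders.
fs = s3fs.S3FileSystem(anon=True)
store = fs.get_mapper("s3://my-bucket/dataset.zarr")  # an fsspec FSMap

# With zarr 3.1.5 this goes through FsspecStore.from_mapper -> _make_async,
# which fails inside fs.to_json() as shown in the traceback above.
# With zarr 2.18.7 the same call works.
ds = xr.open_zarr(store, consolidated=True)
```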
Could you please provide the code that causes this error? Just wondering what store is being used at the moment, and also so I can try to replicate it myself locally.