
dask unstable; multiple race condition errors: P2PConsistencyError, RuntimeError, FutureCancelledError ...

Open lbesnard opened this issue 1 month ago • 3 comments

Hi,

I'm encountering repeated failures during xarray.Dataset.to_zarr() when using Dask Distributed. The failures occur during the Dask shuffle/rechunk phase and appear to be caused by instability in the P2P shuffle as well as many other dask issues.

While converting a batch of NetCDF files to Zarr, I ended up writing some horrible code that catches every possible Dask error, so that the cluster/client gets destroyed and recreated and the failed batch of files is reprocessed. Recreating the cluster lets the batch succeed on retry, but it shouldn't be required.

Below are all the dask errors I have to catch in my code.

    # Import paths as of distributed 2025.x (the shuffle module is private)
    from distributed.client import FutureCancelledError
    from distributed.shuffle._exceptions import P2PConsistencyError

    batch_is_processed = False
    while not batch_is_processed:
        try:
            with self.lock:
                ds.to_zarr(
                    self.store,
                    mode="w",  # Overwrite mode for the first batch
                    write_empty_chunks=self.write_empty_chunks,
                    compute=True,  # Compute the result immediately
                    consolidated=self.consolidated,
                    safe_chunks=self.safe_chunks,
                    align_chunks=self.align_chunks,
                )
                batch_is_processed = True
        except (FutureCancelledError, P2PConsistencyError, RuntimeError) as e:
            error_text = str(e)

            SHUFFLE_KEYWORDS = [
                "P2P",
                "failed during transfer phase",
                "failed during barrier phase",
                "failed during shuffle phase",
                "shuffle failure",
                "No active shuffle",
                "Unexpected error encountered during P2P",
            ]

            CONNECTION_KEYWORDS = [
                "Timed out trying to connect",
                "CancelledError",
                "Too many open files",
            ]

            DESERIALISATION_KEYWORDS = [
                "Error during deserialization",
                "different environments",
            ]

            # Determine if the error should trigger a retry (cluster reset)
            retryable = any(
                keyword in error_text
                for keyword in (
                    SHUFFLE_KEYWORDS
                    + CONNECTION_KEYWORDS
                    + DESERIALISATION_KEYWORDS
                )
            )

            # RuntimeError that is NOT retryable -> treat as normal exception
            if isinstance(e, RuntimeError) and not retryable:
                # Treat as a regular exception: fallback to individual processing
                # code removed for simplification
                pass  # placeholder so the snippet parses
            else:
                self._reset_cluster()
                batch_is_processed = False
                # ... the rest of the logic is in a while loop
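
For readers who want to experiment with the same idea outside the class, here is a minimal, self-contained sketch of the classify-then-reset pattern described above, with a cap on the number of attempts. It is not the code from the library: `is_retryable`, `run_with_reset`, `max_attempts` and `backoff_s` are hypothetical names, the keyword tuple is a shortened copy of the lists in the post, and unlike the post it also matches on the exception class name.

```python
import time
from typing import Callable, Iterable

# Shortened copy of the keyword lists from the post (assumption: substring
# matching on the error text is good enough to spot transient failures).
RETRYABLE_KEYWORDS = (
    "P2P",
    "failed during transfer phase",
    "No active shuffle",
    "Timed out trying to connect",
    "CancelledError",
    "Too many open files",
    "Error during deserialization",
    "different environments",
)


def is_retryable(exc: BaseException,
                 keywords: Iterable[str] = RETRYABLE_KEYWORDS) -> bool:
    """Return True if the exception class name or message contains a keyword."""
    text = f"{type(exc).__name__}: {exc}"
    return any(keyword in text for keyword in keywords)


def run_with_reset(task: Callable[[], object],
                   reset: Callable[[], None],
                   max_attempts: int = 3,
                   backoff_s: float = 0.0) -> object:
    """Run task(); on a retryable error call reset() and try again.

    Non-retryable errors, and the error from the final attempt, are re-raised.
    Only Exception subclasses are handled here.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception as exc:
            if not is_retryable(exc) or attempt == max_attempts:
                raise
            reset()  # e.g. tear down and recreate the cluster/client
            time.sleep(backoff_s * attempt)  # linear backoff between attempts
```

In the setting of the post, `task` would wrap the `ds.to_zarr(...)` call and `reset` would be something like `self._reset_cluster`. Capping the attempts avoids looping forever if a batch fails for a reason that a fresh cluster cannot fix.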

I'd like to highlight that my workers and scheduler use at most 50% of CPU and memory. I've tried many different settings, and ended up having to create workers with only 1 thread to avoid running into more race conditions.

Errors Observed

P2PConsistencyError: No active shuffle with id='868df3e600f2968e0cd678a103d355d2' found
RuntimeError: P2P 43b3a653ad1c988df817f013d7314141 failed during transfer phase
OSError: Timed out trying to connect to tls://<worker-ip>:8786 after 30 s
asyncio.exceptions.CancelledError

...
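
Two of the errors listed above map onto standard Dask configuration keys, so one experiment that might narrow things down is to switch rechunking away from P2P and to raise the connect timeout. The sketch below is only a suggestion under stated assumptions: the chosen values are guesses, `DASK_OVERRIDES` and `apply_overrides` are hypothetical names, and nothing in this thread confirms that either setting resolves the failures.

```python
# Hypothetical override values. The keys are standard Dask configuration
# keys; the values are assumptions to experiment with, not recommendations.
DASK_OVERRIDES = {
    # Use the task-based rechunk instead of the P2P implementation.
    "array.rechunk.method": "tasks",
    # Allow more time than the 30 s reported in the OSError above.
    "distributed.comm.timeouts.connect": "60s",
}


def apply_overrides(overrides=DASK_OVERRIDES):
    """Apply the overrides in the current process.

    Call this before creating the cluster/client and before building the
    graph that is written with to_zarr().
    """
    import dask  # imported lazily so this module loads without dask installed

    return dask.config.set(overrides)
```

`dask.config.set` only affects the process in which it runs, so settings that matter on the workers (such as the connect timeout) would also need to reach the worker processes, for example through their environment or configuration files.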

Minimal Complete Verifiable Example:

# Put your MCVE code here

Anything else we need to know?:

Environment:

  • Dask version: 2025.11.0
  • Python version:
  • Operating System:
  • Install method (conda, pip, source):

lbesnard · Dec 10 '25 05:12

Sorry you're having a tough time. We did just merge some zarr improvements today which might help with a few things, this should be released next week.

To look into this further we will need a reproducible example so that we can see these problems ourselves. It seems like you're seeing a variety of different problems on your system, so it might be best to break each one down into a separate issue with its own reproducer.

cc @dcherian @d-v-b @melonora

jacobtomlinson · Dec 10 '25 11:12

also might be good to know which zarr version you are using

melonora · Dec 10 '25 11:12

thanks @melonora

I've pinned a few packages to older versions because I've hit a lot of regressions with packages such as xarray. Happy to try updating my zarr package and see.

Package             Version             Comment
zarr                2.18.7
xarray              custom (June 2025)  Using custom version based on June 2025 due to regression (see issue)
s3fs                2025.5.1
h5netcdf            1.7.3
coiled              1.129.3
dask                2025.11.0
dask-cloudprovider  2025.9.0
distributed         2025.11.0
dask-expr           2.0.0
netCDF4             1.6.5               Pinned (netCDF4==1.6.5) because of bug https://github.com/Unidata/netcdf4-python/issues/1342
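
A version list like the one above can be generated from the running environment rather than typed by hand, which helps when attaching it to each failure report. This is a small sketch using only the standard library; `PACKAGES` and `installed_versions` are hypothetical names, and the package list simply mirrors the table.

```python
from importlib import metadata

# Distribution names taken from the table above.
PACKAGES = [
    "zarr", "xarray", "s3fs", "h5netcdf", "coiled", "dask",
    "dask-cloudprovider", "distributed", "dask-expr", "netCDF4",
]


def installed_versions(packages=PACKAGES):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions().items():
        print(f"{name:<20} {version or 'not installed'}")
```

Running the same function on the client and on a worker would also show whether the two environments differ, which is one of the conditions the "different environments" deserialization message refers to.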

@jacobtomlinson, thanks. For background, I'm trying to convert all publicly available oceanographic data in Australia with this library I've been developing (https://github.com/aodn/aodn_cloud_optimised/)! Using the full stack of libraries mentioned above has been challenging. I'd love to create an MCVE, but I don't even see where to start when the behaviour is random. As mentioned, if I destroy the cluster when there is a bug, re-create it, and process the same batch of files, the batch works, and the cluster works fine until the next failure. It could be linked to the NetCDF4 files I'm using, which are moderately big (50-100 MB), but I doubt it.

lbesnard · Dec 11 '25 06:12

@jacobtomlinson

I did further testing with the latest version of zarr, 3.1.5, and I now get errors with open_zarr that I've never encountered before:

    with xr.open_zarr(
         ^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/xarray/backends/zarr.py", line 1505, in open_zarr
    ds = open_dataset(
         ^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/xarray/backends/api.py", line 687, in open_dataset
    backend_ds = backend.open_dataset(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/xarray/backends/zarr.py", line 1578, in open_dataset
    store = ZarrStore.open_group(
            ^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/xarray/backends/zarr.py", line 664, in open_group
    ) = _get_open_params(
        ^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/xarray/backends/zarr.py", line 1777, in _get_open_params
    zarr_root_group = zarr.open_consolidated(store, **open_kwargs)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/api/synchronous.py", line 238, in open_consolidated
    sync(async_api.open_consolidated(*args, use_consolidated=use_consolidated, **kwargs))
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/core/sync.py", line 159, in sync
    raise return_result
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/core/sync.py", line 119, in _runner
    return await coro
           ^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/api/asynchronous.py", line 415, in open_consolidated
    return await open_group(*args, use_consolidated=use_consolidated, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/api/asynchronous.py", line 860, in open_group
    store_path = await make_store_path(store, mode=mode, storage_options=storage_options, path=path)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/storage/_common.py", line 422, in make_store_path
    store = await make_store(store_like, mode=mode, storage_options=storage_options)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/storage/_common.py", line 354, in make_store
    return FsspecStore.from_mapper(store_like, read_only=_read_only)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/storage/_fsspec.py", line 197, in from_mapper
    fs = _make_async(fs_map.fs)
         ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/zarr/storage/_fsspec.py", line 55, in _make_async
    fs_dict = json.loads(fs.to_json())
                         ^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/spec.py", line 1459, in to_json
    return json.dumps(
           ^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
          ^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/json/encoder.py", line 200, in encode
                           chunks = self.iterencode(o, _one_shot=True)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/json/encoder.py", line 258, in iterencode
    return _iterencode(o, 0)
           ^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/json.py", line 16, in default
    return o.to_dict(include_password=self.include_password)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/spec.py", line 1530, in to_dict
    **json_encoder.make_serializable(storage_options),
      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/json.py", line 32, in make_serializable
    return {k: self.make_serializable(v) for k, v in obj.items()}
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/json.py", line 36, in make_serializable
    return self.default(obj)
           ^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/fsspec/json.py", line 21, in default
    return super().default(o)
           ^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/miniforge3/envs/AodnCloudOptimised/lib/python3.12/json/encoder.py", line 180, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type NoneType is not JSON serializable

Reverting to using zarr == 2.18.7 removes this issue.
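
For anyone reading the traceback: the error is surprising because plain `json.dumps` serializes `None` without complaint. Here is a minimal sketch of how the frames above could still produce it, assuming a `None` value sits somewhere in the storage options. `SketchEncoder`, `try_serialize` and the `endpoint_url` key are hypothetical names for illustration only. This is not fsspec's actual code, just a stand-in that follows the same call shape (`make_serializable` falling through to `default`).

```python
import json


class SketchEncoder(json.JSONEncoder):
    """Hypothetical stand-in for illustration; NOT fsspec's real encoder."""

    def make_serializable(self, obj):
        # Plain JSON scalars pass through. None is deliberately not handled
        # here, which is the assumption this sketch is built on.
        if isinstance(obj, (str, int, float, bool)):
            return obj
        if isinstance(obj, dict):
            return {k: self.make_serializable(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [self.make_serializable(v) for v in obj]
        # Anything else, including None, falls through to default(),
        # and json.JSONEncoder.default() always raises TypeError.
        return self.default(obj)


def try_serialize(options):
    """Return the serializable form of options, or the error text on failure."""
    try:
        return SketchEncoder().make_serializable(options)
    except TypeError as exc:
        return f"TypeError: {exc}"


if __name__ == "__main__":
    # The standard library alone is happy with None values.
    print(json.dumps({"endpoint_url": None}))
    # The sketch fails as soon as one option is None.
    print(try_serialize({"key": "abc", "endpoint_url": None}))
    print(try_serialize({"key": "abc"}))
```

If that assumption holds for the real case, dropping `None`-valued entries from the storage options before the store is created may be a possible workaround that avoids pinning, though I have not verified this against the actual libraries.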

Unfortunately, this is a recurring pattern for me with the dask/xarray/s3fs ecosystem: I have to pin library versions and stay stuck on them because of frequent regressions.

lbesnard • Dec 17 '25 05:12

Could you please provide the code that causes this error? I'd like to know which store is being used, and to try to replicate the problem locally.

melonora • Dec 17 '25 11:12