
Saving to zarr-store using shards changes underlying data

Open Holmgren825 opened this issue 6 months ago • 6 comments

What happened?

Hey! I tried saving a dataset (CMIP6 derived data) with very small chunks (dask array) to a zarr store using shards to limit the number of files used in the store. But, I noticed that computation based on the sharded store returned different results compared to the results from a store with the same data, but no shards.

What did you expect to happen?

I'm expecting the store using shards to represent the same data as the store not using shards, and any following computations would be identical.

Minimal Complete Verifiable Example

import xarray as xr
import dask.array as da
from dask.distributed import Client

client = Client()
rng = da.random.default_rng(seed=42)

# Generate some moderately sized data to save to disk.
# Chunksize should be (255, 255, 255)
test_data = rng.integers(0, 2, size=(10000, 300, 300))

# Put in DataArray.
test_data = xr.DataArray(test_data)
test_data.name = "test_data"

# Set up encoding for shards.
encoding = {
    "test_data": {
        "shards": (255*2, 255, 255),
    }
}

# Save to disk.
test_data.to_zarr("test_random.zarr", zarr_format=3, mode="w")
test_data.to_zarr("test_random_shard.zarr", zarr_format=3, encoding=encoding, mode="w")

# Read from disk
test_data = xr.open_zarr("test_random.zarr").test_data
test_data_shard = xr.open_zarr("test_random_shard.zarr").test_data


# Expect this assertion to pass, but it doesn't.
assert test_data.sum().compute().values == test_data_shard.sum().compute().values

Steps to reproduce

Note that setting shards to `(255*1, 255, 255)` in the encoding above makes the assert pass.
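The arithmetic behind that observation can be sketched in plain Python (no zarr needed; the assumption, spelled out further down the thread, is that each dask write task covers one 255-long block along axis 0, and a write is only safe when the block spans whole shards):

```python
# Illustration only, not zarr's actual validation logic.
block = 255  # dask block length along axis 0
for shard_len in (255 * 1, 255 * 2):
    spans_whole_shards = block % shard_len == 0
    print(f"shard={shard_len}: block spans whole shards -> {spans_whole_shards}")
# shard=255: block spans whole shards -> True   (assert passes)
# shard=510: block spans whole shards -> False  (assert fails)
```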

MVCE confirmation

  • [x] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • [x] Complete example — the example is self-contained, including all data and the text of any traceback.
  • [x] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • [x] New issue — a search of GitHub Issues suggests this is not a duplicate.
  • [x] Recent environment — the issue occurs with the latest version of xarray and its dependencies.

Relevant log output


Anything else we need to know?

Maybe this is a zarr issue. Happy to raise it there instead if that is the case.

Environment

INSTALLED VERSIONS ------------------ commit: None python: 3.12.11 | packaged by conda-forge | (main, Jun 4 2025, 14:45:31) [GCC 13.3.0] python-bits: 64 OS: Linux OS-release: 6.17.0-2-default machine: x86_64 processor: x86_64 byteorder: little LC_ALL: None LANG: en_US.UTF-8 LOCALE: ('en_US', 'UTF-8') libhdf5: 1.14.6 libnetcdf: 4.9.3

xarray: 2025.10.1 pandas: 2.3.2 numpy: 2.2.6 scipy: 1.16.1 netCDF4: 1.7.2 pydap: None h5netcdf: None h5py: None zarr: 3.1.3 cftime: 1.6.4 nc_time_axis: 1.4.1 iris: None bottleneck: 1.5.0 dask: 2025.9.1 distributed: 2025.9.1 matplotlib: 3.10.6 cartopy: 0.24.0 seaborn: None numbagg: 0.9.2 fsspec: 2025.9.0 cupy: None pint: None sparse: 0.17.0 flox: 0.10.6 numpy_groupies: 0.11.3 setuptools: 80.9.0 pip: None conda: None pytest: None mypy: None IPython: 9.5.0 sphinx: None

Holmgren825 avatar Oct 08 '25 14:10 Holmgren825

Can you try also explicitly setting chunks in encoding? My guess is that some combination of Xarray/Zarr makes a bad guess of chunks, and picks something that overlaps your shards.

shoyer avatar Oct 08 '25 17:10 shoyer

I'm pretty sure this is a zarr bug.

**zarr and dask reproducer**
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "zarr>=3.0.0",
#     "numpy",
#     "dask[array]",
# ]
# ///
"""
Minimal reproduction of xarray issue #10831 using Zarr v3 API with dask arrays.

This script tests whether sharding in Zarr v3 causes data corruption when
the shard size is larger than the chunk size in certain configurations.

The issue: When dask chunks are (255, 255, 255) and zarr shards are (510, 255, 255),
data gets corrupted. But when shards are (255, 255, 255), data is preserved.
"""

import numpy as np
import zarr
import dask.array as da


def test_zarr_sharding(dask_array, zarr_path, chunks=None, shards=None, label=""):
    """
    Write a dask array to Zarr storage and read it back to verify data integrity.

    Args:
        dask_array: The dask array to write
        zarr_path: Path for the Zarr store
        chunks: Chunk shape for Zarr array (None to let it be inferred)
        shards: Shard shape for Zarr array (None for no sharding)
        label: Description label for this test

    Returns:
        The sum of the read data
    """
    # Write
    store = zarr.storage.LocalStore(zarr_path)
    group = zarr.open_group(store=store, mode="w")

    # If chunks is None, don't specify it and let it be inferred
    create_kwargs = {
        "name": "test_data",
        "shape": dask_array.shape,
        "dtype": dask_array.dtype,
        "overwrite": True,
    }
    if chunks is not None:
        create_kwargs["chunks"] = chunks
    if shards is not None:
        create_kwargs["shards"] = shards

    array = group.create_array(**create_kwargs)
    da.to_zarr(dask_array, array)

    # Read back
    store_read = zarr.storage.LocalStore(zarr_path)
    group_read = zarr.open_group(store=store_read, mode="r")
    array_read = group_read["test_data"]
    return array_read[:].sum()


# Set seed for reproducibility
rng = da.random.default_rng(seed=42)

# Generate test data using dask arrays - smaller version for faster testing
# Using smaller array but maintaining the problematic chunk/shard relationship
# Shape: (1000, 300, 300) with dask chunks (255, 255, 255)
test_data_dask = rng.integers(
    0, 2, size=(1000, 300, 300), chunks=(255, 255, 255), dtype=np.int64
)
original_sum = test_data_dask.sum().compute()
print(f"Original data sum: {original_sum}")
print(f"Dask array chunks: {test_data_dask.chunksize}")

# Test 1: Save without sharding
print("\n=== Test 1: Without sharding ===")
sum1 = test_zarr_sharding(
    test_data_dask, "test_zarr_no_shard.zarr", chunks=(255, 255, 255)
)
print(f"Sum after reading (no sharding): {sum1}")
print(f"Match: {sum1 == original_sum}")

# Test 2: Save with sharding (510, 255, 255) but NO explicit chunks - let it be inferred
print("\n=== Test 2: With sharding (510, 255, 255), chunks inferred ===")
try:
    sum2 = test_zarr_sharding(
        test_data_dask,
        "test_zarr_shard_510_auto.zarr",
        chunks=None,
        shards=(510, 255, 255),
    )
    print(f"Sum after reading (sharding 510x255x255, auto chunks): {sum2}")
    print(f"Match: {sum2 == original_sum}")
except ValueError as e:
    sum2 = None
    print(f"ValueError (expected): {e}")
    print(
        "Note: Without explicit chunks, Zarr correctly rejects incompatible shard size"
    )

# Test 3: Save with sharding (510, 255, 255) AND explicit chunks (255, 255, 255)
# This tests the suggestion from @shoyer in the issue comments
print(
    "\n=== Test 3: With sharding (510, 255, 255) AND explicit chunks (255, 255, 255) ==="
)
sum3 = test_zarr_sharding(
    test_data_dask,
    "test_zarr_shard_510_chunks_255.zarr",
    chunks=(255, 255, 255),
    shards=(510, 255, 255),
)
print(f"Sum after reading (sharding 510x255x255, explicit chunks 255x255x255): {sum3}")
print(f"Match: {sum3 == original_sum}")

# Test 4: Save with sharding (255, 255, 255) - WORKS
print("\n=== Test 4: With sharding (255, 255, 255) ===")
sum4 = test_zarr_sharding(
    test_data_dask,
    "test_zarr_shard_255.zarr",
    chunks=(255, 255, 255),
    shards=(255, 255, 255),
)
print(f"Sum after reading (sharding 255x255x255): {sum4}")
print(f"Match: {sum4 == original_sum}")

# Summary
print("\n=== SUMMARY ===")
print(f"No sharding: {'✓ PASS' if sum1 == original_sum else '✗ FAIL'}")
print(
    f"Sharding (510, 255, 255), auto chunks: {'N/A (ValueError)' if sum2 is None else ('✓ PASS' if sum2 == original_sum else '✗ FAIL')}"
)
print(
    f"Sharding (510, 255, 255), explicit chunks (255, 255, 255): {'✓ PASS' if sum3 == original_sum else '✗ FAIL'}"
)
print(f"Sharding (255, 255, 255): {'✓ PASS' if sum4 == original_sum else '✗ FAIL'}")

running that gives:

Original data sum: 45004136
Dask array chunks: (255, 255, 255)

=== Test 1: Without sharding ===
Sum after reading (no sharding): 45004136
Match: True

=== Test 2: With sharding (510, 255, 255), chunks inferred ===
ValueError (expected): The array's `chunk_shape` needs to be divisible by the shard's inner `chunk_shape`.
Note: Without explicit chunks, Zarr correctly rejects incompatible shard size

=== Test 3: With sharding (510, 255, 255) AND explicit chunks (255, 255, 255) ===
Sum after reading (sharding 510x255x255, explicit chunks 255x255x255): 22841333
Match: False

=== Test 4: With sharding (255, 255, 255) ===
Sum after reading (sharding 255x255x255): 45004136
Match: True

=== SUMMARY ===
No sharding: ✓ PASS
Sharding (510, 255, 255), auto chunks: N/A (ValueError)
Sharding (510, 255, 255), explicit chunks (255, 255, 255): ✗ FAIL
Sharding (255, 255, 255): ✓ PASS

Thanks @shoyer for the hint about passing chunks explicitly. Interestingly, that is exactly what xarray does when given a dask array: it infers the chunks from dask and passes them to zarr explicitly. If you don't pass chunks to zarr, it throws a ValueError about incompatible shapes.

So there are two things happening here:

  1. Zarr does not throw an error when explicitly passed incompatible shard and chunk shapes.
  2. For dask arrays, xarray infers the chunks from dask and passes them to zarr explicitly.

that happens here: https://github.com/pydata/xarray/blob/eed12c4989800199151f98ab565c9eb7bc272ac2/xarray/backends/zarr.py#L473-L482

So I'm pretty sure this is a pure zarr bug, or a dask/zarr one. That said, xarray could protect users by also taking sharding into account and raising an error earlier.
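For intuition, here is a plain-Python sketch of one plausible corruption mechanism (an unsynchronized read-modify-write of a whole shard; this is an illustration, not zarr's actual code path): two dask tasks each own half of a shard, each reads the shard, fills in its half, and writes the whole shard back, so the last writer silently drops the other task's half.

```python
shard = [0] * 4  # pretend on-disk shard of 4 elements, two 2-element "chunks"

def write_half(stored, half, values):
    # Read-modify-write of the *full* shard.
    buf = list(stored)
    lo = half * 2
    buf[lo:lo + 2] = values
    return buf

stale = list(shard)               # both tasks start from the same stale copy
a = write_half(stale, 0, [1, 1])  # task A fills the first chunk
b = write_half(stale, 1, [2, 2])  # task B fills the second chunk
shard = b                         # last writer wins
print(shard)  # [0, 0, 2, 2] -- task A's [1, 1] is lost
```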

ianhi avatar Oct 08 '25 18:10 ianhi

Smaller reproducer. I can't decide whether this is a dask or zarr bug, but I will report it upstream.

# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "zarr @ git+https://github.com/zarr-developers/zarr-python.git",
#     "numpy",
#     "dask[array] @ git+https://github.com/dask/dask.git",
# ]
# ///
import dask.array as da
import numpy as np
import zarr

rng = da.random.default_rng(seed=42)
dask_array = rng.integers(
    0, 2, size=(1000, 300, 300), chunks=(255, 255, 255), dtype=np.int64
)
original_sum = dask_array.sum().compute()

store = zarr.storage.LocalStore("bug.zarr")
group = zarr.open_group(store=store, mode="w")
zarr_array = group.create_array(
    name="data",
    shape=dask_array.shape,
    chunks=(255, 255, 255),
    shards=(510, 255, 255),
    dtype=dask_array.dtype,
    overwrite=True,
)

da.to_zarr(dask_array, zarr_array)

store_read = zarr.storage.LocalStore("bug.zarr")
group_read = zarr.open_group(store=store_read, mode="r")
array_read = group_read["data"]
read_sum = array_read[:].sum()

assert read_sum == original_sum, (
    f"Data corruption: expected {original_sum}, got {read_sum}"
)

ianhi avatar Oct 08 '25 19:10 ianhi

The underlying issue here is likely that you have to write a full "shard" of data all at once with Zarr.

This means that if you're writing a Zarr file:

  1. Without shards, your Dask chunks need to equal (or be some even multiple of) Zarr chunks
  2. With shards, your Dask chunks need to equal (or be some even multiple of) Zarr shards.

Ideally we would catch this with an error somewhere in Xarray/Dask/Zarr rather than writing junk.
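Those two rules could be sketched as a single check in plain Python (the function name and signature here are hypothetical, not an existing xarray/zarr/dask API): every interior dask block along each axis must be a whole multiple of the write unit, where the write unit is the shard size when sharding is enabled and the zarr chunk size otherwise.

```python
def dask_chunks_safe(dask_blocks, zarr_chunks, shards=None):
    """dask_blocks: per-axis tuples of dask block sizes (as in array.chunks).
    Hypothetical helper; returns False for write patterns that would touch
    a partial shard (or a partial zarr chunk when unsharded)."""
    write_units = shards if shards is not None else zarr_chunks
    return all(
        block % unit == 0
        for axis_blocks, unit in zip(dask_blocks, write_units)
        for block in axis_blocks[:-1]  # the last block on an axis may be ragged
    )

blocks = ((255, 255, 255, 235), (255, 45), (255, 45))  # dask chunks of the reproducer
print(dask_chunks_safe(blocks, (255, 255, 255), shards=(510, 255, 255)))  # False
print(dask_chunks_safe(blocks, (255, 255, 255), shards=(255, 255, 255)))  # True
```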

shoyer avatar Oct 08 '25 19:10 shoyer

Ideally we would catch this with an error somewhere in Xarray/Dask/Zarr rather than writing junk.

We have safe_chunks logic that needs to be updated to use shards when available.

dcherian avatar Oct 13 '25 20:10 dcherian

This should be fixed once https://github.com/dask/dask/pull/12105 is released.

ianhi avatar Oct 23 '25 16:10 ianhi