
"different codecs" problem with some files

Open mdsumner opened this issue 8 months ago • 3 comments

This is as much a question as anything. If I do this (no dask in the open call), everything is fine:

from virtualizarr import open_virtual_mfdataset
import dask.multiprocessing
nw = 2
dask.config.set(num_workers=nw, scheduler="processes")

s3_creds = {"endpoint_url": "https://projects.pawsey.org.au",  "anon": True}

bb = "s3://idea-10.7289-v5sq8xb5/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/"
files = ['198405/oisst-avhrr-v02r01.19840531.nc', '198406/oisst-avhrr-v02r01.19840601.nc', '198406/oisst-avhrr-v02r01.19840602.nc', 
         '198406/oisst-avhrr-v02r01.19840603.nc', '198406/oisst-avhrr-v02r01.19840604.nc', '198406/oisst-avhrr-v02r01.19840605.nc', 
         '198406/oisst-avhrr-v02r01.19840606.nc']
s3files = [f'{bb}{file}' for file in files]
ds = open_virtual_mfdataset(s3files, reader_options = {"storage_options": s3_creds})

With dask I see:

ds = open_virtual_mfdataset(s3files, parallel = "dask", reader_options = {"storage_options": s3_creds})
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/workenv/lib/python3.12/site-packages/virtualizarr/backend.py", line 377, in open_virtual_mfdataset
    combined_vds = combine_by_coords(
                   ^^^^^^^^^^^^^^^^^^
  File "/workenv/lib/python3.12/site-packages/xarray/structure/combine.py", line 983, in combine_by_coords
    concatenated_grouped_by_data_vars = tuple(
                                        ^^^^^^
  File "/workenv/lib/python3.12/site-packages/xarray/structure/combine.py", line 984, in <genexpr>
    _combine_single_variable_hypercube(
  File "/workenv/lib/python3.12/site-packages/xarray/structure/combine.py", line 656, in _combine_single_variable_hypercube
    concatenated = _combine_nd(
                   ^^^^^^^^^^^^
  File "/workenv/lib/python3.12/site-packages/xarray/structure/combine.py", line 246, in _combine_nd
    combined_ids = _combine_all_along_first_dim(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workenv/lib/python3.12/site-packages/xarray/structure/combine.py", line 278, in _combine_all_along_first_dim
    new_combined_ids[new_id] = _combine_1d(
                               ^^^^^^^^^^^^
  File "/workenv/lib/python3.12/site-packages/xarray/structure/combine.py", line 301, in _combine_1d
    combined = concat(
               ^^^^^^^
  File "/workenv/lib/python3.12/site-packages/xarray/structure/concat.py", line 277, in concat
    return _dataset_concat(
           ^^^^^^^^^^^^^^^^
  File "/workenv/lib/python3.12/site-packages/xarray/structure/concat.py", line 669, in _dataset_concat
    combined_var = concat_vars(
                   ^^^^^^^^^^^^
  File "/workenv/lib/python3.12/site-packages/xarray/core/variable.py", line 3000, in concat
    return Variable.concat(variables, dim, positions, shortcut, combine_attrs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workenv/lib/python3.12/site-packages/xarray/core/variable.py", line 1729, in concat
    data = duck_array_ops.concatenate(arrays, axis=axis)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workenv/lib/python3.12/site-packages/xarray/core/duck_array_ops.py", line 419, in concatenate
    return xp.concat(as_shared_dtype(arrays, xp=xp), axis=axis)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workenv/lib/python3.12/site-packages/virtualizarr/manifests/array.py", line 136, in __array_function__
    return MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS[func](*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workenv/lib/python3.12/site-packages/virtualizarr/manifests/array_api.py", line 68, in concatenate
    check_combinable_zarr_arrays(arrays)
  File "/workenv/lib/python3.12/site-packages/virtualizarr/manifests/utils.py", line 186, in check_combinable_zarr_arrays
    check_same_codecs([get_codecs(arr) for arr in arrays])
  File "/workenv/lib/python3.12/site-packages/virtualizarr/manifests/utils.py", line 99, in check_same_codecs
    raise NotImplementedError(
NotImplementedError: The ManifestArray class cannot concatenate arrays which were stored using different codecs, But found codecs (FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0.0, 'dtype': '<f8', 'astype': '<i2'}), BytesCodec(endian=<Endian.little: 'little'>), Shuffle(codec_name='numcodecs.shuffle', codec_config={'elementsize': 2}), Zlib(codec_name='numcodecs.zlib', codec_config={'level': 4})) vs (FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0.0, 'dtype': '<f8', 'astype': '<i2'}), BytesCodec(endian=<Endian.little: 'little'>), Shuffle(codec_name='numcodecs.shuffle', codec_config={'elementsize': 2}), Zlib(codec_name='numcodecs.zlib', codec_config={'level': 4})) .See https://github.com/zarr-developers/zarr-specs/issues/288

Different combinations of these files do or don't cause the error (??). These are both OK:

ds = open_virtual_mfdataset(s3files[0:6], parallel = "dask", reader_options = {"storage_options": s3_creds})
ds = open_virtual_mfdataset(s3files[1:7], parallel = "dask", reader_options = {"storage_options": s3_creds})

AFAICS the reported codec descriptions are the same. Teasing out the message:

NotImplementedError: The ManifestArray class cannot concatenate arrays which were stored using different codecs, But found codecs 

(FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0.0, 'dtype': '<f8', 'astype': '<i2'}), BytesCodec(endian=<Endian.little: 'little'>), Shuffle(codec_name='numcodecs.shuffle', codec_config={'elementsize': 2}), Zlib(codec_name='numcodecs.zlib', codec_config={'level': 4}))

vs 

(FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0.0, 'dtype': '<f8', 'astype': '<i2'}), BytesCodec(endian=<Endian.little: 'little'>), Shuffle(codec_name='numcodecs.shuffle', codec_config={'elementsize': 2}), Zlib(codec_name='numcodecs.zlib', codec_config={'level': 4})) 

.See https://github.com/zarr-developers/zarr-specs/issues/288
a = "(FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0.0, 'dtype': '<f8', 'astype': '<i2'}), BytesCodec(endian=<Endian.little: 'little'>), Shuffle(codec_name='numcodecs.shuffle', codec_config={'elementsize': 2}), Zlib(codec_name='numcodecs.zlib', codec_config={'level': 4}))"

b = "(FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0.0, 'dtype': '<f8', 'astype': '<i2'}), BytesCodec(endian=<Endian.little: 'little'>), Shuffle(codec_name='numcodecs.shuffle', codec_config={'elementsize': 2}), Zlib(codec_name='numcodecs.zlib', codec_config={'level': 4}))"
a == b
# True

import virtualizarr
virtualizarr.__version__
'1.3.3.dev32+ge707375'

mdsumner avatar Apr 05 '25 23:04 mdsumner

@mdsumner you're not even supposed to be using open_virtual_mfdataset yet - I haven't documented it or released a version which includes it! 🤣

That said, this should work. This is an extremely weird error.

Parallelizing with dask should not make any difference to whether the datasets are combinable or not - they are totally separate parts of the codebase. It also computes the dask.delayed objects before it even gets to the combining part!!

Somehow dask must be causing these codecs to be different in some extremely subtle way. Does it work if you use the concurrent.futures.ThreadPoolExecutor instead?

different combinations of these files do/don't cause the error (??) these are both ok

Are you able to combine them all if you just use parallel=False? EDIT: Oh yeah that is fine, it's what you did at the top.

a = "(FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0.0, 'dtype': '<f8', 'astype': '<i2'}), BytesCodec(endian=<Endian.little: 'little'>), Shuffle(codec_name='numcodecs.shuffle', codec_config={'elementsize': 2}), Zlib(codec_name='numcodecs.zlib', codec_config={'level': 4}))"

b = "(FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0.0, 'dtype': '<f8', 'astype': '<i2'}), BytesCodec(endian=<Endian.little: 'little'>), Shuffle(codec_name='numcodecs.shuffle', codec_config={'elementsize': 2}), Zlib(codec_name='numcodecs.zlib', codec_config={'level': 4}))"
a == b
# True 

This reminds me of https://github.com/zarr-developers/VirtualiZarr/issues/501, where two identical-looking objects compared not equal due to python object comparison subtleties. It would be useful to try comparing the actual objects instead of comparing their string representations, i.e.

a = (FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0.0, 'dtype': '<f8', 'astype': '<i2'}), BytesCodec(endian=<Endian.little: 'little'>), Shuffle(codec_name='numcodecs.shuffle', codec_config={'elementsize': 2}), Zlib(codec_name='numcodecs.zlib', codec_config={'level': 4}))

b = (FixedScaleOffset(codec_name='numcodecs.fixedscaleoffset', codec_config={'scale': 100.0, 'offset': 0.0, 'dtype': '<f8', 'astype': '<i2'}), BytesCodec(endian=<Endian.little: 'little'>), Shuffle(codec_name='numcodecs.shuffle', codec_config={'elementsize': 2}), Zlib(codec_name='numcodecs.zlib', codec_config={'level': 4}))
a == b
# ?

Alternatively you could interrupt the code just before you get the error, extract the codecs and compare them.

TomNicholas avatar Apr 05 '25 23:04 TomNicholas

Ah, good point, I should have tried that. (FWIW I am not seeing a speedup on a "normal" VM or on Slurm with ThreadPoolExecutor, so I was using dask as a way to ensure I do get a parallel speedup, and generally as a way to understand concurrent.futures ... and then I rabbit-holed into this.)

from concurrent.futures import ThreadPoolExecutor
ds = open_virtual_mfdataset(s3files, parallel = ThreadPoolExecutor,  reader_options = {"storage_options": s3_creds})

That works fine. I'll see if I can get the performance I see with dask (using delayed open_virtual_dataset and concat).
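A minimal, offline sketch of that manual open-then-combine pattern, using the stdlib ThreadPoolExecutor. `fake_open` is a hypothetical stand-in; in the thread it would be `virtualizarr.open_virtual_dataset` with `reader_options`, followed by an xarray concat/combine:

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for open_virtual_dataset so the sketch runs offline; each
# "dataset" is just a dict recording its source path.
def fake_open(path):
    return {"source": path}

paths = ["198406/oisst-a.nc", "198406/oisst-b.nc", "198406/oisst-c.nc"]

# Open all files concurrently; pool.map preserves input order.
with ThreadPoolExecutor(max_workers=4) as pool:
    datasets = list(pool.map(fake_open, paths))

# Placeholder for the real combine step (e.g. xr.concat on the results).
combined = [d["source"] for d in datasets]
print(combined)
```

The same shape works with dask.delayed: wrap the opener in `dask.delayed`, collect the delayed objects, and call `dask.compute` before combining.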

mdsumner avatar Apr 05 '25 23:04 mdsumner

I think it's because the coord vars have these filename objects in the encoding:

d0 = open_virtual_dataset(s3files[0])
d0.lon.encoding
{'chunksizes': (1440,), 'fletcher32': False, 'shuffle': True, 'preferred_chunks': {'lon': 1440}, 'zlib': True, 'complevel': 4, 'source': '<File-like object S3FileSystem, idea-10.7289-v5sq8xb5/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/198405/oisst-avhrr-v02r01.19840531.nc>', 'original_shape': (1440,), 'dtype': dtype('<f4')}

d1 = open_virtual_dataset(s3files[6])
d1.lon.encoding
{'chunksizes': (1440,), 'fletcher32': False, 'shuffle': True, 'preferred_chunks': {'lon': 1440}, 'zlib': True, 'complevel': 4, 'source': '<File-like object S3FileSystem, idea-10.7289-v5sq8xb5/www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/198406/oisst-avhrr-v02r01.19840606.nc>', 'original_shape': (1440,), 'dtype': dtype('<f4')}

My guess: combining happens in batches (right?), so non-concatenable arrays are made distinct, keeping just the first one in a batch; but when the batches get combined we don't get the same fallback behaviour to the first (or just one). I'll explore it a bit more deeply.

mdsumner avatar Apr 21 '25 13:04 mdsumner

Was this resolved @mdsumner ?

TomNicholas avatar Jul 14 '25 16:07 TomNicholas

I don't know. I had a quick try but couldn't navigate all the new idioms for setting the endpoint.

This is not a priority for me, FWIW. The old way without "mf" was working pretty well, and there are bigger issues with the files per se that I experienced; getting things consistent enough from netCDF is a huge task that I'm unlikely to pursue much.

mdsumner avatar Jul 15 '25 00:07 mdsumner

I ran into this problem just yesterday. I nailed it down to Dask serialization plus check_same_codecs() in virtualizarr being strict on equality. When Dask serializes the codecs, it seems some instances no longer share the same type(obj) - mixed imports, or a context change, says the LLM. I tried to replicate the bug with a unit test.

Another potential issue: if we use the same codec but the compression level is different, check_same_codecs() will fail, and it shouldn't. Will try to open a PR today.

betolink avatar Sep 03 '25 14:09 betolink

virtualizarr being strict on equality. ... When Dask serialized the codecs

Yeah comparing these objects has caused problems before. I am not surprised there are further edge cases, or that they crop up during serialization.

if we use the same codec but the compression level is different, check_same_codecs() will fail and it shouldn't.

I feel like a Codec with different arguments still counts as a different codec surely?

TomNicholas avatar Sep 03 '25 15:09 TomNicholas

I feel like a Codec with different arguments still counts as a different codec surely?

In an ideal world yes, but with decades-long collections there are subtleties like ZlibCodec('numcodecs.zlib', {'level': 6}) vs ZlibCodec('numcodecs.zlib', {'level': 5}), which in practice are the same codec. Perhaps check_same_codecs() should accommodate just the compression level as a special case? I think the more urgent issue will be refactoring check_same_codecs() so that this passes when we use Dask:


file_1_chain = (
    BytesCodec(endian=None),
    Shuffle('numcodecs.shuffle', {'elementsize': 1}),
    Zlib('numcodecs.zlib', {'level': 6})
)

file_2_chain = (
    BytesCodec(endian=None),
    Shuffle('numcodecs.shuffle', {'elementsize': 1}),
    Zlib('numcodecs.zlib', {'level': 6})  # Different type id after serialization round trip!
)

betolink avatar Sep 03 '25 15:09 betolink

in practice they are the same codec.

I don't think it's reasonable to expect VirtualiZarr to be able to deduce this. I still don't understand what you mean by saying they are the same codec. If compression level needs to be special-cased for the zlib codec then that should be handled by the implementation of ZlibCodec.__eq__().

If you want to discuss this further we should make a separate issue, as it's different to your immediate problem.

the more urgent issue will be refactoring check_same_codecs() so that this will pass if we are using Dask

I agree - do you know what to do to fix it? Are you interested in submitting a PR?

TomNicholas avatar Sep 03 '25 18:09 TomNicholas

@betolink I concur with @TomNicholas here. To my knowledge, there is no way to determine a priori that the same compression scheme with different configurations will produce equivalent compressed output (Zlib levels 5 and 6 may be practically equivalent, but levels 1 and 6 certainly are not), and this all depends on the origin byte stream.

How we represent "mixed" underlying source arrays (which arrays with different codecs are a type of) as a single uniform virtual Zarr array is still a problem under heavy investigation. See https://github.com/zarr-developers/VirtualiZarr/issues/5 and https://github.com/zarr-developers/zarr-python/issues/2536 (@TomNicholas also has a great issue detailing this on the Zarr repo that I can't seem to locate, perhaps he can share as well).

sharkinsspatial avatar Sep 03 '25 18:09 sharkinsspatial

If compression level needs to be special-cased for the zlib codec then that should be handled by the implementation of ZlibCodec.__eq__()

That would be a good place to improve this, but it's not urgent.

I agree - do you know what to do to fix it? Are you interested in submitting a PR?

I tested 2 approaches that seem to work:

- if codec != first_codec:
+ if tuple(map(repr, codec)) != tuple(map(repr, first_codec)):

This is the simplest: the codec repr contains the attribute values, so we'll be comparing e.g.

  • ("Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)",)
  • ("Blosc(cname='lz4', clevel=6, shuffle=SHUFFLE, blocksize=0)",)

which should fail the equality check. I also tried another library, DeepDiff:

- if codec != first_codec:
+ if DeepDiff(codec, first_codec):

Same results, but it introduces deepdiff as a dependency.

I added some unit tests for this, I can open a PR tonight/tomorrow.

betolink avatar Sep 03 '25 18:09 betolink

that would be a good place to improve this. But this is not urgent.

I think that's the only place we want to improve this. The current behaviour (raising over-eagerly) is fine for now, and far better than the opposite.

I tested 2 approaches that seem to work:

Actually is it possible to solve this upstream too? Summoning @d-v-b to comment on the intended comparison properties of zarr codecs 📿

TomNicholas avatar Sep 03 '25 19:09 TomNicholas

@sharkinsspatial thanks for the pointers! The dream would be to have these variable-encoding datasets be Zarr-compatible out of the box. Wasn't there a proposal for this?

@TomNicholas would you be OK with the first approach in a PR, perhaps reverting to the native upstream equality if that gets fixed? If this gets accepted/merged, do you have an idea when the next VirtualiZarr release will be ready? It would be great to have it for the next earthaccess release and showcase some cool access patterns.

betolink avatar Sep 03 '25 19:09 betolink

the dream will be to have these variable encoding datasets be zarr compatible out of the box. Wasn't there a proposal for this?

https://github.com/zarr-developers/zarr-specs/issues/288

if this gets accepted/merged, do you have an idea when the next VirtualiZarr release will be ready?

We actually just released. We can release again whenever you like 🙂

@TomNicholas would you be OK with the first approach on a PR

The problem with that proposed fix is that it would effectively change our definition of codec equality from "the codec classes say they are equal" to "the codec classes have the same repr". The latter seems more brittle than the former.

Zlib('numcodecs.zlib', {'level': 6}) # Different type id after serialization round trip!

Can you help me by providing a minimal reproducer? I feel like the core issue can be solved upstream by adding __eq__ to https://github.com/zarr-developers/numcodecs/blob/878d2b385e4f1c6b113aca765e8632354abb3c9c/numcodecs/zarr3.py#L97...

TomNicholas avatar Sep 03 '25 20:09 TomNicholas

Actually is it possible to solve this upstream too? Summoning @d-v-b to comment on the intended comparison properties of zarr codecs 📿

I'm confused about the actual bug here.

From a Zarr V3 metadata POV, codecs are JSON objects with a "name" field and a "configuration" field, and so codecs follow the same rules for equality as any other JSON object.

From a Zarr implementation POV, you could have two different classes that both support the same logical codec, e.g. the built-in gzip codec and your own personal gzip codec. Instances of these two classes might serialize to/from the same JSON, but they would not be equal (because they are different classes). In this case you would probably want to compare the JSON objects they generate to check if they are the "same" codec (from a metadata POV).
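A tiny sketch of that metadata-level comparison. ToyCodec and same_codec_metadata are purely illustrative stand-ins; real zarr-python codecs expose a to_dict() serialization to the `{"name": ..., "configuration": ...}` JSON form:

```python
# Toy stand-in for a codec class; two distinct classes could both serialize
# to the same Zarr V3 JSON object ({"name": ..., "configuration": ...}).
class ToyCodec:
    def __init__(self, name, configuration):
        self.name = name
        self.configuration = configuration

    def to_dict(self):
        return {"name": self.name, "configuration": self.configuration}

def same_codec_metadata(c1, c2):
    # Compare codecs by the JSON metadata they produce, not by class identity.
    return c1.to_dict() == c2.to_dict()

a = ToyCodec("zlib", {"level": 6})
b = ToyCodec("zlib", {"level": 6})
print(same_codec_metadata(a, b))  # True: same metadata, distinct instances
```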

d-v-b avatar Sep 03 '25 20:09 d-v-b

This is the simplest, the codec repr contains the attribute values so we'll be comparing e.g.

definitely don't rely on the repr for comparisons between codecs! that's not what the repr is for.

d-v-b avatar Sep 03 '25 20:09 d-v-b

In this case you would probably want to compare the JSON objects they generate to check if they are the "same" codec (from a metadata POV).

What I was going to suggest adding upstream was an implementation of __eq__ that converted the codec objects to JSON and compared the JSON. Sounds like we should still do that, but not under the __eq__ method?

TomNicholas avatar Sep 03 '25 20:09 TomNicholas

Is there a reproducer? I can't really tell what needs to be changed without seeing one.

d-v-b avatar Sep 03 '25 20:09 d-v-b

I added a debugging helper for when the equality check fails: https://github.com/zarr-developers/VirtualiZarr/blob/c1db9259ff2d660155d15c675045b692f9181161/virtualizarr/manifests/utils.py#L119

It's basically the same issue described by @mdsumner: two files that have the same codecs can't be concatenated by virtualizarr when we use Dask. I guess Dask does something when it serializes/deserializes the codec in distributed mode that changes the instance type.

I'll work on a minimal reproducible example; in the meantime this is the output I get (truncated):

PROPERTY             | CODEC 0[2]                               | CODEC 1[2]                               | MATCH
-------------------- | ---------------------------------------- | ---------------------------------------- | -----
repr                 | Zlib(codec_name='numcodecs.zlib', code   | Zlib(codec_name='numcodecs.zlib', code   | ✓
str                  | Zlib(codec_name='numcodecs.zlib', code   | Zlib(codec_name='numcodecs.zlib', code   | ✓
type                 | <class 'numcodecs.zarr3.Zlib'>           | <class 'numcodecs.zarr3.Zlib'>           | ✗
type_name            | numcodecs.zarr3.Zlib                     | numcodecs.zarr3.Zlib                     | ✓
type_id              | 131899392405856                          | 131899392411600                          | ✗
object_id            | 131899190701072                          | 131899190702352                          | ✗
module               | numcodecs.zarr3                          | numcodecs.zarr3                          | ✓
class_name           | Zlib                                     | Zlib                                     | ✓
module_file          | /home/betolink/.micromamba/envs/eartha   | /home/betolink/.micromamba/envs/eartha   | ✓
attributes           | {'codec_config': {'id': 'zlib', 'level   | {'codec_config': {'id': 'zlib', 'level   | ✓
codec_name           | numcodecs.zlib                           | numcodecs.zlib                           | ✓
codec_config         | {'id': 'zlib', 'level': 6}               | {'id': 'zlib', 'level': 6}               | ✓

EQUALITY TESTS:
  codec_config: {'id': 'zlib', 'level': 6} vs {'id': 'zlib', 'level': 6} ✓

  obj1 == obj2: False
  obj1 is obj2: False
  type(obj1) == type(obj2): False
  type(obj1) is type(obj2): False

Seems like this class is the one failing the inequality for Dask: https://github.com/zarr-developers/zarr-python/blob/e738e2fb88dbead26a853d9982ff46eab64f313f/src/zarr/abc/metadata.py#L17

betolink avatar Sep 03 '25 21:09 betolink

Dask uses (cloud)pickle - maybe your minimal reproducer needs to pickle then unpickle the class before comparing?
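A sketch of that round-trip check, with a plain dataclass as a hypothetical stand-in for a codec (FakeCodec is not a real class from either library). A dataclass's generated __eq__ compares fields, so a pickle round trip should compare equal even though object identity changes:

```python
import pickle
from dataclasses import dataclass

# Stand-in for a codec: a dataclass gets a field-wise __eq__ for free.
@dataclass
class FakeCodec:
    codec_name: str
    codec_config: dict

a = FakeCodec("numcodecs.zlib", {"id": "zlib", "level": 6})
b = pickle.loads(pickle.dumps(a))  # simulate dask shipping the object

print(a == b)  # True: same class, same field values
print(a is b)  # False: a new instance after the round trip
```

If this minimal version passes but the real codecs fail, the problem is likely in how the codec classes themselves are resolved on unpickling, not in the field comparison.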

TomNicholas avatar Sep 03 '25 21:09 TomNicholas

Super strange, because the class in question is a dataclass and we don't override __eq__. According to the stdlib docs:

eq: If true (the default), an __eq__() method will be generated. This method compares the class as if it were a tuple of its fields, in order. Both instances in the comparison must be of the identical type.

a == b should only be considering the fields, which should be simple (a string and a dict). So I'm confused at how this could be failing.
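One way the generated __eq__ can fail is if two structurally identical classes end up as distinct class objects (e.g. the same module imported under two names, or a class re-created by serialization machinery). A minimal illustration with hypothetical CodecA/CodecB:

```python
from dataclasses import dataclass

# Two dataclasses with identical fields but distinct class objects,
# mimicking "the same" codec class materialized twice.
@dataclass
class CodecA:
    codec_name: str
    codec_config: dict

@dataclass
class CodecB:
    codec_name: str
    codec_config: dict

a = CodecA("numcodecs.zlib", {"level": 6})
b = CodecB("numcodecs.zlib", {"level": 6})

# The generated __eq__ returns NotImplemented when the classes differ,
# so == falls back to identity comparison and yields False.
print(a == b)  # False, despite identical field values
```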

d-v-b avatar Sep 03 '25 21:09 d-v-b

I was going crazy with this, and then I noticed I was not using the latest version of numcodecs but version 15.1: https://github.com/zarr-developers/numcodecs/blob/3cf8ab1f28e6fbda24fa16c33654cf560a8d4ae2/numcodecs/zarr3.py#L257 - there was an explicit instantiation that may have been affecting how Dask deserialization works.

Updating to numcodecs 16.2 solved the issue, I suppose related to this commit: https://github.com/zarr-developers/numcodecs/commit/936f4d2212813ddddd91bf27dca257b49d9af8e4 cc @TomNicholas @d-v-b

betolink avatar Sep 04 '25 15:09 betolink

Glad you could figure this out! Sadly the numcodecs.zarr3 module has weighed on this community like a curse.

d-v-b avatar Sep 04 '25 15:09 d-v-b

Nice! So you basically hit the same problem with dask that I had hit with lithops (as both send functions via pickle). That was the context of that PR.

I'm going to close this issue now.

TomNicholas avatar Sep 04 '25 16:09 TomNicholas