
Creating cubed arrays from lazy xarray data

TomNicholas opened this issue 2 years ago • 8 comments

To subset my data whilst getting around #196, I tried slicing with xarray's lazy indexing machinery before converting to cubed arrays using .chunk (a trick which, when used with dask, avoids creating a bunch of open-dataset tasks for chunks that will never be used).
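Roughly, the pattern I mean looks like this (a minimal sketch; the store path, variable name and slice bounds are placeholders, and it assumes a recent xarray with cubed-xarray installed so that chunked_array_type="cubed" is available):

import xarray as xr

# Open lazily with no chunked array backend, so the indexing below goes
# through xarray's lazy indexing machinery rather than a task graph.
ds = xr.open_zarr("gs://example-bucket/store.zarr", chunks=None)  # placeholder path

# Subset first, while the variables are still lazy...
subset = ds.isel(time=slice(0, 100))  # placeholder slice

# ...then convert the lazy arrays to cubed arrays with .chunk
subset = subset.chunk({"time": 1}, chunked_array_type="cubed")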

However, this causes cubed to throw an error saying that the projected blockwise memory exceeds the allowed memory, which I don't understand: I literally haven't asked cubed to do any operations other than creating the array yet.

(Screenshot of the traceback, 2023-06-01 15-09-07; the final error was:)

ValueError: Projected blockwise memory (86732800008) exceeds allowed_mem (10000000000), including reserved_mem (0)

The size of the projected memory usage also scales with the size of the subset I select.

TomNicholas avatar Jun 01 '23 19:06 TomNicholas

Actually, maybe that was my fault for putting chunks={}, which didn't read the on-disk chunking as I hoped, but instead just returned the whole variable as one big chunk.

Interestingly, if I try chunks={'time': 1, 'values': -1}, I get a Zarr error from inside the rechunk function:

...

File ~/Documents/Work/Code/cubed/cubed/primitive/rechunk.py:35, in rechunk(source, target_chunks, allowed_mem, reserved_mem, target_store, temp_store)
     31 # rechunker doesn't take account of uncompressed and compressed copies of the
     32 # input and output array chunk/selection, so adjust appropriately
     33 rechunker_max_mem = (allowed_mem - reserved_mem) / 4
---> 35 copy_specs, intermediate, target = _setup_rechunk(
     36     source=source,
     37     target_chunks=target_chunks,
     38     max_mem=rechunker_max_mem,
     39     target_store=target_store,
     40     temp_store=temp_store,
     41 )
     43 # source is a Zarr array, so only a single copy spec
     44 if len(copy_specs) != 1:  # pragma: no cover

File ~/Documents/Work/Code/cubed/cubed/vendor/rechunker/api.py:250, in _setup_rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options)
    247 # elif isinstance(source, (zarr.core.Array, dask.array.Array)):
    248 elif isinstance(source, zarr.core.Array):
--> 250     copy_spec = _setup_array_rechunk(
    251         source,
    252         target_chunks,
    253         max_mem,
    254         target_store,
    255         target_options=target_options,
    256         temp_store_or_group=temp_store,
    257         temp_options=temp_options,
    258     )
    259     intermediate = copy_spec.intermediate.array
    260     target = copy_spec.write.array

File ~/Documents/Work/Code/cubed/cubed/vendor/rechunker/api.py:296, in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    293     target_chunks = source_chunks
    295 if isinstance(target_chunks, dict):
--> 296     array_dims = _get_dims_from_zarr_array(source_array)
    297     try:
    298         target_chunks = _shape_dict_to_tuple(array_dims, target_chunks)

File ~/Documents/Work/Code/cubed/cubed/vendor/rechunker/api.py:21, in _get_dims_from_zarr_array(z_array)
     18 def _get_dims_from_zarr_array(z_array):
     19     # use Xarray convention
     20     # http://xarray.pydata.org/en/stable/internals.html#zarr-encoding-specification
---> 21     return z_array.attrs["_ARRAY_DIMENSIONS"]

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/zarr/attrs.py:74, in Attributes.__getitem__(self, item)
     73 def __getitem__(self, item):
---> 74     return self.asdict()[item]

KeyError: '_ARRAY_DIMENSIONS'

TomNicholas avatar Jun 01 '23 19:06 TomNicholas

Finally, if I specify chunks explicitly, e.g. chunks={'time': 1}, then the indexing works.

However, when I actually call compute, a possibly-related ModuleNotFoundError: No module named 'xarray' is raised by lithops, and the stack trace is not any more informative. I tried adding xarray to the lithops runtime requirements.txt but no change.

TomNicholas avatar Jun 01 '23 20:06 TomNicholas

Here's the full notebook, showing what I was trying to do (benchmark cubed on a similar "quadratic means" problem but with real data living in GCS).
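The computation itself is roughly of this shape (an illustrative sketch only; the store path and variable name are placeholders, not the ones in the notebook):

import xarray as xr

ds = xr.open_zarr("gs://example-bucket/store.zarr", chunks=None)  # placeholder path
ds = ds.chunk({"time": 1}, chunked_array_type="cubed")

# "Quadratic mean": mean of the squared field over time.
result = (ds["tas"] ** 2).mean(dim="time")  # "tas" is an illustrative variable name
result.compute()  # executor configuration (e.g. lithops on GCF) omitted here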

TomNicholas avatar Jun 01 '23 20:06 TomNicholas

I tried adding xarray to the lithops runtime requirements.txt but no change.

I did this then ran

lithops runtime build -f requirements.txt cubed-runtime -b gcp_functions
lithops runtime deploy -b gcp_functions --memory 2048 cubed-runtime

The second command is normally optional, since Lithops will automatically deploy the runtime built by the first, but I think you may need to run it to force a new deployment. (In the past I have used lithops runtime delete -b gcp_functions cubed-runtime and even lithops clean -b gcp_functions to start again, but I didn't need that this time.)

With that I managed to get the notebook to run.

It has an interesting timeline though:

(Timeline plot of the run: 1685703178_timeline)

I think this is due to the large number of rounds in the reduce. This could possibly be improved by trying a larger allowed_mem (since it can then do more aggregations in one go), but there are probably other optimizations that could be done too.
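By "trying a larger allowed_mem" I mean something along these lines (a sketch; the work_dir and store path are placeholders, and the spec is passed through to cubed via from_array_kwargs):

import cubed
import xarray as xr

# A larger allowed_mem lets each round of the reduce combine more chunks at once.
spec = cubed.Spec(work_dir="gs://example-bucket/tmp", allowed_mem="2GB")  # placeholder work_dir

ds = xr.open_zarr("gs://example-bucket/store.zarr", chunks=None)  # placeholder path
ds = ds.chunk(
    {"time": 1},
    chunked_array_type="cubed",
    from_array_kwargs={"spec": spec},
)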

tomwhite avatar Jun 02 '23 11:06 tomwhite

I did this then ran

That's weird - I swear @cisaacstern and I tried exactly the same thing - we definitely did call deploy :sweat_smile: I'll try again, double-checking all the commands, and explicitly cleaning first.

My requirements.txt looked like this:

cubed
lithops[gcf,gcp]
gcsfs
tqdm
xarray == 2023.5.0
cubed-xarray

It has an interesting timeline though:

I think this is due to the large number of rounds in the reduce. This could possibly be improved by trying a larger allowed_mem (since it can then do more aggregations in one go), but there are probably other optimizations that could be done too.

Huh. I'm very pleased that worked at all (real data in a huge store!), but I'm wondering whether it will test / demonstrate the kind of scaling I wanted to try out. I was looking for a problem that would show linear-like weak scaling on a larger and larger dataset, whereas it seems like this one would deviate from linear scaling due to the number of aggregation steps.

TomNicholas avatar Jun 02 '23 14:06 TomNicholas

Thanks @tomwhite! @TomNicholas, yes, it feels like a cached/old version of the runtime was being used, which can perhaps be solved by the delete/clean steps.

cisaacstern avatar Jun 02 '23 14:06 cisaacstern

I think this is due to the large number of rounds in the reduce. This could possibly be improved by trying a larger allowed_mem (since it can then do more aggregations in one go)

I tried running this again with allowed_mem="2GB", but the computation still took about the same time. The last few rounds were smaller, but slower (see below). I need to look into what is happening in more detail, but I think there is the opportunity for more optimisation here.

(Timeline plot of the rerun: 1685966338_timeline)

tomwhite avatar Jun 05 '23 13:06 tomwhite

I changed the mean to a sum and the 21 min runtime went down to 7 min. I think this shows that the overhead of using structured arrays is significant, and it would be worth implementing #69.
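To illustrate why (a rough sketch, not cubed's actual implementation): a distributed mean has to carry both a running total and a count between rounds, typically packed into a structured array, whereas a sum only carries totals.

import numpy as np

# Hypothetical pair intermediate for a distributed mean: total plus count.
pair_dtype = np.dtype([("total", "f8"), ("n", "i8")])

def partial_mean(chunk):
    # Per-chunk partial result: packs sum and count into one structured scalar.
    out = np.empty((), dtype=pair_dtype)
    out["total"] = chunk.sum()
    out["n"] = chunk.size
    return out

def combine(partials):
    # Final round: add up totals and counts, then divide.
    total = sum(p["total"] for p in partials)
    n = sum(p["n"] for p in partials)
    return total / n

chunks = [np.arange(4.0), np.arange(4.0, 8.0)]
print(combine([partial_mean(c) for c in chunks]))  # 3.5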

tomwhite avatar Jun 05 '23 16:06 tomwhite

Closing as this is fixed now

tomwhite avatar May 20 '24 07:05 tomwhite