dask.distributed does not work with GRIB files opened by earthkit
What happened?
I think it's the same problem reported in #375, but I'm opening a new issue as I'm not 100% sure.
I'm not able to use dask.distributed with GRIB files opened by earthkit-data.
I get this warning when I open the data with dask:
In , overriding the default value (chunks=None) with chunks={} is not recommended.
Is that intentional? Are we not supposed to use dask with GRIB files?
What are the steps to reproduce the bug?
import earthkit.data
import dask.distributed

# Start a local dask.distributed cluster; it registers itself as the
# default scheduler for subsequent dask computations.
client = dask.distributed.Client()

collection_id = "reanalysis-era5-single-levels"
request = {
    "variable": "2t",
    "product_type": "reanalysis",
    "date": "2012-12-01",
    "time": "12:00",
}

# Retrieve the GRIB data from the CDS; chunks={} asks xarray for a
# dask-backed (lazily loaded) dataset.
earthkit_ds = earthkit.data.from_source("cds", collection_id, **request)
xr_ds = earthkit_ds.to_xarray(xarray_open_dataset_kwargs={"chunks": {}})

xr_ds.to_netcdf("test.nc")  # TypeError: Could not serialize object of type HighLevelGraph
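If I read the traceback correctly, the failure only happens when the task graph has to be pickled for the distributed workers. A sketch of a possible workaround, reusing xr_ds from the snippet above, is to force the local threaded scheduler so the graph is executed in-process and never serialised (this is my guess at sidestepping the bug, not a verified fix):

import dask

# dask.config.set takes precedence over the default distributed Client,
# so the graph runs in local threads and is never pickled.
with dask.config.set(scheduler="threads"):
    xr_ds.to_netcdf("test.nc")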
Version
0.7.0
Platform (OS and architecture)
Linux eqc-quality-tools.eqc.compute.cci1.ecmwf.int 5.14.0-362.8.1.el9_3.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Nov 8 17:36:32 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
Relevant log output
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/pickle.py:63, in dumps(x, buffer_callback, protocol)
62 try:
---> 63 result = pickle.dumps(x, **dump_kwargs)
64 except Exception:
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/readers/grib/xarray.py:38, in IndexWrapperForCfGrib.__getstate__(self)
37 def __getstate__(self):
---> 38 return dict(index=serialise_state(self.index), ignore_keys=self.ignore_keys)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/utils/serialise.py:26, in serialise_state(obj)
25 LOG.info("serialise %s", fullname)
---> 26 return (fullname, SERIALISATION[fullname][0](obj))
KeyError: ('earthkit.data.readers.grib.reader', 'GRIBReader')
During handling of the above exception, another exception occurred:
KeyError Traceback (most recent call last)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/pickle.py:68, in dumps(x, buffer_callback, protocol)
67 buffers.clear()
---> 68 pickler.dump(x)
69 result = f.getvalue()
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/readers/grib/xarray.py:38, in IndexWrapperForCfGrib.__getstate__(self)
37 def __getstate__(self):
---> 38 return dict(index=serialise_state(self.index), ignore_keys=self.ignore_keys)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/utils/serialise.py:26, in serialise_state(obj)
25 LOG.info("serialise %s", fullname)
---> 26 return (fullname, SERIALISATION[fullname][0](obj))
KeyError: ('earthkit.data.readers.grib.reader', 'GRIBReader')
During handling of the above exception, another exception occurred:
KeyError Traceback (most recent call last)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/serialize.py:363, in serialize(x, serializers, on_error, context, iterate_collection)
362 try:
--> 363 header, frames = dumps(x, context=context) if wants_context else dumps(x)
364 header["serializer"] = name
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/serialize.py:78, in pickle_dumps(x, context)
76 writeable.append(not f.readonly)
---> 78 frames[0] = pickle.dumps(
79 x,
80 buffer_callback=buffer_callback,
81 protocol=context.get("pickle-protocol", None) if context else None,
82 )
83 header = {
84 "serializer": "pickle",
85 "writeable": tuple(writeable),
86 }
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/pickle.py:81, in dumps(x, buffer_callback, protocol)
80 buffers.clear()
---> 81 result = cloudpickle.dumps(x, **dump_kwargs)
82 except Exception:
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/cloudpickle/cloudpickle.py:1479, in dumps(obj, protocol, buffer_callback)
1478 cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479 cp.dump(obj)
1480 return file.getvalue()
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/cloudpickle/cloudpickle.py:1245, in Pickler.dump(self, obj)
1244 try:
-> 1245 return super().dump(obj)
1246 except RuntimeError as e:
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/readers/grib/xarray.py:38, in IndexWrapperForCfGrib.__getstate__(self)
37 def __getstate__(self):
---> 38 return dict(index=serialise_state(self.index), ignore_keys=self.ignore_keys)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/utils/serialise.py:26, in serialise_state(obj)
25 LOG.info("serialise %s", fullname)
---> 26 return (fullname, SERIALISATION[fullname][0](obj))
KeyError: ('earthkit.data.readers.grib.reader', 'GRIBReader')
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
Cell In[1], line 15
13 earthkit_ds = earthkit.data.from_source("cds", collection_id, **request)
14 xr_ds = earthkit_ds.to_xarray(xarray_open_dataset_kwargs={"chunks": {}})
---> 15 xr_ds.to_netcdf("test.nc")
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/core/dataset.py:2298, in Dataset.to_netcdf(self, path, mode, format, group, engine, encoding, unlimited_dims, compute, invalid_netcdf)
2295 encoding = {}
2296 from xarray.backends.api import to_netcdf
-> 2298 return to_netcdf( # type: ignore # mypy cannot resolve the overloads:(
2299 self,
2300 path,
2301 mode=mode,
2302 format=format,
2303 group=group,
2304 engine=engine,
2305 encoding=encoding,
2306 unlimited_dims=unlimited_dims,
2307 compute=compute,
2308 multifile=False,
2309 invalid_netcdf=invalid_netcdf,
2310 )
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/backends/api.py:1348, in to_netcdf(dataset, path_or_file, mode, format, group, engine, encoding, unlimited_dims, compute, multifile, invalid_netcdf)
1345 if multifile:
1346 return writer, store
-> 1348 writes = writer.sync(compute=compute)
1350 if isinstance(target, BytesIO):
1351 store.sync()
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/backends/common.py:297, in ArrayWriter.sync(self, compute, chunkmanager_store_kwargs)
294 if chunkmanager_store_kwargs is None:
295 chunkmanager_store_kwargs = {}
--> 297 delayed_store = chunkmanager.store(
298 self.sources,
299 self.targets,
300 lock=self.lock,
301 compute=compute,
302 flush=True,
303 regions=self.regions,
304 **chunkmanager_store_kwargs,
305 )
306 self.sources = []
307 self.targets = []
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:249, in DaskManager.store(self, sources, targets, **kwargs)
241 def store(
242 self,
243 sources: Any | Sequence[Any],
244 targets: Any,
245 **kwargs: Any,
246 ) -> Any:
247 from dask.array import store
--> 249 return store(
250 sources=sources,
251 targets=targets,
252 **kwargs,
253 )
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/dask/array/core.py:1236, in store(***failed resolving arguments***)
1234 elif compute:
1235 store_dsk = HighLevelGraph(layers, dependencies)
-> 1236 compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
1237 return None
1239 else:
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/dask/base.py:402, in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
400 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
401 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 402 return schedule(dsk2, keys, **kwargs)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/client.py:3259, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3186 def get(
3187 self,
3188 dsk,
(...)
3200 **kwargs,
3201 ):
3202 """Compute dask graph
3203
3204 Parameters
(...)
3257 Client.compute : Compute asynchronous collections
3258 """
-> 3259 futures = self._graph_to_futures(
3260 dsk,
3261 keys=set(flatten([keys])),
3262 workers=workers,
3263 allow_other_workers=allow_other_workers,
3264 resources=resources,
3265 fifo_timeout=fifo_timeout,
3266 retries=retries,
3267 user_priority=priority,
3268 actors=actors,
3269 )
3270 packed = pack_data(keys, futures)
3271 if sync:
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/client.py:3155, in Client._graph_to_futures(self, dsk, keys, workers, allow_other_workers, internal_priority, user_priority, resources, retries, fifo_timeout, actors)
3152 from distributed.protocol import serialize
3153 from distributed.protocol.serialize import ToPickle
-> 3155 header, frames = serialize(ToPickle(dsk), on_error="raise")
3157 pickled_size = sum(map(nbytes, [header] + frames))
3158 if pickled_size > parse_bytes(
3159 dask.config.get("distributed.admin.large-graph-warning-threshold")
3160 ):
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/serialize.py:389, in serialize(x, serializers, on_error, context, iterate_collection)
387 except Exception:
388 raise TypeError(msg) from exc
--> 389 raise TypeError(msg, str_x) from exc
390 else: # pragma: nocover
391 raise ValueError(f"{on_error=}; expected 'message' or 'raise'")
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7feabb21bf90>\n 0. 140646138612736\n>')
Accompanying data
No response
Organisation
B-Open / CADS-EQC
Thank you for reporting this issue. I looked into it and it does not seem to be related to #375.
The reason for this failure is that serialisation has not yet been implemented for this kind of GRIB source (GRIBReader), which is why serialise_state raises the KeyError seen in the traceback.
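Concretely, earthkit.data.utils.serialise keeps a registry of per-class serialisation helpers, and the GRIBReader backing this source has no entry in it. A simplified sketch of the mechanism, inferred from the traceback rather than copied from the source:

# SERIALISATION and serialise_state are names from
# earthkit/data/utils/serialise.py; the bodies below are illustrative only.
SERIALISATION = {
    # Maps a (module, class name) tuple to a pair of
    # (serialise, deserialise) callables. The key
    # ("earthkit.data.readers.grib.reader", "GRIBReader") is missing.
}

def serialise_state(obj):
    fullname = (type(obj).__module__, type(obj).__name__)
    # Unregistered classes fail here, exactly as in the traceback above.
    return (fullname, SERIALISATION[fullname][0](obj))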