Error 'Could not serialize object of type _FillValueMaskCheckAndStoreTarget' saving computed cube with Dask distributed
🐛 Bug Report
I don't seem to be able to save a cube that has been computed on a dask cluster.
To be honest, I don't know if I should be able to, but if I could it would be really useful.
How To Reproduce
from dask.distributed import Client
client = Client(n_workers=4)
import iris
cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
assert cube.shape == (18, 33, 960, 1280)
averages = cube.collapsed('realization', iris.analysis.MEAN)
assert type(averages) == iris.cube.Cube
iris.io.save(averages, "delme.nc")
Expected behaviour
File is saved without error. This is the behaviour if I don't start a dask.distributed.Client before invoking Iris.
Environment
- OS & Version: Amazon Linux 2.3 (Centos)
- Iris Version: 3.1.0
Some more relevant versions
# packages in environment at /home/ec2-user/miniconda3/envs/iris:
#
# Name Version Build Channel
cloudpickle 2.0.0 pyhd8ed1ab_0 conda-forge
dask 2021.12.0 pyhd8ed1ab_0 conda-forge
dask-core 2021.12.0 pyhd8ed1ab_0 conda-forge
hdf4 4.2.15 h10796ff_3 conda-forge
hdf5 1.12.1 nompi_h2750804_103 conda-forge
ipykernel 6.6.1 py310hfdc917e_0 conda-forge
ipython 7.31.0 py310hff52083_0 conda-forge
iris 3.1.0 pyhd8ed1ab_3 conda-forge
numpy 1.22.0 py310h454958d_0 conda-forge
pandas 1.3.5 py310hb5077e9_0 conda-forge
pickleshare 0.7.5 py_1003 conda-forge
python 3.10.1 h62f1059_2_cpython conda-forge
scipy 1.7.3 py310hea5193d_0 conda-forge
Additional context
Stack trace
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
sub_header, sub_frames = serialize_and_split(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
header, frames = serialize(x, serializers, on_error, context)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
return serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
headers_frames = [
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
distributed.comm.utils - ERROR - ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
Traceback (most recent call last):
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 33, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
sub_header, sub_frames = serialize_and_split(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
header, frames = serialize(x, serializers, on_error, context)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
return serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
headers_frames = [
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/batched.py", line 93, in _background_send
nbytes = yield self.comm.write(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/tcp.py", line 250, in write
frames = await to_frames(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 50, in to_frames
return _to_frames()
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 33, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
sub_header, sub_frames = serialize_and_split(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
header, frames = serialize(x, serializers, on_error, context)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
return serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
headers_frames = [
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
<timed exec> in <module>
~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/io/__init__.py in save(source, target, saver, **kwargs)
426 # Single cube?
427 if isinstance(source, Cube):
--> 428 saver(source, target, **kwargs)
429
430 # CubeList or sequence of cubes?
~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in save(cube, filename, netcdf_format, local_keys, unlimited_dimensions, zlib, complevel, shuffle, fletcher32, contiguous, chunksizes, endian, least_significant_digit, packing, fill_value)
2770 # Iterate through the cubelist.
2771 for cube, packspec, fill_value in zip(cubes, packspecs, fill_values):
-> 2772 sman.write(
2773 cube,
2774 local_keys,
~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in write(self, cube, local_keys, unlimited_dimensions, zlib, complevel, shuffle, fletcher32, contiguous, chunksizes, endian, least_significant_digit, packing, fill_value)
1150
1151 # Create the associated cube CF-netCDF data variable.
-> 1152 cf_var_cube = self._create_cf_data_variable(
1153 cube,
1154 dimension_names,
~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in _create_cf_data_variable(self, cube, dimension_names, local_keys, packing, fill_value, **kwargs)
2417
2418 # Store the data and check if it is masked and contains the fill value
-> 2419 is_masked, contains_fill_value = store(
2420 data, cf_var, fill_value_to_check
2421 )
~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in store(data, cf_var, fill_value)
2395 # the fill value
2396 target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
-> 2397 da.store([data], [target])
2398 return target.is_masked, target.contains_value
2399
~/miniconda3/envs/iris/lib/python3.10/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
1116 elif compute:
1117 store_dsk = HighLevelGraph(layers, dependencies)
-> 1118 compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
1119 return None
1120
~/miniconda3/envs/iris/lib/python3.10/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
313 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
314 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 315 return schedule(dsk2, keys, **kwargs)
316
317
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2689 should_rejoin = False
2690 try:
-> 2691 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2692 finally:
2693 for f in futures.values():
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1944 else:
1945 local_worker = None
-> 1946 return self.sync(
1947 self._gather,
1948 futures,
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
308 return future
309 else:
--> 310 return sync(
311 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
312 )
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
362 if error[0]:
363 typ, exc, tb = error[0]
--> 364 raise exc.with_traceback(tb)
365 else:
366 return result[0]
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in f()
347 if callback_timeout is not None:
348 future = asyncio.wait_for(future, callback_timeout)
--> 349 result[0] = yield future
350 except Exception:
351 error[0] = sys.exc_info()
~/miniconda3/envs/iris/lib/python3.10/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1810 else:
1811 raise exception.with_traceback(traceback)
-> 1812 raise exc
1813 if errors == "skip":
1814 bad_keys.add(key)
CancelledError: ('store-map-ff2955f4724c217f42a2a75fc58e80e8', 1, 0, 0)
For what it's worth, I can perform the same operations on NetCDF files with Xarray, so it doesn't seem a ridiculous thing to do?
from dask.distributed import Client
client = Client(n_workers=4)
import xarray
dataset = xarray.open_dataset('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc').chunk({"latitude": 10})
assert {'realization': 18, 'height': 33, 'latitude': 960, 'longitude': 1280, 'bnds': 2} == dataset.dims.mapping
averages = dataset.mean('realization', keep_attrs=True)
averages.to_netcdf('delme.nc')
Hi @dmcg, thanks for getting in touch. It looks like you're running in an AWS ec2 instance, is that the case?
We discussed this a bit this morning and what you're doing does seem reasonable, and we've got a couple of ideas as to what might be going wrong. We're a bit surprised we don't see this issue more often, so it might be something about the way things are pickled in the cloud. Can you reproduce the issue on the ground?
Hi, thanks for the response.
This is running in EC2 (Cloud9 fwiw), but my understanding of that Client(n_workers=4) line is that the Dask ‘cluster’ will just be on the Jupyter machine, maybe even in the same process, so the pickled objects won’t really touch the cloud? Or, put another way, my ground is in the cloud!
I’m out for a couple of days, but happy to help next week, maybe at least check I can reproduce with other NetCDF files. I’ll also speak with my clients about sharing the repo with the data and code.
I’ve given @wjbenfold access to the repo, it seems we’re organisationally proximate!
I've tried a selection of files from https://github.com/SciTools/iris-test-data/tree/master/test_data/NetCDF, but can't find one that will open with iris.load_cube. If you can point me at a suitable candidate to run against the sample code, or amended code, I can try that in my environment.
Whilst acknowledging that this doesn't diagnose the actual source of the issue, I'd note, based on your traceback, that you're running with Python 3.10, which no Iris version (not even the upcoming Iris 3.2) has been tested against. When running the Iris 3.1 tests in a Python 3.10 environment I get a lot of test failures, which could well include this issue.
Is running an older version of Python an option for you? Iris 3.1 was tested against Python 3.7, so that's probably the best choice to give a go.
I'll have a chat with miniconda and see what I can do
I've tried again in a fresh and minimal 3.7 environment.
conda create --name iris python=3.7 -y
conda activate iris
conda install distributed iris pytest -c conda-forge -y
Given test_iris_issue.py
import iris
import pytest
from dask.distributed import Client
# Uncomment to see the failure
# client = Client(n_workers=4)
def test_load_and_save():
    cube = iris.load_cube('metoffice-data/000490262cdd067721a34112963bcaa2b44860ab.nc')
    averages = cube.collapsed('realization', iris.analysis.MEAN)
    iris.io.save(averages, "delme.nc")
pytest test_iris_issue.py
succeeds as is, and fails with the _FillValueMaskCheckAndStoreTarget error when the client line is uncommented.
The environment is still EC2, and this is what is running:
# packages in environment at /home/ec2-user/miniconda3/envs/iris:
#
# Name Version Build Channel
_libgcc_mutex 0.1 main
_openmp_mutex 4.5 1_gnu
antlr-python-runtime 4.7.2 py37h89c1867_1003 conda-forge
attrs 21.4.0 pyhd8ed1ab_0 conda-forge
c-ares 1.18.1 h7f8727e_0
ca-certificates 2021.10.8 ha878542_0 conda-forge
cartopy 0.18.0 py37h0d9ca2b_1
certifi 2021.10.8 py37h89c1867_1 conda-forge
cf-units 3.0.1 py37h6f94858_0 conda-forge
cftime 1.5.1.1 py37hce1f21e_0
click 8.0.3 py37h89c1867_1 conda-forge
cloudpickle 2.0.0 pyhd8ed1ab_0 conda-forge
curl 7.80.0 h7f8727e_0
cycler 0.11.0 pyhd8ed1ab_0 conda-forge
cytoolz 0.11.0 py37h7b6447c_0
dask-core 2022.1.0 pyhd8ed1ab_0 conda-forge
distributed 2022.1.0 py37h89c1867_0 conda-forge
expat 2.2.10 h9c3ff4c_0 conda-forge
freetype 2.10.4 h0708190_1 conda-forge
fsspec 2022.1.0 pyhd8ed1ab_0 conda-forge
geos 3.8.0 he6710b0_0
hdf4 4.2.13 h3ca952b_2
hdf5 1.10.6 nompi_h6a2412b_1114 conda-forge
heapdict 1.0.1 py_0 conda-forge
icu 67.1 he1b5a44_0 conda-forge
importlib-metadata 4.10.1 py37h89c1867_0 conda-forge
importlib_metadata 4.10.1 hd8ed1ab_0 conda-forge
iniconfig 1.1.1 pyh9f0ad1d_0 conda-forge
iris 3.1.0 pyhd8ed1ab_3 conda-forge
jinja2 3.0.3 pyhd8ed1ab_0 conda-forge
jpeg 9d h7f8727e_0
kiwisolver 1.3.1 py37h2531618_0
krb5 1.19.2 hcc1bbae_0 conda-forge
ld_impl_linux-64 2.35.1 h7274673_9
libblas 3.9.0 11_linux64_openblas conda-forge
libcblas 3.9.0 11_linux64_openblas conda-forge
libcurl 7.80.0 h0b77cf5_0
libedit 3.1.20191231 he28a2e2_2 conda-forge
libev 4.33 h516909a_1 conda-forge
libffi 3.3 he6710b0_2
libgcc-ng 9.3.0 h5101ec6_17
libgfortran-ng 11.2.0 h69a702a_12 conda-forge
libgfortran5 11.2.0 h5c6108e_12 conda-forge
libgomp 9.3.0 h5101ec6_17
liblapack 3.9.0 11_linux64_openblas conda-forge
libnetcdf 4.6.1 h2053bdc_4
libnghttp2 1.46.0 hce63b2e_0
libopenblas 0.3.17 pthreads_h8fe5266_1 conda-forge
libpng 1.6.37 h21135ba_2 conda-forge
libssh2 1.9.0 h1ba5d50_1
libstdcxx-ng 9.3.0 hd4cf53a_17
locket 0.2.0 py_2 conda-forge
markupsafe 2.0.1 py37h5e8e339_0 conda-forge
matplotlib-base 3.2.2 py37h1d35a4c_1 conda-forge
msgpack-python 1.0.2 py37hff7bd54_1
ncurses 6.3 h7f8727e_2
netcdf4 1.5.7 py37h0a24e14_0
numpy 1.20.3 py37h038b26d_1 conda-forge
openssl 1.1.1m h7f8727e_0
packaging 21.3 pyhd8ed1ab_0 conda-forge
partd 1.2.0 pyhd8ed1ab_0 conda-forge
pip 21.2.2 py37h06a4308_0
pluggy 1.0.0 py37h89c1867_2 conda-forge
proj 6.2.1 hc80f0dc_0 conda-forge
psutil 5.8.0 py37h27cfd23_1
py 1.11.0 pyh6c4a22f_0 conda-forge
pyparsing 3.0.7 pyhd8ed1ab_0 conda-forge
pyshp 2.1.3 pyh44b312d_0 conda-forge
pytest 6.2.5 py37h89c1867_2 conda-forge
python 3.7.11 h12debd9_0
python-dateutil 2.8.2 pyhd8ed1ab_0 conda-forge
python-xxhash 2.0.2 py37h5e8e339_0 conda-forge
python_abi 3.7 2_cp37m conda-forge
pyyaml 5.4.1 py37h5e8e339_0 conda-forge
readline 8.1.2 h7f8727e_1
scipy 1.5.3 py37h14a347d_0 conda-forge
setuptools 58.0.4 py37h06a4308_0
shapely 1.7.1 py37h1728cc4_0
six 1.16.0 pyh6c4a22f_0 conda-forge
sortedcontainers 2.4.0 pyhd8ed1ab_0 conda-forge
sqlite 3.37.0 hc218d9a_0
tblib 1.7.0 pyhd8ed1ab_0 conda-forge
tk 8.6.11 h1ccaba5_0
toml 0.10.2 pyhd8ed1ab_0 conda-forge
toolz 0.11.2 pyhd8ed1ab_0 conda-forge
tornado 6.1 py37h5e8e339_1 conda-forge
typing_extensions 4.0.1 pyha770c72_0 conda-forge
udunits2 2.2.27.27 h360fe7b_0 conda-forge
wheel 0.37.1 pyhd3eb1b0_0
xxhash 0.8.0 h7f98852_3 conda-forge
xz 5.2.5 h7b6447c_0
yaml 0.2.5 h516909a_0 conda-forge
zict 2.0.0 py_0 conda-forge
zipp 3.7.0 pyhd8ed1ab_0 conda-forge
zlib 1.2.11 h7f8727e_4
I'll try to reproduce outside EC2
We get the same issue on MacOS and a different MOGREPS-G NetCDF file.
I have local reproduction! For future reference, I had to protect the Client() call with an if __name__ == "__main__": guard.
from dask.distributed import Client
import iris
def main():
    client = Client()
    filename = iris.sample_data_path("A1B_north_america.nc")
    # cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
    cube = iris.load_cube(filename)
    # assert cube.shape == (18, 33, 960, 1280)
    averages = cube.collapsed('latitude', iris.analysis.MEAN)
    # assert type(averages) == iris.cube.Cube
    iris.save(averages, "delme.nc")

if __name__ == "__main__":
    main()
More things I've worked out:
- If I remove the collapse stage, I still see a serialization error (albeit a different one)
- Both failures seem to be due to the use of msgpack to serialize Iris objects. This seems to be a widespread problem in Iris (just a straight import of msgpack and then trying to serialize a Cube hits an error).
- Dask workers seem to use msgpack regardless of the client being started with specified serializers that don't include it (see the sketch after this comment).
This doesn't seem quick to solve, though I'm no expert on serialization / dask so there might be approaches I've not spotted.
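For illustration, this is roughly what "started with specified serializers" means - a sketch only, assuming a local cluster like the one in the reproduction above; in my testing the workers appeared to fall back to msgpack regardless:
from dask.distributed import Client

# Sketch: ask distributed to prefer its own and pickle-based serialization,
# leaving msgpack out of the lists (observed to have no effect on the workers).
client = Client(
    n_workers=4,
    serializers=["dask", "pickle"],
    deserializers=["dask", "pickle"],
)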
Hi @dmcg, thanks for raising this issue with Iris! As per @wjbenfold, I've also been able to locally reproduce this error, again with the A1B_north_america.nc file. Hopefully I'll also be able to add some more context to the error, a workaround you can use immediately, and a suggestion for how we can fix Iris to stop this happening in general.
I'll start with the workaround, as it has the most immediate value. If you don't use distributed (i.e. don't create a Client), you won't get the error (the reason for this will hopefully become clear in the context). If you just use plain Iris you will automatically get dask local parallel processing, so you should see the same parallel performance for your collapse operation with or without distributed. As you're using a single EC2 instance (I think) you won't be giving up the performance gains of distributing your processing over multiple machines - although obviously this workaround won't scale if you do move to a cluster of machines.
So, the following code should run without error:
import iris
cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
averages = cube.collapsed('realization', iris.analysis.MEAN)
iris.save(averages, "delme.nc")
Your other option is to realise the data before you save it - that is, load the data into memory after you collapse but before you save. Note this will only work if there's enough memory on the machine to store the collapsed cube's data. For example:
import iris
from distributed import Client
client = Client(n_workers=4)
cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
averages = cube.collapsed('realization', iris.analysis.MEAN)
averages.data # <-- This line will load the data and force processing of the collapse operation before you save.
iris.save(averages, "delme.nc")
The reason for the difference between using distributed and not is that distributed communications always run over network links - even when the client, scheduler and workers are all on the same machine. There are certainly some advantages to using a local cluster over dask multiprocessing (and it's the preferred solution in the dask docs), but it can be less reliable.
One example of this is that the network communications cause extra constraints on how data is moved between workers. By default a distributed network runs over TCP, which transmits frames of bytes. Python objects in memory or on disk must be translated to bytes before being transmitted, and translated back from bytes on receipt. These processes are serialization and deserialization respectively, and it's this step that's failing here. The way that Python objects are serialized for TCP transmission is by first being pickled, and apparently the problem class here is one that cannot be pickled. Taking a look through the Iris source code it looks like the problem class is only used for NetCDF save, so the scope of the problem is small - but still annoying if you can't save your cube!
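If it's useful, here's a quick way to check whether any given object survives pickling - a minimal sketch using only the standard library, not Iris or distributed code:
import pickle

def is_picklable(obj):
    # Returns True if obj survives a pickle round-trip, False otherwise.
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False
A cube should generally pass this check, whereas the save target named in the traceback does not, which is why the failure only surfaces once distributed needs to ship it between processes.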
I think the longer-term solution to this will be to make the problem class serializable. This should be achievable by overriding the class's __getstate__ and __setstate__ methods - the trick will be ensuring that the correct state is captured by these methods. I'll have a go...
Thanks for this. Loading into local memory should be possible for me, so happy to have this as just an irritating edge-case.
Also experiencing this issue when using SPICE, which involves saving a cube within the process 👍
#5031 may be relevant here.
#5031 may be relevant here.
Indeed.
Can you please explain if this is still an issue for you @dmcg? -- as all this discussion was a while ago now.
I think my experiences with #5031 may cast some light on how to do this.
Also, when that is available, it would probably be a better way of doing this anyway -- but work on that is still incomplete.
So I think it would be useful to have a re-statement of whether you still have a need and, if so, what needs to be achieved.
Has this been fixed by #5191?