ecmwf-opendata
ecmwf-opendata copied to clipboard
possible data corruption when retrieving data for a large number of steps
I think there might be a problem with the data that is downloaded when a large number of steps are passed.
In this example I am passing all steps from 0h to 240h for all ensemble numbers from 1 to 50 for the parameters `10u` and `10v`.
I open the file with xarray, filter the dataset by an arbitrary ensemble number, and loop through every single step, printing all the values for one of the variables.
The code fails with the error reported below. However, if I divide the list of steps into two halves and process them separately, the code runs correctly, which makes me think there might be an issue with the data when a large number of steps is retrieved in one go.
Can you replicate this behaviour?
import sys
np.set_printoptions(threshold=sys.maxsize)
from ecmwf.opendata import Client
def get_ECMWF_open_data_grib_ensemble(target, stream, step, param, number):
    """Retrieve fields through the ECMWF open-data client into ``target``.

    The arguments are forwarded unchanged as keyword arguments to
    ``Client.retrieve``. No ``time`` or ``type`` is passed.

    Args:
        target: Forwarded as ``target``; the script passes the output
            GRIB file name here.
        stream: Forwarded as ``stream`` (the script uses ``'enfo'``).
        step: Forwarded as ``step`` (the script passes a list of ints).
        param: Forwarded as ``param`` (the script passes a list of
            parameter short names).
        number: Forwarded as ``number`` (the script passes a list of
            ensemble member numbers).

    Returns:
        Whatever ``Client.retrieve`` returns on success, or ``None`` when
        creating the client or retrieving the data raised an exception.
    """
    try:
        client = Client(source="ecmwf")
        downloaded_file = client.retrieve(
            target=target,
            stream=stream,
            step=step,
            param=param,
            number=number,
        )
    except Exception as exc:
        # Catch Exception rather than using a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate (e.g. Ctrl-C during
        # a long download). Failure stays best-effort: report and return None.
        print('The ECMWF service is not currently available, please try again later')
        # Surface the real cause; the generic message above hides whether
        # this was a network problem, a bad request, or a local error.
        print(f'Underlying error: {exc!r}')
        return None
    return downloaded_file
# Reproduction script: download parameters 10u/10v for every step and every
# ensemble member into one GRIB2 file, open it lazily with xarray/cfgrib,
# select one member, then print the values of ``u10`` step by step.
grib_stream = 'enfo'
grib_format = 'grib2'
grib_target = f'aaa_{grib_stream}.{grib_format}'
# Steps every 3 hours from 0h to 144h, then every 6 hours from 150h to 240h.
step = [*range(0, 145, 3), *range(150, 241, 6)]
grib_param = ["10u", "10v"]
# Ensemble members 1 to 50.
grib_number = list(range(1, 51))

grib_file = get_ECMWF_open_data_grib_ensemble(grib_target, grib_stream, step, grib_param, grib_number)
if grib_file is None:
    # The helper returns None when the download failed. Opening the target
    # anyway would either fail on a missing file or silently read a file
    # left behind by an earlier run, so stop here instead.
    raise SystemExit(f'Download failed; not opening {grib_target}')

ds = xr.open_mfdataset(
    grib_target,
    engine='cfgrib',
    parallel=True,
    chunks={'step': 3, 'number': 3},
    # Only read messages whose typeOfLevel is heightAboveGround and whose
    # topLevel is 10.
    backend_kwargs={'filter_by_keys': {'typeOfLevel': 'heightAboveGround', 'topLevel': 10}},
)
print(ds)

# Keep a single ensemble member and drop the resulting length-1 dimension.
ensemble = 11
mask = (ds.number.values == ensemble)
xarr = ds.sel(number=mask).squeeze(drop=True)
print('xarr')
print(xarr)
print(xarr.coords['step'].values)

for number in step:
    # ``number`` is a forecast step in hours here; convert it to a
    # numpy timedelta64 so it can be compared with the ``step`` coordinate.
    h = str(number) + 'hours'
    timedelta = pd.Timedelta(h)
    res = timedelta.to_timedelta64()
    print(res)
    new_da = xarr.where(xarr.step == res, drop=True)
    print('new_da')
    print(new_da)
    # Accessing ``.values`` forces the lazy (dask-backed) data to be read
    # from the GRIB file; this is the line that raises in the report.
    print(new_da['u10'].values)
    print(new_da['u10'].size)
error:
Traceback (most recent call last):
File "C:\Users\Giacomo\PycharmProjects\HIT-v3\test.py", line 378, in <module>
print(new_da['u10'].values)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\dataarray.py", line 732, in values
return self.variable.values
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\variable.py", line 614, in values
return _as_array_or_item(self._data)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\variable.py", line 314, in _as_array_or_item
data = np.asarray(data)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\array\core.py", line 1701, in __array__
x = self.compute()
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\base.py", line 310, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\base.py", line 595, in compute
results = schedule(dsk, keys, **kwargs)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\threaded.py", line 89, in get
results = get_async(
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\local.py", line 511, in get_async
raise_exception(exc, tb)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\local.py", line 319, in reraise
raise exc
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\local.py", line 224, in execute_task
result = _execute_task(task, data)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\optimization.py", line 992, in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 151, in get
result = _execute_task(task, cache)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\dask\array\core.py", line 126, in getter
c = np.asarray(c)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\indexing.py", line 484, in __array__
return np.asarray(self.get_duck_array(), dtype=dtype)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\indexing.py", line 487, in get_duck_array
return self.array.get_duck_array()
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\indexing.py", line 664, in get_duck_array
return self.array.get_duck_array()
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\indexing.py", line 551, in get_duck_array
array = self.array[self.key]
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\xarray_plugin.py", line 155, in __getitem__
return xr.core.indexing.explicit_indexing_adapter(
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\xarray\core\indexing.py", line 858, in explicit_indexing_adapter
result = raw_indexing_method(raw_key.tuple)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\xarray_plugin.py", line 164, in _getitem
return self.array[key]
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\dataset.py", line 358, in __getitem__
message = self.index.get_field(message_ids[0]) # type: ignore
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\messages.py", line 484, in get_field
return ComputedKeysAdapter(self.fieldset[message_id], self.computed_keys)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\messages.py", line 344, in __getitem__
return self.message_from_file(file, offset=item)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\messages.py", line 340, in message_from_file
return Message.from_file(file, offset, **kwargs)
File "C:\Users\Giacomo\anaconda3\envs\HIT-v3\lib\site-packages\cfgrib\messages.py", line 93, in from_file
file.seek(offset)
OSError: [Errno 22] Invalid argument
It is possible that the data returned is incomplete and causes an error. Perhaps test small subsets of the steps, explore the data, then proceed?