cfgrib
Performance of GRIB files read with dask and dask.distributed can be improved.
Support for dask and dask.distributed is currently quite good from the point of view of correctness: we are able to perform complex operations on 320 GB of data from 20 files on a 10 VM x 8 core dask.distributed cluster with no problem.
Performance is not stellar and more work is needed in this respect.
I don't know whether this should be flagged as an issue, but I've noticed some weird behaviour when trying to use cfgrib as an xarray engine.
I'm running some benchmarks on GRIB data containing output of global models at different resolutions (ranging from 80 km to 2.5 km), written with compression. The arrays that I'm trying to process have shapes of roughly (~300 timesteps, ~80 levels, 8e5 to 80e6 points).
My initial idea was to speed up the processing of the data by parallelising it instead of using CDO, which still works relatively well on our machine. For the 80 km and 40 km resolutions everything works pretty well and the performance is actually not bad! Here is a comparison of the serial and parallel approaches.
Serial (not parallel) approach
import xarray as xr
%%time
dss = xr.open_mfdataset('/work/ka1081/DYAMOND/ICON-80km/nwp_R2B05_lkm1008_atm_3d_tot_qc_dia_ml_*.grb', engine='cfgrib')
%%time
dss.unknown.mean(axis=2).compute()
Parallel Approach
import json
data=[]
with open('/work/mh0731/m300382/dask/scheduler.json') as f:
data = json.load(f)
scheduler_address=data['address']
from dask.distributed import Client, progress
client = Client(scheduler_address)
client
import xarray as xr
%%time
dss = xr.open_mfdataset('/work/ka1081/DYAMOND/ICON-80km/nwp_R2B05_lkm1008_atm_3d_tot_qc_dia_ml_*.grb', engine='cfgrib', parallel=True)
%%time
dss.unknown.mean(axis=2).compute()
However, when trying to process data from the 20 km resolution model in the same way, which should still be quite lightweight, the computation hangs and some messages are printed in the logs of the dask workers.
15: distributed.core - INFO - Event loop was unresponsive in Worker for 26.74s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
63: distributed.core - INFO - Event loop was unresponsive in Worker for 26.60s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
This also causes the scheduler to hang and crash. I'm wondering whether it has to do with the fact that the data is also compressed, so that decompressing and parallelising at the same time is just too much, or whether the dask support may still not be completely functional.
I'm testing this on our cluster, where dask, eccodes and cfgrib were all installed in a conda environment to minimise dependency issues. In the past I've successfully processed large NetCDF datasets (on the order of ~1000 timesteps, 20e6 points).
@guidocioni I didn't know about the parallel keyword argument, but it looks like it only affects the multi-file preprocessing step.
In order to enable chunked (and thus parallel) processing in dask you need to supply the chunks keyword argument. Try opening the dataset with:
dss = xr.open_mfdataset('files*.grb', engine='cfgrib', chunks={'time': 1, 'generalVerticalLayer': 1}, parallel=True)
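As a quick sanity check (a sketch on my side, using the unknown variable from your example), you can inspect the resulting chunks before triggering any computation:
import numpy as np
# Inspect the dask array backing the variable: each chunk should fit
# comfortably in the memory available to a single worker core.
arr = dss.unknown.data
print(arr.chunks)                                        # chunk sizes per dimension
print(np.prod(arr.chunksize) * arr.dtype.itemsize / 1e6, "MB for the largest chunk")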
Yes, the parallel keyword argument only affects the preprocessing step, which becomes faster. However, the open_mfdataset function always returns a dataset composed of dask arrays (see http://xarray.pydata.org/en/stable/generated/xarray.open_mfdataset.html). Chunks are determined automatically based on the dimensions, so the computation is always parallelised according to the scheduler in use when adopting open_mfdataset. I think using lazy arrays prevents xarray from loading big datasets directly into memory.
Furthermore, I've also used xr.open_dataset while manually defining the chunks and I still see the same behaviour.
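For reference, this is roughly what the manual-chunking variant looks like (a sketch on my side; the file name is just a placeholder and the dimension names follow the example above):
import xarray as xr
# Open a single GRIB file with explicit chunks instead of relying on the
# automatic chunking of open_mfdataset. The path is a placeholder and the
# dimension names ('time', 'generalVerticalLayer') follow the example above.
ds = xr.open_dataset('nwp_R2B05_lkm1008_atm_3d_u_ml_0001.grb',
                     engine='cfgrib',
                     chunks={'time': 1, 'generalVerticalLayer': 1})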
My guess is that for the 20 km case you have 321 files with one time per file and approximately 4 GB each, for a total of 1.3 TB. If that is the case, the automatic chunking would be equivalent to chunks={'time': 1}, and processing 4 GB of data per chunk may be too much for one core of your (otherwise massive!) dask cluster.
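Just to make the arithmetic explicit, this is the kind of back-of-the-envelope estimate I mean (a sketch; the numbers are placeholders, plug in the actual dimensions of the 20 km files):
levels = 77            # vertical levels (adjust to the real file)
points = 1_310_720     # horizontal points per level (adjust to the real file)
bytes_per_value = 8    # values are decoded to float64 in memory
chunk_gb = levels * points * bytes_per_value / 1e9   # size of one chunks={'time': 1} chunk
print(f"~{chunk_gb:.1f} GB per chunk")               # several such chunks per core add up quickly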
Is it a public dataset that I can see? How many files do you have? What size is every file? How are they structured?
For the 20 km resolution model, 3D variables are written in GRIB2 aec-compressed files containing 8 timesteps (1 day every 3 hours), 77 vertical levels and exactly 1310720 horizontal points (the grid is unstructured). They are about 1 GB in size and there are 41 of them.
It would be really nice to share these files to test the cfgrib and dask capabilities, but unfortunately they are not public and live on our cluster. Still, I could transfer 40 GB of data somewhere if you're interested in testing them. Moving data from the 10, 5 and 2.5 km resolution domains will be a little bit more complicated, for obvious reasons :)
It looks like it is a memory issue... By refining the chunks I was able to run the computation for a while before the job crashed due to limited memory.
Maybe I just have to refine the way I ask for resources on the cluster side. So I don't think this is strictly cfgrib-related, but mostly on the dask-infrastructure side.
Small update: apparently using the same approach with NetCDF and GRIB files produces different results. This is what I usually do after having launched and connected to a dask scheduler which controls 15 different workers (for a total of 16 nodes on the machine, with 72 cores each).
import xarray as xr
from glob import glob
import dask.array as da
chunk_space=1114111
chunk_time=50
main_folder='/work/bm0834/k203095/ICON_LEM_DE/'
file_prefix='2d_cloud_day'
domain='DOM01'
folders=['20130620-default-ifs']
variable='rain_gsp_rate'
filenames = []
for folder in folders:
    filenames = filenames + sorted(glob(main_folder + folder + '/DATA/' + file_prefix + '_' + domain + '*'))
# open every NetCDF file lazily and wrap the variable in a chunked dask array
temps = [xr.open_dataset(fn).variables[variable] for fn in filenames]
arrays = [da.from_array(t, chunks=(chunk_time, chunk_space)) for t in temps]
var_normal = da.concatenate(arrays, axis=0)
mean = var_normal.mean(axis=1)   # average over the horizontal points
mean = mean.persist()
This completes in about 1 minute since the dataset is not that big (1080 timesteps, 2323968 points, which amounts to about 2.5e9 individual values in total).
However, if I just try to use the same approach with cfgrib and files from the other simulation, the workers start to crash and I cannot complete the computation. This is the second approach:
chunk_space=1000000
chunk_levels=77
chunk_time=10
main_folder='/work/ka1081/DYAMOND/ICON-80km/'
variables=['u']
filenames=[]
for variable in variables:
    file_prefix = 'nwp_R2B05_lkm1008_atm_3d_' + variable + '_ml_'
    filenames = filenames + sorted(glob(main_folder + file_prefix + '*.grb'))
# open every GRIB file lazily with the cfgrib engine and wrap it in a chunked dask array
temps = [xr.open_dataset(fn, engine='cfgrib').variables[variable] for fn in filenames[0:-2]]
arrays = [da.from_array(t, chunks=(chunk_time, chunk_levels, chunk_space)) for t in temps]
var_normal = da.concatenate(arrays, axis=0)
mean = var_normal.mean(axis=2)   # average over the horizontal points
mean = mean.persist()
and I'm using exactly the same session with the same dask scheduler and workers. Note that in this case the total size of the data should be even smaller, as I have dimensions (312 timesteps, 77 levels, 81920 points), which equals about 2e9 individual values.
Honestly I don't know where the problem is... with this dataset the individual chunks of data should be small enough not to create any problem.
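One thing I might try next (just a sketch, not something I have run yet) is to compute the same reduction on a single time chunk first, so that only one or two chunks ever have to be read and decoded:
# Reduce a single time chunk only: if even this crashes the workers, the
# problem is in reading/decoding one GRIB chunk rather than in the overall
# size of the computation.
small = var_normal[:chunk_time].mean(axis=2)
print(small.compute())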
I'm attaching the log file from the dask scheduler and workers.
The log file contains the following errors from ecCodes:
ECCODES ERROR : grib_handle_create: cannot create handle, no definitions found
ecCodes assertion failed: `h' in .../src/grib_query.c:458
which is probably what causes the problems, but also:
No handlers could be found for logger "cfgrib.messages"
so we don't see the messages from cfgrib itself.
I'm not sure how to enable cfgrib logging in dask workers so I cannot debug more than this at the moment.
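One possible way to get the cfgrib messages to show up in the worker logs might be to attach a logging handler on every worker with Client.run (an untested sketch; scheduler_address as defined earlier in the thread):
import logging
from dask.distributed import Client

def setup_cfgrib_logging():
    # attach a root handler so that records from the "cfgrib.messages"
    # logger end up in each worker's log
    logging.basicConfig(level=logging.DEBUG)

client = Client(scheduler_address)   # connect to the existing scheduler
client.run(setup_cfgrib_logging)     # execute the function once on every worker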
Anyway, I'll be working on performance and especially performance under dask.distributed early next year.
Hi @alexamici, I would just like to add a data point regarding performance.
I am trying to open a 320 MB file on an HPC system where bandwidth is not an issue for this size of file. Forming the index takes a really long time considering the size of the file: the time scale is minutes, not seconds, around 7 minutes to open a 320 MB file. It's much slower than nio, which also has to create an index from nothing. In my use case, pre-generating index files is not really possible and, anyway, it would still be computationally expensive to do it just once. It's a heterogeneous file with many variables and many index keys.
It seems like the specific getitem calls are a large part of the indexing cost. Perhaps there are some gains to be made here before diving into the Dask optimization. See the profile (incidentally from Dask) below: the largest chunk of the indexing time is spent in getitem.
I am currently using the cfgrib.open_datasets interface and doing the Dask multi-file handling myself because I have heterogeneous files. I thought before profiling that it was the merging and sorting costing the time, but it's really just indexing the files.
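For anyone who wants to reproduce this kind of measurement without a Dask cluster, a minimal profiling sketch with plain cProfile (the file name is a placeholder) would be:
import cProfile
import pstats
import cfgrib

# Profile the opening of a heterogeneous GRIB file to see where the indexing
# time is spent; 'my_file.grib' is a placeholder path.
cProfile.run("datasets = cfgrib.open_datasets('my_file.grib')", 'open_datasets.prof')
pstats.Stats('open_datasets.prof').sort_stats('cumulative').print_stats(20)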
Regarding dask optimization: my experience is that creating single-processor clients yields the best performance. This is probably due to the classic Python multithreading issue of not releasing the GIL. A solution to that would also make life a lot easier down the road when you want to use the data and the workers for a multithreaded process, like training a neural net. Perhaps this is helpful?
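Concretely, for a local test this is what I mean by single-processor clients (a sketch; on an HPC cluster the same idea translates into one single-threaded process per worker):
from dask.distributed import Client

# One process per worker with a single thread each, so GIL-bound work
# (such as decoding GRIB messages) does not serialise an entire worker.
client = Client(n_workers=8, threads_per_worker=1)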
Thanks for taking the time to read. :) I would love to hear any suggestions to improve the performance from the outside or alternatively I would also be very excited to hear if you managed to find a way of getting some improvements.
Chiming in again, as I gave dask another go these days :)
Just as a test, I wanted to compute yearly averages from ERA5 monthly data. I have 2 GRIB files covering different time periods, downloaded from the Copernicus hub.
import xarray as xr
from dask.distributed import Client
client = Client(n_workers=4)
ds = xr.open_mfdataset('Downloads/adaptor.mars.internal-*.grib',
chunks={"time": 10},
parallel=True)

results = ds.resample({'time':'1Y'}).mean().compute()
This takes about 40 seconds and the average CPU usage (I have a 6-core i7-8700) sits at about 60-70%.
The same with cdo:
(base) guidocioni@Guidos-iMac:~/Downloads$ cdo yearavg -mergetime adaptor.mars.internal-*.grib test.grib
cdo(1) mergetime: Process started
cgribexGetTsteptype: Time range indicator 133 unsupported, set to 0!
cdo(1) mergetime: Processed 5601830400 values from 2 variables over 864 timesteps.
cdo yearavg: Processed 5601830400 values from 1 variable over 935 timesteps [36.85s 262MB].
takes about 36 seconds with a CPU usage of just 10%.
Is this huge difference in performance really related only to the fact that CDO is highly optimized and written directly in C?
To be fair, I think xarray, dask and cfgrib have made a lot of progress, as a few years ago one could barely open and concatenate files without getting errors... :D But for me it is still far from optimal and I'm wondering whether I'm doing something wrong.
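In case it helps to narrow this down, one way I could check where the time actually goes on the dask side is dask.distributed's performance_report (a sketch reusing the snippet above; the report file name is arbitrary):
from dask.distributed import Client, performance_report
import xarray as xr

client = Client(n_workers=4)
ds = xr.open_mfdataset('Downloads/adaptor.mars.internal-*.grib',
                       chunks={"time": 10}, parallel=True)

# Writes an HTML report with task durations, transfers and worker CPU usage,
# which should show whether the time is spent decoding GRIB messages or in
# scheduling/communication overhead.
with performance_report(filename="dask-report.html"):
    results = ds.resample({'time': '1Y'}).mean().compute()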