pangeo-forge-recipes
Grib / Kerchunk recipe?
The new ECMWF forecast dataset in Azure is a perfect chance for us to develop a new type of recipe: indexing and combining a bunch of grib files with Kerchunk.
The current HDFReferenceRecipe class almost does what we need; it just needs to be generalized to permit grib inputs. The kerchunk HRRR example shows how to open grib with kerchunk.
Here is some code to kick things off. cc @cisaacstern, @lsterzinger, @martindurant, @TomAugspurger
from kerchunk.grib2 import scan_grib
from fsspec.implementations.reference import ReferenceFileSystem
import zarr
import xarray as xr

ROOT = 'https://ai4edataeuwest.blob.core.windows.net/ecmwf/'
path = '20220122/18z/0p4-beta/scda/20220122180000-36h-scda-fc.grib2'
url = ROOT + path
storage_options = {}
common = ['time', 'latitude', 'longitude']

# scan the remote grib2 file and build kerchunk references
%time scan = scan_grib(url, common, storage_options)

# open the references as a zarr group without downloading the whole file
rfs = ReferenceFileSystem(scan)
zg = zarr.open_group(rfs.get_mapper(""))
zg.info

# read one variable's data through the reference filesystem
%time _ = zg['q'][:]
import xarray as xr
ds = xr.open_zarr(rfs.get_mapper(""), consolidated=False)
print(ds)
<xarray.Dataset>
Dimensions: (latitude: 451, longitude: 900)
Coordinates:
* latitude (latitude) float64 90.0 89.6 89.2 88.8 ... -89.2 -89.6 -90.0
* longitude (longitude) float64 -180.0 -179.6 -179.2 ... 178.8 179.2 179.6
Data variables: (12/18)
d (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
gh (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
msl (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
q (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
r (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
skt (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
... ...
u (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
u10 (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
unknown (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
v (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
v10 (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
vo (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
Attributes:
Conventions: CF-1.7
GRIB_centre: ecmf
GRIB_centreDescription: European Centre for Medium-Range Weather Forecasts
GRIB_edition: 2
GRIB_subCentre: 0
history: 2022-01-26T04:08 GRIB to CDM+CF via cfgrib-0.9.9...
institution: European Centre for Medium-Range Weather Forecasts
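As a small follow-on sketch (not part of the original comment), the references produced above can be persisted to JSON so the grib file never has to be re-scanned; the output filename is illustrative, and this assumes scan_grib returns a single reference dict as in the snippet above.

# Sketch: save the kerchunk references and reopen the dataset later without
# touching the grib file again. Assumes `scan` is the dict produced above.
import json
import fsspec
import xarray as xr

with open("ecmwf_refs.json", "w") as f:   # hypothetical output path
    json.dump(scan, f)

# later (or elsewhere), open straight from the stored references
fs = fsspec.filesystem("reference", fo="ecmwf_refs.json", remote_protocol="https")
ds = xr.open_zarr(fs.get_mapper(""), consolidated=False)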
Just an FYI, that storage container currently has a lifecycle policy to remove files older than 30-ish days. We're still trying to figure out how best to handle these operational forecast datasets. If anyone has thoughts on that, I'd be happy to hear them.
I would just like to mention that GRIB2 support is not as good as HDF5: it still needs cfgrib installed and works by copying pieces of the target file to local temporary files. It should be possible to eliminate that and get direct access to the binary chunks, and thus better performance, but I have not had the time to try it myself and was hoping someone with grib expertise would emerge who can help ( @DPeterK :) ).
Perhaps we want to define a recipe that will create a kerchunk-virtual-zarr that always points to the "latest" data, whatever that means. Kind of similar to the ideas explored in this met office blog post. By running this recipe daily, we will ensure that we are never pointing at data that doesn't exist.
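As a rough illustration of that "latest data" idea (not from the original comment), a daily job could rebuild the references from today's forecast URLs; the path template is inferred from the example URL earlier in this thread, and the cycle, lead times, and output name are assumptions.

# Hedged sketch of a daily "latest forecast" reference build. The URL pattern
# is guessed from the example path above; cycle/steps/output are illustrative.
import datetime
import json
from kerchunk.grib2 import scan_grib

ROOT = "https://ai4edataeuwest.blob.core.windows.net/ecmwf/"
today = datetime.date.today().strftime("%Y%m%d")
cycle = "00z"                          # assumed forecast cycle
steps = ["0h", "12h", "24h", "36h"]    # assumed lead times

refs = {}
for step in steps:
    url = f"{ROOT}{today}/{cycle}/0p4-beta/scda/{today}000000-{step}-scda-fc.grib2"
    # assumes scan_grib returns one reference dict per file, as in the snippet above
    refs[step] = scan_grib(url, ['time', 'latitude', 'longitude'], {})

with open("latest-ecmwf-refs.json", "w") as f:
    json.dump(refs, f)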
@martindurant - the original code above worked quite well out of the box! It took about 20s to index the file, and I didn't even run it from inside Azure. Whether or not local copies are happening may not be so important here.
Good to know!
@martindurant this ECMWF dataset includes "index files". Has anyone checked whether those have sufficient information to translate into the reference format needed for a ReferenceFileSystem?
https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.index is one example, for the file at https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.grib2
Here is what the index file looks like
{"domain": "g", "date": "20220126", "time": "0000", "expver": "0001", "class": "od", "type": "pf", "stream": "enfo", "step": "0", "levtype": "sfc", "number": "29", "param": "2t", "_offset": 0, "_length": 609069}
{"domain": "g", "date": "20220126", "time": "0000", "expver": "0001", "class": "od", "type": "pf", "stream": "enfo", "step": "0", "levtype": "sfc", "number": "27", "param": "10u", "_offset": 609069, "_length": 609069}
{"domain": "g", "date": "20220126", "time": "0000", "expver": "0001", "class": "od", "type": "pf", "stream": "enfo", "step": "0", "levtype": "sfc", "number": "14", "param": "10v", "_offset": 1218138, "_length": 609069}
{"domain": "g", "date": "20220126", "time": "0000", "expver": "0001", "class": "od", "type": "pf", "stream": "enfo", "step": "0", "levtype": "sfc", "number": "2", "param": "2t", "_offset": 1827207, "_length": 609069}
{"domain": "g", "date": "20220126", "time": "0000", "expver": "0001", "class": "od", "type": "pf", "stream": "enfo", "step": "0", "levtype": "sfc", "number": "43", "param": "10u", "_offset": 2436276, "_length": 609069}
...
4182 lines of that.
It definitely looks like the information is there. Sounds like we need a kerchunk scan_grib_index function!
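To make that idea concrete, here is a rough, hypothetical sketch of what a scan_grib_index helper could look like (no such function exists in kerchunk at this point). It only turns the .index lines into byte-range references keyed by parameter and ensemble number; the Zarr metadata keys (.zarray, .zattrs) that a reference filesystem also needs would still have to come from somewhere else.

# Hypothetical sketch: parse the ECMWF .index sidecar into
# {key: [grib_url, offset, length]} entries, the value format used by
# kerchunk / ReferenceFileSystem for remote chunks.
import json
import fsspec

def scan_grib_index(index_url: str, grib_url: str) -> dict:
    with fsspec.open(index_url, "r") as f:
        entries = [json.loads(line) for line in f if line.strip()]
    refs = {}
    for entry in entries:
        # key scheme is illustrative: one "chunk" per (param, ensemble number)
        key = f"{entry['param']}/{entry.get('number', '0')}"
        refs[key] = [grib_url, entry["_offset"], entry["_length"]]
    return refs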
If those "offsets" are what I think they are (will see), then this might indeed be the case that we don't need cfgrib. Otherwise, the JSON looks just a little cryptic.
This is fantastic... I tried this out with two of NOAA's operational datasets archived on GCS and it worked without any modifications, but was much slower (for calibration, the MWE here took ~40 seconds for scan_grib to run; unclear to me if this is a personal internet bandwidth issue or a GCS issue).
HRRR - https://console.cloud.google.com/storage/browser/high-resolution-rapid-refresh/ - took ~5 minutes for scan_grib to run.
GFS - https://console.cloud.google.com/storage/browser/global-forecast-system/ - took ~7 minutes for scan_grib to run, but there was some odd bug in decoding the longitude coordinate and one of the values wound up as a NaN.
@TomAugspurger I was just about to call out the index files, these should be available for most NWP datasets distributed via GRIB (they are for NOAA). There's a good bit of information on leveraging systems that support reading via random access to subset byteranges corresponding to data fields in the GRIB using the offsets present in the index - see https://nomads.ncep.noaa.gov/txt_descriptions/fast_downloading_grib.shtml
From the docs @TomAugspurger linked: "In addition, the keys _offset and _length represent the byte offset and length of the corresponding field. This allows the download of a single field using the HTTP Byte-Range request" so this definitely seems workable
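For illustration (this snippet is not from the original comment), the same Byte-Range trick can be expressed with fsspec instead of a hand-built Range header; the offset and length below are copied from the first line of the index file quoted above.

# Sketch: fetch a single GRIB field by byte range using fsspec's cat_file,
# which issues the HTTP Range request for us. Offset/length come from the
# first index line shown above.
import fsspec

grib_url = ("https://ai4edataeuwest.blob.core.windows.net/ecmwf/"
            "20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.grib2")
entry = {"param": "2t", "_offset": 0, "_length": 609069}

fs = fsspec.filesystem("https")
start = entry["_offset"]
end = start + entry["_length"]          # cat_file's end is exclusive
raw = fs.cat_file(grib_url, start=start, end=end)
print(len(raw))  # should equal entry["_length"]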
Thanks all. I'll spend a bit of time this morning seeing if I can wire something up between the index files and fsspec reference filesystem.
then this might indeed be the case that we don't need cfgrib
Even with the index file, it seems like there must be some metadata in the grib file that is not covered by the index file.
As far as I can tell, these "fields" will still need decoding. Maybe those other details in each JSON line tell you the dtype and compression.
Tag @blaylockbk - his package Herbie has some nascent capability to do partial decoding of HRRR grib files, could be a starting point here.
Somehow the grib_ls tool that ships with eccodes generates a much more detailed catalog of GRIB file contents, but I'm not sure how it works. That's another avenue to potentially explore.
If only, instead of providing little tools with arcane CLI options, a good description of how they worked were given. These are all Perl?
Not sure, I'm digging for the source code to read through right now.
I spent the last hour digging through the source code for the grib_ls tool... it's a pretty dense C program which directly scans through the GRIB file(s) that are passed to it. Not immediately sure how useful it would be here, but for posterity, and in case anyone else wants to dig through it as an educational reference, there are three main source files that comprise it.
Presumably there's enough information in the GRIB files themselves to generate the JSON-style index that @rabernat listed. grib_ls doesn't use the static index file distributed with the GRIB files themselves.
cfgrib has the concept of a "fieldset", which IIUC is a list of messages that can be passed to xarray.open_dataset: https://github.com/ecmwf/cfgrib/blob/master/ADVANCED_USAGE.rst
This lets us avoid writing data to disk:
import json
import requests
import xarray as xr
import cfgrib
from eccodes import codes_new_from_message

idx_url = "https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.index"
grb_url = "https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.grib2"

# parse the newline-delimited JSON index
r = requests.get(idx_url)
idxs = [json.loads(x) for x in r.text.split("\n") if x]

# byte range of the first message
idx = idxs[0]
start_bytes = idx["_offset"]
end_bytes = start_bytes + idx["_length"] - 1
headers = dict(Range=f"bytes={start_bytes}-{end_bytes}")

# partial data I/O required for the message
r_data = requests.get(grb_url, headers=headers)
r_data.raise_for_status()
data = r_data.content

# build a cfgrib "fieldset" from the in-memory message and open it with xarray
h = codes_new_from_message(data)
message = cfgrib.messages.Message(h)
ds = xr.open_dataset([message], engine="cfgrib")
print(ds)
That said, we do still need to do the range request to read the message to get the structure of the Dataset. So these index files might help with speeding up scan_grib from kerchunk, but I think we'll still need to extract additional information beyond what's available through the URL and index file.
So perhaps we want to add an optional index_file argument to scan_grib, which would provide a fast path to constructing the references.
Agreed. And https://github.com/fsspec/kerchunk/blob/ca4acff173e5eac60e51f50c99011b7afde5ee24/kerchunk/grib2.py#L162 could perhaps be updated to avoid the tempfile (which I think is needed when reading right now).
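As a rough sketch of what avoiding the tempfile could look like (this is not the actual kerchunk implementation), eccodes can decode a message held entirely in memory; the Ny/Nx reshape below assumes a regular lat/lon grid.

# Hedged sketch: decode one GRIB message from bytes in memory with eccodes,
# rather than writing it to a temporary file first. `raw` is assumed to be
# the bytes of a single message, e.g. fetched via a Range request as above.
import eccodes
import numpy as np

def decode_message(raw: bytes) -> np.ndarray:
    handle = eccodes.codes_new_from_message(raw)
    try:
        shape = (eccodes.codes_get(handle, "Ny"),
                 eccodes.codes_get(handle, "Nx"))
        values = eccodes.codes_get_values(handle)  # flat float64 array
        return values.reshape(shape)
    finally:
        eccodes.codes_release(handle)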
Here's a little proof of concept for accessing GRIB2 data over the network, using the newly released cfgrib 0.9.10.0. This is just at the Zarr array level. But plugging it into xarray shouldn't be much more work.
import base64
import numcodecs
import eccodes
import xarray as xr
import requests
import cfgrib
import numpy as np
import json
grib_url = "https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.grib2"
sub_indices = [{'domain': 'g',
'date': '20220126',
'time': '0000',
'expver': '0001',
'class': 'od',
'type': 'cf',
'stream': 'enfo',
'step': '0',
'levtype': 'sfc',
'param': 'st',
'_offset': 916472742,
'_length': 609069}]
class COGRIB2Codec(numcodecs.abc.Codec):
    codec_id = "co-grib2"

    def __init__(self, grib_url):
        self.grib_url = grib_url

    def encode(self, buf):
        raise NotImplementedError

    def decode(self, buf, out=None):
        index = json.loads(buf)
        result = data_from_index(self.grib_url, index)
        return result


def data_from_index(grib_url: str, idx) -> np.ndarray:
    # TODO: think about batching HTTP requests.
    # Unclear how that fits in a regular __getitem__ API.
    # Might be doable with a getitems.
    # See if we can cut down on the number of HTTP requests.
    start_bytes = idx["_offset"]
    end_bytes = start_bytes + idx["_length"] - 1
    headers = dict(Range=f"bytes={start_bytes}-{end_bytes}")
    # TODO: sans-io
    r = requests.get(grib_url, headers=headers)
    r.raise_for_status()
    data = r.content
    h = eccodes.codes_new_from_message(data)
    messages = [cfgrib.messages.Message(h)]
    ds = xr.open_dataset(messages, engine="cfgrib")
    # Get just the data... This is a bit wasteful. See if cfgrib
    # can fetch just the data section of the message.
    return ds[idx["param"]].data
numcodecs.register_codec(COGRIB2Codec)
import zarr
import numcodecs
import numpy as np
filters = [COGRIB2Codec(grib_url)]
x = zarr.empty((451, 900), chunks=(451, 900), dtype=np.dtype("float32"), compressor=None, filters=filters)
# we do this write outside of Zarr
x.store["0.0"] = json.dumps(sub_indices[0]).encode()
x[:]
Which outputs
array([[246.57616, 246.57616, 246.57616, ..., 246.57616, 246.57616,
246.57616],
[246.20116, 246.20116, 246.20116, ..., 246.2324 , 246.20116,
246.20116],
[245.76366, 245.76366, 245.76366, ..., 245.7949 , 245.7949 ,
245.76366],
...,
[231.63866, 231.63866, 231.63866, ..., 231.5449 , 231.57616,
231.6074 ],
[232.32616, 232.3574 , 232.3574 , ..., 232.2949 , 232.32616,
232.32616],
[232.26366, 232.26366, 232.26366, ..., 232.26366, 232.26366,
232.26366]], dtype=float32)
The Zarr array contains just the metadata (dtype, shape). The "data" is the relevant line from the index file as a bytestring.
Then we have a custom filter that makes the byte-range HTTP request for the data when requested. This is probably misusing Zarr a bit, but it seems to work.
I'll clean this up a bit and then see about where it belongs.
Tom, great example of Zarr hacking!
I agree you are abusing Zarr filters though. 😬
Here is a related approach I tried for a different custom binary file format: https://github.com/rabernat/mds2zarr/blob/main/mds2zarr/mds2zarr.py. I created a mutable mapping that "looked like" a kerchunk static json but is actually generating the references on the fly. Then I could use it as an input to ReferenceFileSystem (see usage in test). That is nice because you can avoid hard-coding the actual reading part (calling requests) and leverage the rest of fsspec for that part. So this works equally well with files, s3, http, etc.
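A stripped-down illustration of that lazy-mapping idea (loosely modeled on the linked mds2zarr code; the class name and constructor arguments are hypothetical) could look like the following. Whether ReferenceFileSystem consumes the mapping lazily or copies it up front may depend on the fsspec version.

# Hypothetical sketch: a read-only mapping that generates kerchunk-style
# references on demand and is handed to ReferenceFileSystem as `fo`.
# Metadata keys return JSON strings; chunk keys return [url, offset, length].
import json
from collections.abc import Mapping

class LazyGribRefs(Mapping):
    def __init__(self, grib_url, index_entries, zarray, zattrs):
        self.grib_url = grib_url
        # index_entries: {chunk_key: {"_offset": ..., "_length": ...}}
        self.index_entries = index_entries
        self.meta = {".zarray": zarray, ".zattrs": zattrs}

    def __getitem__(self, key):
        if key in self.meta:
            return json.dumps(self.meta[key])
        entry = self.index_entries[key]
        return [self.grib_url, entry["_offset"], entry["_length"]]

    def __iter__(self):
        yield from self.meta
        yield from self.index_entries

    def __len__(self):
        return len(self.meta) + len(self.index_entries)

# usage sketch:
# fs = fsspec.filesystem("reference", fo=LazyGribRefs(...), remote_protocol="https")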
a mutable mapping that "looked like" a kerchunk static json but is actually generating the references on the fly
This is definitely part of the long-term kerchunk vision - it would work particularly well when chunks map to some file naming scheme.
This is definitely part of the long-term kerchunk vision - it would work particularly well when chunks map to some file naming scheme.
To clarify, do you imagine including it as part of the kerchunk spec? Because it already works today (as in my example); you just have to document how to write such a class.
Reading through your code, @TomAugspurger . Do I understand that the main thing here is to use the index definition as an alternative to scanning the file? The "message" I'm pretty sure is the same unit of grib that my code was loading, but with the roundtrip to file. Handling the bytes in memory like this could be done for the existing grib2 module too, I assume.
Yes, I'm sure that an approach like this could be made to work with concurrent loading, perhaps separating the references part (storage) and the decoding (filter).
do you imagine including it as part of the kerchunk spec
It was in my mind to make this explicit, through documentation and/or examples. The mentions in the code and text somewhere of alternative storage formats for the references hint at it.
Of course, there is no simple way you could encode something like your custom dict into an Intake description (fo would normally be a path string), something extra to think about.
Thanks for the thoughts on putting this logic in a Mapping vs. a filter. I initially had it as a mapping, but moved away from that since I wasn't sure about
- The appropriateness of generating a Zarr store that can only be opened with a specific mapping.
- How to compose this "special" mapping with other mappings, if that's even desirable
But filters aren't quite the right place for this either.
Do I understand that the main thing here is to use the index definition as an alternative to scanning the file?
That might be possible, but will take a bit more work. GRIB2 files apparently (can) contain many data variables, which maybe should be grouped somehow. This test file has ~4,000 variables, which cfgrib groups into ~14 xarray datasets. Right now I need to scan the full GRIB2 file to figure out which rows in the index file are included in which dataset. I don't yet know if the rows in the index file have enough information to make the same grouping as cfgrib, or if that metadata is only available after reading.
I do suspect that it's the same byte range as your code. I'd prefer to use the provided ranges from the index file rather than re-discovering it (if the index file is available, which it probably isn't always).
GRIB2 files apparently (can) contain many data variables, which maybe should be grouped somehow
In usage I have seen from @rsignell-usgs , it seems normal to provide a filter when opening, so you only get messages that can be formed into a consistent dataset. My file scanner does implement this, and it looks like the index contains the information needed.
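To sketch what such a filter could look like against the index rows shown earlier (the key names come from the example lines; the particular selection is an illustrative assumption):

# Sketch: keep only index rows that can form a consistent dataset before
# building references, mimicking the "filter on open" pattern described above.
def filter_index(entries, **criteria):
    """Keep only index entries whose keys match all the given criteria."""
    return [e for e in entries
            if all(e.get(k) == v for k, v in criteria.items())]

# e.g. surface fields from the control forecast only (illustrative):
# surface_cf = filter_index(idxs, levtype="sfc", type="cf")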
I'd prefer to use the provided ranges from the index file rather than re-discovering it
Totally makes sense, yes.
initially had it as a mapping, but moved away from that
I think @rabernat's example shows nicely that you can separate it out into a reference-getter (dict-like) class and a filter/decoder, and still use ReferenceFileSystem. Subtleties to iron out, of course.
I've updated cogrib using the recommendations from yesterday:
- cogrib.make_references now generates kerchunk index files, so it's an alternative to Kerchunk's scan_grib: https://github.com/fsspec/kerchunk/blob/f42b0c23aa0229cee6e5a394a860f0ad1149ace6/kerchunk/grib2.py#L85. The output is read with fsspec's reference filesystem, rather than yet another custom mapper.
- cogrib implements a Zarr filter to turn the raw bytes in memory from the GRIB2 file into an ndarray (https://github.com/TomAugspurger/cogrib/blob/a56d209d9162b276a31b61c4ae448ac9cd129cc5/cogrib.py#L187-L204). This is an alternative to the filter in https://github.com/fsspec/kerchunk/blob/main/kerchunk/grib2.py#L162, which writes to disk first (minor benefit).
Some notes on performance:
- Reading the whole file from disk with cfgrib to determine the datasets is slow (6-7 minutes for this test file).
- Generating the references is really fast (less than 500 ms total for all 14 datasets in this case).
- Reading the zarr metadata with xarray is really fast (less than 200 ms total for all 14 datasets).
- Read performance looks OK. It's about 200 ms to read each chunk, which in this case is a ~1.55 MiB 2-d array. Datasets will have some number of these chunks equal to the number of data variables * number of ensemble members * number of levels, anywhere from 1 to 3600 chunks. When there's more than one chunk, we average about 32 MiB/s (or 8 MiB/s per core; that should scale roughly linearly with the number of cores, up until we saturate the bandwidth of each node). See the back-of-envelope check below.
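A quick back-of-envelope check of those read numbers (added here for illustration; the inputs are the figures quoted above):

# Back-of-envelope: total bytes and read time for the worst case quoted above.
chunk_mib = 1.55           # ~1.55 MiB per 451 x 900 float32 chunk
n_chunks = 3600            # upper end of the 1-3600 range
throughput_mib_s = 32      # aggregate throughput quoted above

total_mib = chunk_mib * n_chunks
print(f"{total_mib:.0f} MiB total, ~{total_mib / throughput_mib_s:.0f} s to read")
# -> roughly 5,580 MiB and ~174 s at 32 MiB/s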
So lots of overlap with what's in kerchunk today, but at least I understand GRIB2 a bit better now. I'll look into how we could reconcile the two approaches. Updating kerchunk's numcodecs filter should be easy. Harmonizing scan_grib might be a bit harder since they have different logic in how datasets are extracted/combined from a GRIB2 file (cogrib just uses whatever cfgrib does). But first I want to try this out on some other ECMWF files and then some NOAA files to make sure it generalizes.