
Proposed Recipes for Global Drifter Program

Open philippemiron opened this issue 3 years ago • 23 comments

Source Dataset

I'm trying to convert code that builds a ragged array from the hourly GDP dataset (~20,000 individual netCDF files) into a pangeo-forge recipe.

  • AOML website link.
  • The file format is netCDF.
  • One file per drifter.
  • Accessible through ftp
    • ftp://ftp.aoml.noaa.gov/phod/pub/lumpkin/hourly/v2.00/netcdf/drifter_101143.nc

Transformation / Alignment / Merging

The files contain a few variables along with metadata that should in fact be stored as variables. I have a draft recipe that cleans up each file, parses the dates, and converts the metadata to variables. The files have two dimensions, ['traj', 'obs'], where each file (one ['traj']) contains a different number of observations ['obs'] (ranging from ~1,000 to 20,000+). More precisely, the scalar ['traj'] variables are the type of buoy, its dimensions, launch date, drogue loss date, etc., and the vector ['obs'] variables are lon, lat, ve, vn, time, err, etc.

To create the ragged array, the scalar variables should be concatenated together, and the same goes for the various vector variables (a rough sketch follows the list below). My two issues are:

  • is it possible to concatenate on multiple dimensions?
  • is it possible to set a non-uniform chunk size? Chunks would be set to the number of observations per trajectory, so that the nth chunk corresponds to the nth trajectory, which allows for efficient per-trajectory calculations.
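
Roughly, the concatenation I have in mind looks like the sketch below. Variable names are illustrative and it assumes the per-file layout described above, with files a list of local drifter netCDF paths:

import xarray as xr

# open one dataset per drifter (each has dims traj=1 and obs=<trajectory length>)
datasets = [xr.open_dataset(f) for f in files]

ragged = xr.merge([
    # scalar variables: one value per drifter
    xr.concat([d[['deploy_date']] for d in datasets], dim='traj'),
    # vector variables: all observations end to end
    xr.concat([d[['lon', 'lat', 've', 'vn', 'time']] for d in datasets], dim='obs'),
])
# record the trajectory lengths so the 'obs' axis can be split back per drifter
ragged['rowsize'] = ('traj', [d.sizes['obs'] for d in datasets])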

Cheers,

Output Dataset

A single netCDF archive or a Zarr store.

philippemiron avatar Feb 01 '22 20:02 philippemiron

Thanks for posting this, Philippe! Pinging @selipot, who I believe has NSF EarthCube funding to work on this topic!

  • is it possible to concatenate on multiple dimensions?

Not currently, but this is in the works (see https://github.com/pangeo-forge/pangeo-forge-recipes/issues/140)

  • is it possible to set a non-uniform chunk size?

On the target Zarr dataset? No, Zarr does not support this.


Given the ragged nature of the data you described, Xarray / Zarr may not be a good fit at all for the target format. Have you considered either

  • Awkward Array (cc @jpivarski) - what is the ideal on-disk format?
  • Using a tabular data model (e.g. Pandas dataframe) + Parquet storage?

rabernat avatar Feb 01 '22 20:02 rabernat

Thank you for pinging me. Yes, @philippemiron's work on this is funded through my CloudDrift EarthCube project :)

selipot avatar Feb 01 '22 20:02 selipot

Thanks for the suggestion, I will take a look at Awkward Array. I have not considered pandas; the dataset is not that large (~17 GB), and I believe xarray would be easier to work with. I realize that ragged arrays are not really a good fit for Xarray, but I have been testing the groupby() operation from flox, setting non-uniform Dask chunks per trajectory, and hope to make per-trajectory operations efficient.
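
To give an idea of what I mean, here is a minimal sketch of such a flox-based per-trajectory reduction (the file name is hypothetical; it assumes a combined dataset with an ids variable giving the drifter id of each hourly observation):

import flox.xarray
import xarray as xr

# combined dataset: all observations along 'obs', with 'ids' repeating the drifter id
ds = xr.open_dataset('gdp_hourly_ragged.nc')

# one mean velocity per drifter, using flox's optimized groupby machinery
mean_velocity = flox.xarray.xarray_reduce(
    ds[['ve', 'vn']],   # eastward / northward velocity
    ds['ids'],          # group label: drifter id per observation
    func='nanmean',
)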

philippemiron avatar Feb 01 '22 20:02 philippemiron

For working with Lagrangian data (which we do a lot in our lab), I have always preferred Pandas, due to the ability to group on any of the different attributes (float id, lat/lon, time, etc.). 17 GB is not so bad if you use dask.dataframe + Parquet.
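
To make the tabular option concrete, here is a minimal sketch of that workflow (the file name and column names are hypothetical):

import dask.dataframe as dd

# one row per hourly observation, with the drifter id as an ordinary column
df = dd.read_parquet('gdp_hourly.parquet')

# e.g. per-drifter mean position and velocity
per_drifter = (
    df.groupby('id')[['longitude', 'latitude', 've', 'vn']]
      .mean()
      .compute()
)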

I would recommend the following approach:

  • Define 3 representative workflow tasks
  • Try to do each of them with the 3 different libraries (Xarray, Awkward, Pandas)
  • Assess on a few different metrics including
    • performance (raw speed)
    • how confusing / complicated the code is

rabernat avatar Feb 01 '22 20:02 rabernat

Thanks for the cc!

  • Awkward Array (cc @jpivarski) - what is the ideal on-disk format?

It's looking more and more like the ideal on-disk format is Parquet. pyarrow's support for converting rich data types between Arrow and Parquet is getting better and better, and as of pyarrow 6 and Awkward 2, we carry over enough metadata to do lossless Awkward ↔ Arrow ↔ Parquet round trips.
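
As a tiny illustration of that round trip (a sketch with a made-up ragged array; the file name is hypothetical):

import awkward._v2 as ak

# a made-up ragged array: one variable-length list per trajectory
array = ak.Array([[1.1, 2.2, 3.3], [4.4], [5.5, 6.6]])

ak.to_parquet(array, "trajectories.parquet")        # raggedness is preserved by the Arrow types
roundtrip = ak.from_parquet("trajectories.parquet")
assert ak.to_list(roundtrip) == ak.to_list(array)   # lossless round trip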

Also, we've been thinking/hoping to add a Zarr v3 extension (zarr-developers/zarr-specs#62), though this would require the Awkward library to interpret a Zarr dataset as ragged. An advantage of Parquet is that the raggedness concept is built into the file format—any other library reading it would interpret it correctly.

We're in the midst of developing a Dask collection for Awkward Arrays (https://github.com/ContinuumIO/dask-awkward/) and would be eager to help with your use case as a way to get feedback for our development. I'm on https://gitter.im/pangeo-data/Lobby and https://discourse.pangeo.io/.

jpivarski avatar Feb 01 '22 21:02 jpivarski

Hi @rabernat,

We are still testing out file formats and are actually putting together a notebook for the EarthCube annual meeting benchmarking typical workflow tasks with xarray, pandas, and Awkward Array. We are now wondering whether it would be possible to host the dataset on the Pangeo Cloud. Currently, I have a set of preprocessing tools that:

  • replace the missing values with np.nan (they are not recognized automatically because of errors in the NOAA files, see Alexander-Barth/NCDatasets.jl#166);
  • parse the date/time variable;
  • fix some data types;
  • retrieve some of the variables stored as attributes;
  • allocate and fill the numpy arrays;
  • save to an xr.Dataset (I also have a variant to create a parquet archive).
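
For illustration, a rough sketch of what the first of these steps could look like (a hypothetical helper, not the actual preprocessing code; the real fill-value conventions may differ):

import numpy as np
import xarray as xr

def clean_drifter_file(path):
    ds = xr.open_dataset(path, decode_times=False, mask_and_scale=False)

    # replace the declared fill value with np.nan on the floating-point variables
    for name in list(ds.data_vars):
        var = ds[name]
        fill = var.attrs.get('_FillValue')
        if fill is not None and np.issubdtype(var.dtype, np.floating):
            ds[name] = var.where(var != fill)

    return ds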

With those tools, I can create the ragged array as follows:

import os
from os.path import join, isfile
import urllib.request

from preprocess import create_ragged_array

# download a subset of the 17324 files
list_id = [101143, 101144, 101509, 101510, 101511]

input_url = (
    'ftp://ftp.aoml.noaa.gov/phod/pub/lumpkin/hourly/v2.00/netcdf/'
)

folder = 'raw/'  # local cache for the raw netCDF files
os.makedirs(folder, exist_ok=True)

files = []
for i in list_id:
    file = f'drifter_{i}.nc'
    url = input_url + file

    # only download files that are not already cached locally
    if not isfile(join(folder, file)):
        urllib.request.urlretrieve(url, join(folder, file))

    files.append(join(folder, file))

# combine the per-drifter files into a single ragged array
ds = create_ragged_array(files).to_xarray()
ds.to_netcdf('array.nc')

My guess/hope is that this could be converted to a pangeo-forge recipe, but to be honest I'm not totally sure how to proceed (or whether it's even possible) with the current model.

philippemiron avatar Mar 23 '22 21:03 philippemiron

@philippemiron - we would love to help support this use case. If the data can be stored as netCDF, then we can make it work here (with zarr).

However, I'm confused about what this ragged array looks like in xarray; my understanding is that xarray does not support ragged arrays. Can you share the xarray repr of ds as generated by your code?

rabernat avatar Mar 24 '22 15:03 rabernat

See here. I'm also working with @jpivarski on a parquet representation that would be handled by awkward array.

philippemiron avatar Mar 24 '22 15:03 philippemiron

Could you please just do print(ds) and copy-paste the results on this issue?

rabernat avatar Mar 24 '22 15:03 rabernat

<xarray.Dataset>
Dimensions:                (traj: 5, obs: 10345)
Coordinates:
    ID                     (traj) int64 ...
    longitude              (obs) float32 ...
    latitude               (obs) float32 ...
    time                   (obs) datetime64[ns] ...
    ids                    (obs) int64 ...
Dimensions without coordinates: traj, obs
Data variables: (12/54)
    rowsize                (traj) int64 ...
    location_type          (traj) bool ...
    WMO                    (traj) int32 ...
    expno                  (traj) int32 ...
    deploy_date            (traj) datetime64[ns] ...
    deploy_lon             (traj) float32 ...
    ...                     ...
    err_sst                (obs) float32 ...
    err_sst1               (obs) float32 ...
    err_sst2               (obs) float32 ...
    flg_sst                (obs) int8 ...
    flg_sst1               (obs) int8 ...
    flg_sst2               (obs) int8 ...
Attributes: (12/15)
    title:             Global Drifter Program hourly drifting buoy collection
    history:           Version 2.00.  Metadata from dirall.dat and deplog.dat
    Conventions:       CF-1.6
    date_created:      2022-03-24T11:31:24.444885
    publisher_name:    GDP Drifter DAC
    publisher_email:   [email protected]
    ...                ...
    metadata_link:     https://www.aoml.noaa.gov/phod/dac/dirall.html
    contributor_name:  NOAA Global Drifter Program
    contributor_role:  Data Acquisition Center
    institution:       NOAA Atlantic Oceanographic and Meteorological Laboratory
    acknowledgement:   Elipot et al. (2022) to be submitted. Elipot et al. (2...
    summary:           Global Drifter Program hourly data

philippemiron avatar Mar 24 '22 15:03 philippemiron

I am very interested in how this gets solved.

I just wanted to raise the question of whether this is the best format to save the data in. It looks like, instead of using a ragged array, the dataset is stored as a single vector (obs). This is the same approach that Argopy (https://argopy.readthedocs.io/en/latest/data_manipulation.html) took with its point dataset (@gmaze); when Argopy goes to its "profile" dataset, it basically fills the dataset with a lot of NaNs. A single 'obs' vector is definitely the simplest and most intuitive format, but the separation into different drifters will have to be done by the analyst in code or by going to a "trajectory" dataset (like "profile"). Does the Parquet representation handled by Awkward Array take care of that in a smarter way? If so, then maybe Argopy can also benefit.

dhruvbalwada avatar Mar 24 '22 15:03 dhruvbalwada

Hi @dhruvbalwada,

The variables are indeed each stored in a 1-D ragged array along ['obs']. The separation can easily be done using the rowsize['traj'] variable, which contains the number of ['obs'] per trajectory. The main advantage of using Awkward Array is that the indexing is already taken care of by the library, e.g. sst[400] returns the sea surface temperature along trajectory #401. I believe it is worth investigating, as it can make the implementation of per-trajectory operations simpler.
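
For example, slicing a single trajectory out of the combined file with plain xarray/NumPy could look like this (the helper name is made up; 'array.nc' is the file created earlier):

import numpy as np
import xarray as xr

ds = xr.open_dataset('array.nc')

# index of the first observation of each trajectory
start = np.insert(np.cumsum(ds.rowsize.values), 0, 0)

def trajectory(n):
    # observations of trajectory n (0-based), e.g. n=400 -> trajectory #401
    return ds.isel(obs=slice(int(start[n]), int(start[n + 1])))

sst_401 = trajectory(400).sst  # sea surface temperature along trajectory #401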

philippemiron avatar Mar 24 '22 15:03 philippemiron

Oh wow, that sounds amazing! Can't wait to play with this. Maybe Argopy (@gmaze) would also like to move to this format in future releases, learning from the experience you are having here.

I am sure the pros will have better comments, but given the way the xarray dataset is formatted, it seems like it should be little trouble getting it into Pangeo Forge. One question might be how to chunk this dataset, and whether there are better approaches than just chopping it up arbitrarily. Have you tried following the tutorial to see if you can create a recipe out of this? It seems like most of the elements are already there in the code you used to create the ragged array. Is there a specific step in making the recipe where you are getting stuck?

dhruvbalwada avatar Mar 24 '22 16:03 dhruvbalwada

To perform operations per trajectory, you can chunk it like this:

# align the Dask chunks of the 'obs' dimension with the trajectories;
# rowsize is the per-trajectory number of observations from the dataset itself
chunk_settings = {'obs': tuple(rowsize.tolist())}
ds = xr.open_dataset(path_gdp, chunks=chunk_settings)

and then use map_blocks() and apply_ufunc().
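
For instance, a per-trajectory reduction could be sketched with dask.array's map_blocks, reusing the ds chunked per trajectory above (ve is the eastward velocity):

import dask.array as da
import numpy as np

# reduce each trajectory-chunk to a single value, e.g. the mean eastward velocity
mean_ve = da.map_blocks(
    lambda block: np.array([np.nanmean(block)]),
    ds.ve.data,          # dask array whose chunks match the trajectories
    chunks=(1,),         # every output chunk holds one value
    dtype=float,
).compute()              # one value per trajectory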

philippemiron avatar Mar 24 '22 16:03 philippemiron

After examining the code and the output dataset, I believe it should be possible to produce this dataset using a standard Pangeo Forge XarrayZarrRecipe. You are basically just concatenating all of the netCDF files along the obs dimension, as we usually do with time dimensions. The only complication is how to handle the traj dimension, which has a different (smaller) shape.

We need to play around with this a bit, but I don't see any fundamental blockers here.

rabernat avatar Mar 24 '22 16:03 rabernat

I'm not 100% certain what the issue is. I managed to create the XarrayZarrRecipe:

recipe = XarrayZarrRecipe(pattern,
                          target_chunks={'obs': tuple(obs_per_file)},
                          process_chunk=preprocess_gdp,
                          xarray_open_kwargs={'decode_times':False})
recipe

XarrayZarrRecipe(file_pattern=<FilePattern {'obs': 5}>, inputs_per_chunk=1, target_chunks={'obs': (417, 2005, 1970, 1307, 4646)}, target=None, input_cache=None, metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, consolidate_dimension_coordinates=True, xarray_open_kwargs={'decode_times': False}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=None, process_chunk=<function preprocess_gdp at 0x13f0a1750>, lock_timeout=None, subset_inputs={}, open_input_with_fsspec_reference=False)

But then when trying to retrieve one chunk, I get the following error:

with recipe.open_chunk(all_chunks[0]) as ds:
    display(ds)

ValueError: Chunks do not add up to shape. Got chunks=((417, 2005, 1970, 1307, 4646),), shape=(417,)

I will give it a second try over the weekend and report back on Monday.

philippemiron avatar Mar 24 '22 16:03 philippemiron

I will try to spend some time playing around with it.

rabernat avatar Mar 24 '22 16:03 rabernat

A single 'obs' vector is definitely the simplest and most intuitive format, and the separation into different drifters will have to be done by the analyst in code or by going to a "trajectory" dataset (like "profile"). Does the parquet representation handled by awkward array take care of that in a smarter way? and if so, then maybe Argopy can also benefit.

Parquet does that in a smarter way: the construction of a ragged array from rowsize and obs is built into the Parquet format. It knows about ragged arrays, but also nested records, missing data, heterogeneous unions, etc.

>>> import awkward._v2 as ak
>>> dataset = ak.from_parquet("global-drifter/global-drifter-0000.parquet", "obs.l*itude")
>>> dataset.show(limit_rows=6)
[{obs: [{longitude: 119, latitude: 21.2}, ..., {longitude: 115, ...}]},
{obs: [{longitude: -67.3, latitude: 41.9}, ..., {longitude: -52.6, ...}]},
{obs: [{longitude: -50, latitude: -32.5}, ..., {longitude: -51.8, ...}]},
...,
{obs: [{longitude: 177, latitude: 56.1}, ..., {longitude: 176, ...}]},
{obs: [{longitude: -109, latitude: -55.3}, ..., {longitude: -96.4, ...}]}]
>>> print(dataset.type)
907 * {obs: var * ?{longitude: float32, latitude: float32}}
>>> ak.num(dataset)
<Array [{obs: 1488}, {obs: 4136}, ..., {obs: 44962}] type='907 * {obs: int64}'>

But since this structure is relatively simple, ragged arrays with only one level of depth, distributing them with the existing NetCDF infrastructure and building that structure on demand is pretty reasonable:

>>> import h5py
>>> import numpy as np
>>> file = h5py.File("gdp_v2.00.nc")   # a different data sample
>>> content = ak.zip({"longitude": np.asarray(file["longitude"]), "latitude": np.asarray(file["latitude"])})
>>> dataset = ak.unflatten(content, np.asarray(file["rowsize"]))
>>> dataset.show(limit_rows=6)
[[{longitude: -17.7, latitude: 14.7}, {...}, ..., {longitude: -16.9, ...}],
[{longitude: -17.3, latitude: 14.5}, {...}, ..., {longitude: -17.3, ...}],
[{longitude: 136, latitude: 10}, {...}, ..., {longitude: 125, latitude: 13.7}],
...,
[{longitude: -25.7, latitude: 65}, {...}, ..., {...}, {longitude: -30.2, ...}],
[{longitude: -24.9, latitude: 64.8}, {...}, ..., {longitude: -30.4, ...}]]
>>> print(dataset.type)
17324 * var * {longitude: float32, latitude: float32}
>>> ak.num(dataset)
<Array [417, 2005, 1970, 1307, ..., 4572, 17097, 3127] type='17324 * int64'>

(The last numbers are the lengths of the trajectories.) Although Parquet does this stuff in general, this particular dataset isn't complex enough to really need it.

Actually, I'd like to get involved in this for Argopy, for exactly the same reasons that it applies to @philippemiron's project. I did some early tests converting Argo's NetCDF files into Parquet, and the Parquet versions were 20× smaller, as well as faster to load. But even there, Parquet isn't strictly needed: the same rowsize + obs trick could be applied, and I think the gains would be similar. (By decoupling a physically meaningful variable, the sequence length, from file boundaries, we reduce overhead. But the rowsize + obs trick does the same thing.)

For this particular project, the main thing that I think Awkward + Dask provides over xarray + Dask is that when you

chunk_settings = {'obs': tuple(rowsize.tolist())}
ds = xr.open_dataset(path_gdp, chunks=chunk_settings)

the chunking size for parallelization becomes tied to the physically meaningful variable, "trajectory length." Datasets with a large number of small trajectories would have a per-task overhead that you can't get rid of. Consider that the rowsize is a potentially large array; converting it to a Python tuple might not be feasible, let alone creating that many Dask tasks.

With Awkward + Dask, a single task could be run on dataset[100:200], for example, which would have 100× less per-task overhead because it applies to 100 trajectories. How large that slice is would be determined by the number of available processors, memory constraints, and maybe networking if distributed.
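
For instance, reusing the dataset built from gdp_v2.00.nc above, one such unit of work could look like this (a sketch without the Dask layer):

>>> chunk = dataset[100:200]                      # 100 trajectories handled as one task
>>> means = ak.mean(chunk["longitude"], axis=1)   # one mean longitude per trajectory
>>> len(means)
100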

Meanwhile, however, I need to learn the "+ Dask" part of this to actually develop some examples. @philippemiron and I have also considered some examples with Numba, and that's a third part to consider. @douglasdavis tried an example of Awkward + Dask + Numba this morning and it worked, though it evaluated the first partition locally to determine the output type and I think we can streamline that.

jpivarski avatar Mar 24 '22 19:03 jpivarski

I will try to spend some time playing around with it.

I've modified the preprocess function to take an xarray.Dataset as a parameter so we can pass it directly to process_chunk of XarrayZarrRecipe; see this gist. Here are the first few steps of the recipe, to save you some time.

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe

from preprocess import preprocess_gdp

input_url_pattern = (
    "ftp://ftp.aoml.noaa.gov/phod/pub/lumpkin/hourly/"
    "v2.00/netcdf/drifter_{id}.nc"
)
list_id = [101143, 101144, 101509, 101510, 101511]
obs_per_file = [417, 2005, 1970, 1307, 4646]

input_urls = [input_url_pattern.format(id=id) for id in list_id]
print(f"Found {len(input_urls)} files!")

# one way to build the FilePattern: concatenate the files along 'obs'
pattern = pattern_from_file_sequence(input_urls, concat_dim="obs")

recipe = XarrayZarrRecipe(pattern,
                          target_chunks={'obs': tuple(obs_per_file)},
                          process_chunk=preprocess_gdp,
                          xarray_open_kwargs={'decode_times': False})
recipe

Cheers,

philippemiron avatar Mar 24 '22 19:03 philippemiron

I just got an update from @selipot about this:

We have derived a good format for one of the datasets of the NOAA GDP (the hourly dataset, now with SST). Now, as we planned, could we get this dataset up on the Pangeo catalog?

The dataset is not big in itself (17 GB as netCDF or ~6.2 GB as Parquet), but I think its presence in a cloud catalog will be beneficial, as it would sit next to other large Eulerian datasets. If you agree, how can we make this happen? The initial dataset is available right now from NOAA NCEI as a single netCDF file (see https://github.com/Cloud-Drift/gdp-get-started), but we have a better version we would like to share with Pangeo.

I am copying Philippe Miron on this email since he led the efforts that made this happen. By the way, we just officially released the first version of our clouddrift library (https://github.com/Cloud-Drift/clouddrift), which contains the tools and recipe that were used to create this dataset.

Sounds like we are ready to move ahead with creating a recipe. The dataset is just a single netCDF file at

https://www.nodc.noaa.gov/archive/arc0199/0248584/1.1/data/0-data/gdp_v2.00.nc

rabernat avatar Sep 23 '22 11:09 rabernat

@selipot - I browsed around the website, but I couldn't find any information about the GDP data license. Without a clear license, we may be in violation of copyright law if we host the data. Can you say any more about the license under which GDP is distributed?

rabernat avatar Sep 23 '22 11:09 rabernat

Working on finding this out with the GDP. I suspect it would be whatever license applies to NOAA data, which is unknown to me. Again, reaching out to NOAA.

selipot avatar Sep 23 '22 15:09 selipot

From NOAA and NCEI: All environmental data managed/archived at NCEI is publicly available without restriction. NCEI doesn't currently employ a data usage license.

selipot avatar Sep 23 '22 17:09 selipot