hydrotools
Add client tool for NWM 2.1 Retrospective data
The NWM 2.1 Retrospective data have been released in zarr format on AWS. Add a client tool to retrieve these data as HydroTools canonical dataframes.
Example to get going:
import xarray as xr
import fsspec

def get_aws_data(start, end, features):
    """Get data from AWS"""
    # Base URL
    url = r'https://noaa-nwm-retrospective-2-1-zarr-pds.s3.amazonaws.com/chrtout.zarr'
    # url = r'https://noaa-nwm-retrospective-2-1-zarr-pds.s3.amazonaws.com/precip.zarr'

    # Get xarray.Dataset
    ds = xr.open_zarr(fsspec.get_mapper(url), consolidated=True)

    # Extract time series data
    ts = ds.streamflow.sel(time=slice(start, end), feature_id=features)

    # Return DataFrame
    return ts.to_dataframe().reset_index()

def main():
    # Get pandas.DataFrame
    df = get_aws_data(
        start="2014-01-01 00:00",
        end="2014-01-12 15:00",
        features=[101, 179]
    )

    # Print
    print(df)

if __name__ == "__main__":
    main()
Output
time feature_id elevation gage_id latitude longitude order streamflow
0 2014-01-01 00:00:00 101 36.950001 b' ' 31.086876 -94.640541 3 0.60
1 2014-01-01 00:00:00 179 191.600006 b' ' 46.022163 -67.986412 1 0.01
2 2014-01-01 01:00:00 101 36.950001 b' ' 31.086876 -94.640541 3 0.60
3 2014-01-01 01:00:00 179 191.600006 b' ' 46.022163 -67.986412 1 0.01
4 2014-01-01 02:00:00 101 36.950001 b' ' 31.086876 -94.640541 3 0.59
.. ... ... ... ... ... ... ... ...
555 2014-01-12 13:00:00 179 191.600006 b' ' 46.022163 -67.986412 1 0.67
556 2014-01-12 14:00:00 101 36.950001 b' ' 31.086876 -94.640541 3 0.45
557 2014-01-12 14:00:00 179 191.600006 b' ' 46.022163 -67.986412 1 0.62
558 2014-01-12 15:00:00 101 36.950001 b' ' 31.086876 -94.640541 3 0.45
559 2014-01-12 15:00:00 179 191.600006 b' ' 46.022163 -67.986412 1 0.61
[560 rows x 8 columns]
Here's an updated example. Recent versions of xarray and zarr perform the fsspec magic automatically.
import xarray as xr

def get_aws_data(start, end, features):
    """Get data from AWS"""
    # Base URL
    url = r's3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr'
    # url = r's3://noaa-nwm-retrospective-2-1-zarr-pds/precip.zarr'

    # Get xarray.Dataset
    # Requires s3fs to be installed
    ds = xr.open_dataset(
        url,
        backend_kwargs={
            "storage_options": {"anon": True},
            "consolidated": True
        },
        engine="zarr"
    )

    # Extract time series data
    ts = ds.streamflow.sel(time=slice(start, end), feature_id=features)

    # Return DataFrame
    return ts.to_dataframe().reset_index()

def main():
    # Get pandas.DataFrame
    df = get_aws_data(
        start="2014-01-01 00:00",
        end="2014-01-12 15:00",
        features=[101, 179]
    )

    # Print
    print(df)

if __name__ == "__main__":
    main()
I experimented with alternative methods of data retrieval, mostly focused on trying to get dask to automatically partition and control memory usage for large requests. However, I found that the additional overhead did not seem to scale well (at least for potential evaluation applications) on a LocalCluster. If your goal is retrieving large contiguous chunks of data, then dask may be the way to go for retrieval. For random retrieval, though (say, observation locations spread across thousands of chunks), the xarray methods above seem to be the most efficient.
However, there may be some ways to structure our calls to enforce reasonable limits and yield a rechunked dataset that would lend itself to local processing in a dask context. For this, our goal would be to retrieve DataFrames that are ~100 MB each (per dask recommendations). Each of these dataframes could then be a single dask.dataframe.DataFrame partition.
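For illustration, here's a minimal sketch of that idea, assuming a helper like get_aws_data from the first example that returns one ~100 MB pandas DataFrame per call; the date ranges and feature_ids are placeholders.

import dask
import dask.dataframe as dd

@dask.delayed
def fetch_block(start, end, features):
    # One ~100 MB slice per call; get_aws_data is the function from the
    # first example above (block boundaries here are hypothetical)
    return get_aws_data(start, end, features)

# Each delayed result becomes one partition of the dask DataFrame.
# Without meta=, dask computes the first block to infer column metadata.
blocks = [
    fetch_block("2014-01-01 00:00", "2014-06-30 23:00", [101, 179]),
    fetch_block("2014-07-01 00:00", "2014-12-31 23:00", [101, 179]),
]
ddf = dd.from_delayed(blocks)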
Goal: Retrieve random zarr data while reading each chunk as few times as possible.
Step 1 - Determine chunking of source data
Setting chunks="auto" will return a Dataset whose variables are backed by dask arrays (dask.array.Array) rather than in-memory numpy arrays, chunked according to the source data. At this point, we're just doing this to determine the source chunking. Here we can see that the data are chunked by time and feature_id: the time chunk size is 336 and the feature_id chunk size is 15000. Note the output below does not include warnings raised by xarray due to the last chunk being a different size (the data are not chunked evenly).
ds = xr.open_dataset(
    "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr",
    backend_kwargs={
        "storage_options": {"anon": True},
        "consolidated": True
    },
    engine="zarr",
    chunks="auto"
)
print(ds)
<xarray.Dataset>
Dimensions: (feature_id: 2776738, time: 367439)
Coordinates:
elevation (feature_id) float32 dask.array<chunksize=(2776738,), meta=np.ndarray>
* feature_id (feature_id) int32 101 179 181 ... 1180001803 1180001804
gage_id (feature_id) |S15 dask.array<chunksize=(2776738,), meta=np.ndarray>
latitude (feature_id) float32 dask.array<chunksize=(2776738,), meta=np.ndarray>
longitude (feature_id) float32 dask.array<chunksize=(2776738,), meta=np.ndarray>
order (feature_id) int32 dask.array<chunksize=(2776738,), meta=np.ndarray>
* time (time) datetime64[ns] 1979-02-01T01:00:00 ... 2020-12-31T23:0...
Data variables:
crs |S1 ...
streamflow (time, feature_id) float64 dask.array<chunksize=(336, 15000), meta=np.ndarray>
velocity (time, feature_id) float64 dask.array<chunksize=(336, 15000), meta=np.ndarray>
Attributes:
TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2
code_version: v5.2.0-beta2
featureType: timeSeries
model_configuration: retrospective
proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 +lat_1...
Step 2 - Determine chunk size as a DataFrame
Below we're just trying to figure out how big our DataFrames will be on a per-chunk basis. Each chunk produces a dataframe with 5,040,000 rows (336 times × 15,000 feature_ids) that is ~157 MB in RAM after dtype optimizations. So to hit ~100 MB per dataframe, we'll want to generate dataframes with ~3,210,191 rows.
import pandas as pd

# Determine size of a single chunk (ds from the Step 1 snippet)
fid_list = ds.feature_id.data[:15000]
tm_list = ds.time.data[:336]

# Get single chunk of streamflow variable
df = ds.streamflow.sel(
    time=tm_list,
    feature_id=fid_list
).to_dataframe().reset_index()

# Look at memory usage
print(df.info(memory_usage="deep"))

# Optimize memory usage
df["gage_id"] = df["gage_id"].astype("category")
df["streamflow"] = pd.to_numeric(df["streamflow"], downcast="float")
df["feature_id"] = pd.to_numeric(df["feature_id"], downcast="integer")
print(df.info(memory_usage="deep"))
BEFORE OPTIMIZATION
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5040000 entries, 0 to 5039999
Data columns (total 8 columns):
# Column Dtype
--- ------ -----
0 time datetime64[ns]
1 feature_id int64
2 elevation float32
3 gage_id object
4 latitude float32
5 longitude float32
6 order int32
7 streamflow float64
dtypes: datetime64[ns](1), float32(3), float64(1), int32(1), int64(1), object(1)
memory usage: 461.4 MB
None
AFTER OPTIMIZATION
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5040000 entries, 0 to 5039999
Data columns (total 8 columns):
# Column Dtype
--- ------ -----
0 time datetime64[ns]
1 feature_id int32
2 elevation float32
3 gage_id category
4 latitude float32
5 longitude float32
6 order int32
7 streamflow float32
dtypes: category(1), datetime64[ns](1), float32(4), int32(2)
memory usage: 158.6 MB
None
Step 3 - Make some assumptions
Let's say we want all 40 years of data for 8,000 non-contiguous feature_ids. In other words, there's probably no way around reading every chunk at least once. Let's assume our feature_ids of interest are evenly distributed across all 186 feature_id chunks, so we can expect about 43 of our interesting feature_ids to occur in each chunk on average. To hit our goal of ~3,210,191 rows per dataframe, we'll want to retrieve 74,656 time steps per request. Rounding to the nearest whole time chunk results in a time slice that is 74,592 hours wide. This produces an optimized dataframe that is 94.8 MB in memory with 3,207,456 rows (the memory is lower than the target because the feature_ids used in this example were small enough to downcast further). Retrieving all of our desired data will require 874 separate calls.
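For reference, the back-of-the-envelope arithmetic behind these numbers (the ~157 MB per chunk and 43 feature_ids per chunk are the assumptions from Steps 2 and 3):

rows_per_chunk = 336 * 15000                            # 5,040,000 rows per source chunk
mb_per_chunk = 157                                      # ~157 MB after dtype optimization (Step 2)
target_rows = int(rows_per_chunk * 100 / mb_per_chunk)  # 3,210,191 rows for ~100 MB
features_per_chunk = 43                                 # assumed interesting feature_ids per chunk
hours = round(target_rows / features_per_chunk)         # 74,656 time steps
hours_rounded = (hours // 336) * 336                    # 74,592 hours (222 whole time chunks)
print(target_rows, hours, hours_rounded)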
# Example retrieval sized to one ~100 MB partition (ds from the Step 1 snippet)
fid_list = ds.feature_id.data[:43]
tm_list = ds.time.data[:74592]

df = ds.streamflow.sel(
    time=tm_list,
    feature_id=fid_list
).to_dataframe().reset_index()

# Optimize memory usage
df["gage_id"] = df["gage_id"].astype("category")
df["streamflow"] = pd.to_numeric(df["streamflow"], downcast="float")
df["feature_id"] = pd.to_numeric(df["feature_id"], downcast="integer")
print(df.info(memory_usage="deep"))
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3207456 entries, 0 to 3207455
Data columns (total 8 columns):
# Column Dtype
--- ------ -----
0 time datetime64[ns]
1 feature_id int16
2 elevation float32
3 gage_id category
4 latitude float32
5 longitude float32
6 order int32
7 streamflow float32
dtypes: category(1), datetime64[ns](1), float32(4), int16(1), int32(1)
memory usage: 94.8 MB
None
Step 4 - Scaling it up
The previous assumptions were suitable for someone working on a laptop with limited cores and memory. However, we can do better with more resources. A natural inclination would be to use dask to conduct the retrieval, but I'm not convinced that will be efficient for this type of random retrieval. The big problem here is that we want relatively little data spread across tens of thousands of chunks. With more resources, a potential alternative algorithm could use the methods above, but make larger requests that generate larger dataframes (say ~200 to ~400 MB). Using to_dask_dataframe, these larger dataframes can easily be repartitioned to ~100 MB.
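A minimal sketch of that repartitioning idea, assuming ds was opened with chunks="auto" as in Step 1; the time range and feature_ids are placeholders:

# Make a larger request, then let dask repartition it into ~100 MB pieces
ddf = ds[["streamflow"]].sel(
    time=slice("2010-01-01", "2020-12-31"),
    feature_id=[101, 179]
).to_dask_dataframe()
ddf = ddf.repartition(partition_size="100MB")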
Step 5 - Validation
This is the most difficult step. There doesn't seem to be a good way to validate the data unless the data provider supplies checksums for every chunk as part of their data model. Furthermore, if a chunk fails to return, it's unclear how to make sure that information gets funneled up to the user. The xarray-fsspec-s3fs-dask-zarr stack is complicated and, as near as I can tell, chunk failures get silenced by one of these packages. Further, because missing data are automatically filled in, it's unclear whether missing values are real "missing data" or whether a chunk failed to return. The only way I can think of to validate is to pull the data at least twice and compare the outputs using DataFrame.eq.
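A minimal sketch of that double-retrieval check, reusing get_aws_data from the first example (note that DataFrame.eq treats NaN as unequal, so the missing-value masks are compared separately):

# Pull the same slice twice and compare
df_a = get_aws_data("2014-01-01 00:00", "2014-01-12 15:00", [101, 179])
df_b = get_aws_data("2014-01-01 00:00", "2014-01-12 15:00", [101, 179])

# Element-wise equality, allowing NaN == NaN
match = df_a.eq(df_b) | (df_a.isna() & df_b.isna())
print(match.all().all())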
Here's an example implementation of the principles explained above.
import xarray as xr
import numpy as np
import pandas as pd
from pathlib import Path
# from dask.distributed import Client, LocalCluster

def retrieve_data(params):
    # Extract parameters
    ds = params[0]
    feature_ids = params[1]
    start = params[2]
    end = params[3]
    ofile = params[4]

    # Retrieve data
    # I use to_netcdf here because it writes in parallel
    ds[["streamflow"]].sel(
        time=slice(start, end),
        feature_id=feature_ids
    ).to_netcdf(ofile)

def main():
    # Open dataset
    ds = xr.open_dataset(
        "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr",
        backend_kwargs={
            "storage_options": {"anon": True},
            "consolidated": True
        },
        engine="zarr",
    )

    # Find features that correspond to a USGS streamflow gage
    gages = (ds.gage_id.where(ds.gage_id != ''.rjust(15).encode(), drop=True))

    # Get a list of feature IDs
    fids = gages.feature_id.data

    # Split up feature IDs into groups of 43
    no_chunks = (fids.size // 43) + 1
    fids_split = np.array_split(fids, no_chunks)

    # Generate time slices
    start = pd.Timestamp(ds.time.min().values)
    end = pd.Timestamp(ds.time.max().values)
    periods = int((end - start) / pd.Timedelta("74592H")) + 1
    first_ts = pd.date_range(
        start=start,
        periods=periods,
        freq="74592H"
    )
    last_ts = first_ts + pd.Timedelta("74591H")

    # File directory
    odir = Path("retro_data")
    odir.mkdir(exist_ok=True)

    # Generate parameters
    input_parameters = []
    count = 0
    for fid_chunk in fids_split:
        for ft, lt in zip(first_ts, last_ts):
            ofile = odir / f"chunk_{count}.nc"
            p = [ds, fid_chunk, ft, lt, ofile]
            input_parameters.append(p)
            count += 1

    # Retrieve data in "serial"
    # NOTE This actually retrieves the data asynchronously by source chunk and
    #   writes the NetCDF files in parallel using all cores
    for params in input_parameters:
        retrieve_data(params)

    # Retrieve data in parallel
    # NOTE I don't think this will actually work and is probably unnecessary.
    #   However, you could set up a Client(LocalCluster) to limit the number
    #   of processes and/or RAM used. Though the chunking scheme inherent to
    #   input_parameters is already conservative.
    # cluster = LocalCluster(
    #     n_workers=4,
    #     processes=True,
    #     threads_per_worker=1
    # )
    # with Client(cluster) as client:
    #     client.map(retrieve_data, input_parameters)

    # Close dataset
    ds.close()

    # Do some larger-than-memory computations
    # NOTE I'm using dask to compute here, but you could also transform,
    #   optimize, shuffle, and write the data to parquet
    df = xr.open_mfdataset(odir.glob("*.nc")).to_dask_dataframe()
    computation = df.groupby("feature_id").max().compute()
    print(computation)

if __name__ == "__main__":
    main()
To make the "parallel" version work, I think you'd have to use futures or delayed objects. I think every process would require its own ds = xr.open_dataset call, which involves some overhead. This may be the best we can do on a single machine. Making this go faster would rely on retrieving bigger chunks.
Having said that, there is potential here to scale up to a real distributed cluster.
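A minimal sketch of what that might look like with dask.delayed, where each task opens its own dataset handle; parameters mirror input_parameters above, but this is untested and purely illustrative:

import dask
import xarray as xr

@dask.delayed
def retrieve_chunk(feature_ids, start, end, ofile):
    # Each task opens (and closes) its own dataset handle
    ds = xr.open_dataset(
        "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr",
        backend_kwargs={"storage_options": {"anon": True}, "consolidated": True},
        engine="zarr",
    )
    ds[["streamflow"]].sel(
        time=slice(start, end),
        feature_id=feature_ids
    ).to_netcdf(ofile)
    ds.close()
    return ofile

# tasks = [retrieve_chunk(fids, ft, lt, ofile) for _, fids, ft, lt, ofile in input_parameters]
# dask.compute(*tasks)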
@jarq6c, I tried to track down information and couldn't find anything that explains the difference between open_dataset and open_zarr... any light to shed on that?
Here's an example of open_zarr opening the same set of files you refer to above. @sepehrkrz, maybe we can do a performance test...
'''
# Install these libraries if they aren't already installed
!pip install zarr
!pip install xarray
!pip install s3fs
!pip install numpy
'''
# Import needed libraries
import xarray as xr
import numpy as np
import s3fs
from datetime import datetime, timedelta

# Open the zarr store
url = "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr"
fs = s3fs.S3FileSystem(anon=True)
store = xr.open_zarr(s3fs.S3Map(url, s3=fs))

# Function to get the time series for a specified reach id and time range,
# then write it out to a csv file.
def GetAndWriteTimeSeriesAtReach(reach_id, start_time_index, end_time_index):
    flows = streamflow_array.where(feature_id_array == reach_id, drop=True)
    df_flows = flows[start_time_index:end_time_index].to_dataframe()
    df_flows.to_csv(f'flows_{reach_id}.csv')

# Get an xarray array of the various values
time_array = store['time']
feature_id_array = store['feature_id']
streamflow_array = store['streamflow']

# Define the feature IDs to check for
feature_ids = [5781221, 5781223, 5781703]

# Specify the start and end times of interest
start_time = datetime(2015, 5, 23, 0, 0, 0)
end_time = datetime(2015, 6, 24, 0, 0, 0)

# Get the indices for the needed dates
zero_start_time = datetime(1979, 2, 1, 0, 0, 0)
start_time_index = int((start_time - zero_start_time).total_seconds() / 3600)
end_time_index = int((end_time - zero_start_time).total_seconds() / 3600)

for reach_id in feature_ids:
    GetAndWriteTimeSeriesAtReach(reach_id, start_time_index, end_time_index)
@jameshalgren Sorry for the delayed response. In practice, I haven't found any difference between open_dataset and open_zarr, assuming engine="zarr" is used with open_dataset. Peeking at the source code in the xarray repo, it does look like open_dataset introduces a few additional layers and sets up a pythonic file system (via fsspec) for you. Anecdotally, I found some posts on Stack Overflow from over a year ago that suggested a performance difference between open_dataset and open_zarr, but they offered no evidence.
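For the performance test mentioned above, a rough (unscientific) timing sketch comparing the two entry points might look like this:

import time
import xarray as xr

url = "s3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr"

t0 = time.perf_counter()
ds1 = xr.open_dataset(
    url,
    backend_kwargs={"storage_options": {"anon": True}, "consolidated": True},
    engine="zarr"
)
t1 = time.perf_counter()
ds2 = xr.open_zarr(url, storage_options={"anon": True}, consolidated=True)
t2 = time.perf_counter()

print(f"open_dataset: {t1 - t0:.2f} s, open_zarr: {t2 - t1:.2f} s")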
Closing old issues. TEEHR is pretty good for retro data: https://github.com/RTIInternational/teehr