xpublish
Can xpublish serve Datasets dynamically?
Hi @jhamman, xpublish looks really neat.
Does it provide a way to serve data holdings dynamically, so that you could potentially serve millions of files? This would allow users to navigate an endpoint that dynamically reads and serves an xarray Dataset on request (rather than in advance).
It is not really possible right now, but it shouldn't be hard to support. xpublish.Rest accepts any mapping as a collection of datasets to serve, so in theory you could provide a custom mapping object that opens a Dataset on request in __getitem__.
Unfortunately, the collection of Datasets is currently converted to a dictionary when creating a new Rest instance. Instead, we could add an internal mapping wrapper class in xpublish and move the Dataset collection normalization there. This would also allow returning a meaningful HTTP error in case something goes wrong when trying to access a given Dataset.
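For illustration, here is a minimal sketch of the kind of custom mapping object described above; the LazyDatasetMapping name and the NetCDF-directory source are invented for this example, and as noted, xpublish.Rest would currently convert it to a plain dictionary, so the laziness only pays off once the wrapper idea above is in place.

from collections.abc import Mapping
from pathlib import Path

import xarray as xr


class LazyDatasetMapping(Mapping):
    """Hypothetical mapping that opens each Dataset only when accessed."""

    def __init__(self, directory):
        # index dataset IDs -> file paths up front; nothing is opened yet
        self._files = {p.stem: p for p in Path(directory).glob("*.nc")}

    def __getitem__(self, key):
        # the Dataset is opened on request rather than in advance
        return xr.open_dataset(self._files[key])

    def __iter__(self):
        return iter(self._files)

    def __len__(self):
        return len(self._files)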
Thanks @benbovy, this could be a useful feature some time in the future.
Edit: See @jr3cermak's response below on how to create and use a dataset provider plugin to override how xpublish loads datasets.
So this isn't super useful in and of itself right now, as the source data is Zarr, but I mocked up a dynamic subclass of xpublish.Rest that works with Pangeo-Forge: full gist.
By overriding the dataset accessor function and preloading the dataset IDs in the xpublish.Rest.__init__ method, xpublish can lazily load a dataset when requested.
import fsspec
import requests
import xarray as xr
import xpublish
from xpublish import rest

# recipe_runs_url (defined in the full gist) points at the Pangeo Forge
# recipe runs API listing available recipes and their public dataset URLs.


class DynamicRest(xpublish.Rest):
    def __init__(self, routers=None, cache_kws=None, app_kws=None):
        # The accessor function and dataset IDs are set up front;
        # the datasets themselves are only opened when requested.
        self._get_dataset_func = get_pangeo_forge_dataset
        self._datasets = list(pangeo_forge_dataset_map().keys())

        dataset_route_prefix = "/datasets/{dataset_id}"
        self._app_routers = rest._set_app_routers(routers, dataset_route_prefix)

        self._app = None
        self._app_kws = {}
        if app_kws is not None:
            self._app_kws.update(app_kws)

        self._cache = None
        self._cache_kws = {"available_bytes": 1e6}
        if cache_kws is not None:
            self._cache_kws.update(cache_kws)


def pangeo_forge_datasets():
    res = requests.get(recipe_runs_url)
    return res.json()


def pangeo_forge_with_data():
    datasets = pangeo_forge_datasets()
    return [r for r in datasets if r["dataset_public_url"]]


def pangeo_forge_dataset_map():
    datasets = pangeo_forge_with_data()
    return {r["recipe_id"]: r["dataset_public_url"] for r in datasets}


def get_pangeo_forge_dataset(dataset_id: str) -> xr.Dataset:
    dataset_map = pangeo_forge_dataset_map()
    zarr_url = dataset_map[dataset_id]
    mapper = fsspec.get_mapper(zarr_url)
    ds = xr.open_zarr(mapper, consolidated=True)
    return ds
It looks like if you also overrode ._init_app() you could lazily load the dataset IDs too.
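For example, a hypothetical override along those lines (assuming _init_app can simply be wrapped and the helpers from the gist above are available):

class LazierRest(DynamicRest):
    def _init_app(self):
        # refresh the recipe IDs each time the app is initialized,
        # rather than fixing them once in __init__
        self._datasets = list(pangeo_forge_dataset_map().keys())
        return super()._init_app()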
Thanks @abkfenris, that looks like it could be really useful. I am buried in other activities at the moment but hopefully I'll get a chance to come back to this. 👍
I've gone further down the dynamic xpublish rabbit hole, in this case exposing any gridded data from the awesome-erddap list: https://github.com/abkfenris/xpublish-erddap
Using plugins as described by @abkfenris in #155 and turning the internal cache off, you can dynamically serve a directory of files using this as an example (server.py):
#!/usr/bin/env python
import os, glob
import xarray as xr
import xpublish as xp
from xpublish import Plugin, hookimpl
from fastapi import APIRouter


class DirectoryDataset(Plugin):
    name = "directory-dataset"

    @hookimpl
    def get_datasets(self):
        files = glob.glob("*.nc")
        files.sort()
        return files

    @hookimpl
    def get_dataset(self, dataset_id: str):
        if os.path.isfile(dataset_id):
            return xr.open_dataset(dataset_id)
        return None


collection = xp.Rest(
    cache_kws=dict(available_bytes=0)
)
collection.register_plugin(DirectoryDataset())

# LOGGING
# https://stackoverflow.com/questions/60715275/fastapi-logging-to-file
collection.serve(log_config="log.ini")
Here is the client.py:
import os, sys
import xarray as xr
from fsspec.implementations.http import HTTPFileSystem
import fsspec

# Server url
base_url = 'http://0.0.0.0:9000'

# Obtain available datasets
fs = fsspec.filesystem('http')
print("Available datasets:")
datasets_url = os.path.join(base_url, 'datasets')
res = fs.cat(datasets_url)
files = eval(res.decode())
files.sort()

# Walk through available datasets
fs = HTTPFileSystem()
for dsname in files:
    url = fs.get_mapper(os.path.join(base_url, 'datasets', dsname))
    ds = xr.open_zarr(url)
    print(' ', dsname, ds.dims)
    ds.close()
NOTE: If you dynamically update or change datasets in place, don't use the cache. Disabling the cache incurs a performance penalty, but you gain a very lightweight dynamic service.
The following was done with the server running in another terminal; the server was not restarted between client runs or file operations.
Starting with two files:
$ python client.py
Available datasets:
ocean1.nc Frozen({'two': 2, 'eta_rho': 818, 'xi_rho': 344, 'bath': 8, 'eta_psi': 817, 'xi_psi': 343, 'eta_u': 818, 'xi_u': 343, 'eta_v': 817, 'xi_v': 344})
ocean2.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})
I will copy these files.
$ cp ocean1.nc ocean3.nc
$ cp ocean2.nc ocean4.nc
And now we see:
$ python client.py
Available datasets:
ocean1.nc Frozen({'two': 2, 'eta_rho': 818, 'xi_rho': 344, 'bath': 8, 'eta_psi': 817, 'xi_psi': 343, 'eta_u': 818, 'xi_u': 343, 'eta_v': 817, 'xi_v': 344})
ocean2.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})
ocean3.nc Frozen({'two': 2, 'eta_rho': 818, 'xi_rho': 344, 'bath': 8, 'eta_psi': 817, 'xi_psi': 343, 'eta_u': 818, 'xi_u': 343, 'eta_v': 817, 'xi_v': 344})
ocean4.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})
Now I will copy ocean2.nc over ocean3.nc in place.
$ cp ocean2.nc ocean3.nc
And we obtain the desired result:
$ python client.py
Available datasets:
ocean1.nc Frozen({'two': 2, 'eta_rho': 818, 'xi_rho': 344, 'bath': 8, 'eta_psi': 817, 'xi_psi': 343, 'eta_u': 818, 'xi_u': 343, 'eta_v': 817, 'xi_v': 344})
ocean2.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})
ocean3.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})
ocean4.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})
@agstephens
So I have some code that does this. Basically, you can dynamically serve cataloged Zarr and NetCDF datasets from a STAC or Intake catalog (or another catalog type, if you write a plugin).
https://github.com/LimnoTech/Catalog-To-Xpublish
I may move the organization, but searching "Catalog-To-Xpublish" should find it. My approach was to mount an Xpublish server to different endpoints representing a catalog hierarchy. If you don't care about catalog hierarchy, look at my provider_plugin.py for dynamic access.
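A rough sketch of that mounting approach (the catalog paths and datasets below are invented stand-ins; Catalog-To-Xpublish derives the hierarchy from a real STAC or Intake catalog):

from fastapi import FastAPI
import numpy as np
import xarray as xr
import xpublish

# invented stand-ins for datasets that would normally come from a catalog
ds_a = xr.Dataset({"temp": ("x", np.arange(10.0))})
ds_b = xr.Dataset({"salt": ("x", np.arange(5.0))})

root = FastAPI()
catalog = {
    "/catalog/ocean": {"temperature": ds_a},
    "/catalog/ocean/deep": {"salinity": ds_b},
}
for prefix, datasets in catalog.items():
    # each catalog level gets its own xpublish Rest app mounted at its path
    root.mount(prefix, xpublish.Rest(datasets).app)

# run with, e.g.: uvicorn my_server:root --port 9000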