
Can xpublish serve Datasets dynamically?

Open agstephens opened this issue 3 years ago • 7 comments

Hi @jhamman, xpublish looks really neat.

Does it provide a way to serve data holdings dynamically so that you could potentially serve millions of files? This would allow users to navigate an end-point that would dynamically read and serve an xarray Dataset on request (rather than in advance).

agstephens avatar May 07 '21 09:05 agstephens

It's not really possible right now, but it shouldn't be hard to support.

xpublish.Rest accepts any mapping as a collection of datasets to serve, so in theory you could provide a custom mapping object that opens a Dataset on request in __getitem__.

Unfortunately, the collection of Datasets is currently converted to a dictionary when creating a new Rest instance. Instead, we could add an internal mapping wrapper class in xpublish and move the Dataset collection normalization there. This would also allow returning a meaningful HTTP error in case something went wrong when trying to access a given Dataset.
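For illustration, a lazy mapping along those lines might look like the sketch below (hypothetical: the class name and path mapping are made up, and as noted it wouldn't work today because the collection is eagerly converted to a dictionary):

from collections.abc import Mapping

import xarray as xr


class LazyDatasetMapping(Mapping):
    """Hypothetical mapping that opens a Dataset only when it is accessed."""

    def __init__(self, paths):
        # Maps dataset IDs to file paths; nothing is opened up front.
        self._paths = paths

    def __getitem__(self, key):
        # Opened lazily, on each lookup.
        return xr.open_dataset(self._paths[key])

    def __iter__(self):
        return iter(self._paths)

    def __len__(self):
        return len(self._paths)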

benbovy avatar May 11 '21 12:05 benbovy

Thanks @benbovy, this could be a useful feature some time in the future.

agstephens avatar May 11 '21 14:05 agstephens

Edit: See @jr3cermak's response below on how to create and use a dataset provider plugin to override how xpublish loads datasets.

This isn't super useful on its own right now, since the source data is already Zarr, but I mocked up a dynamic subclass of xpublish.Rest that works with Pangeo Forge. full gist

By overriding the dataset accessor function and preloading the dataset IDs in xpublish.Rest.__init__, xpublish can lazily load a dataset when it is requested.

# Abridged: recipe_runs_url (the Pangeo Forge recipe-runs API endpoint)
# is defined in the full gist.
import fsspec
import requests
import xarray as xr
import xpublish
from xpublish import rest


class DynamicRest(xpublish.Rest):
    """Subclass of xpublish.Rest that resolves datasets on request."""

    def __init__(self, routers=None, cache_kws=None, app_kws=None):
        # Swap in a custom accessor and preload only the dataset IDs;
        # the datasets themselves are opened lazily by the accessor.
        self._get_dataset_func = get_pangeo_forge_dataset
        self._datasets = list(pangeo_forge_dataset_map().keys())
        dataset_route_prefix = "/datasets/{dataset_id}"

        # The rest mirrors what xpublish.Rest.__init__ does internally.
        self._app_routers = rest._set_app_routers(routers, dataset_route_prefix)

        self._app = None
        self._app_kws = {}
        if app_kws is not None:
            self._app_kws.update(app_kws)

        self._cache = None
        self._cache_kws = {"available_bytes": 1e6}
        if cache_kws is not None:
            self._cache_kws.update(cache_kws)


def pangeo_forge_datasets():
    # Fetch metadata for all recipe runs from the Pangeo Forge API.
    res = requests.get(recipe_runs_url)
    return res.json()


def pangeo_forge_with_data():
    # Keep only the runs that actually published a dataset.
    datasets = pangeo_forge_datasets()
    return [r for r in datasets if r["dataset_public_url"]]


def pangeo_forge_dataset_map():
    # Map recipe IDs to the public Zarr URLs of their outputs.
    datasets = pangeo_forge_with_data()
    return {r["recipe_id"]: r["dataset_public_url"] for r in datasets}


def get_pangeo_forge_dataset(dataset_id: str) -> xr.Dataset:
    # Called per request: resolve the ID to a Zarr store and open it lazily.
    dataset_map = pangeo_forge_dataset_map()
    zarr_url = dataset_map[dataset_id]

    mapper = fsspec.get_mapper(zarr_url)
    ds = xr.open_zarr(mapper, consolidated=True)
    return ds

It looks like if you also overrode ._init_app() you could lazily load the dataset IDs too.
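A minimal sketch of that idea, assuming _init_app() is called once when the FastAPI app is first built (both _init_app and _datasets are private internals, so treat this as hypothetical):

class LazyIDsRest(DynamicRest):
    def _init_app(self):
        # Refresh the dataset-ID list when the app is built,
        # rather than at construction time.
        self._datasets = list(pangeo_forge_dataset_map().keys())
        return super()._init_app()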

abkfenris avatar Apr 26 '22 22:04 abkfenris

Thanks @abkfenris, that looks like it could be really useful. I am buried in other activities at the moment but hopefully I'll get a chance to come back to this. 👍

agstephens avatar Jun 18 '22 10:06 agstephens

I've gone further down the dynamic xpublish rabbit hole, in this case exposing any gridded data from the awesome-erddap list: https://github.com/abkfenris/xpublish-erddap

abkfenris avatar Jun 19 '22 17:06 abkfenris

Using plugins as described by @abkfenris in #155, and with the internal cache turned off, you can dynamically serve a directory of files. Here is an example server.py:

#!/usr/bin/env python

import glob
import os

import xarray as xr
import xpublish as xp
from xpublish import Plugin, hookimpl


class DirectoryDataset(Plugin):
    """Serve every netCDF file in the current directory as a dataset."""

    name: str = "directory-dataset"

    @hookimpl
    def get_datasets(self):
        # Re-scan the directory on every request, so new files show up
        # without restarting the server.
        files = glob.glob("*.nc")
        files.sort()
        return files

    @hookimpl
    def get_dataset(self, dataset_id: str):
        # The dataset ID is the filename; open it lazily on request.
        if os.path.isfile(dataset_id):
            return xr.open_dataset(dataset_id)
        return None


# Disable the internal cache so in-place file changes are picked up.
collection = xp.Rest(
    cache_kws=dict(available_bytes=0)
)
collection.register_plugin(DirectoryDataset())

# LOGGING
# https://stackoverflow.com/questions/60715275/fastapi-logging-to-file
collection.serve(log_config="log.ini")

Here is the client.py:

import json
import os

import xarray as xr
from fsspec.implementations.http import HTTPFileSystem

# Server url
base_url = 'http://0.0.0.0:9000'

fs = HTTPFileSystem()

# Obtain available datasets (the /datasets endpoint returns a JSON list of IDs)
datasets_url = os.path.join(base_url, 'datasets')
files = json.loads(fs.cat(datasets_url))
files.sort()

print("Available datasets:")

# Walk through available datasets, opening each one via its zarr endpoint
for dsname in files:

    url = fs.get_mapper(os.path.join(base_url, 'datasets', dsname))

    ds = xr.open_zarr(url)

    print('  ', dsname, ds.dims)

    ds.close()

NOTE: If you dynamically update and change datasets in place, don't use the cache. Disabling the cache incurs a performance penalty, but in exchange you get a very lightweight dynamic service.

The following was run with the server in a separate terminal; the server was not restarted between client runs or file operations.

Starting with two files:

$ python client.py 
Available datasets:
   ocean1.nc Frozen({'two': 2, 'eta_rho': 818, 'xi_rho': 344, 'bath': 8, 'eta_psi': 817, 'xi_psi': 343, 'eta_u': 818, 'xi_u': 343, 'eta_v': 817, 'xi_v': 344})
   ocean2.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})

I will copy these files.

$ cp ocean1.nc ocean3.nc
$ cp ocean2.nc ocean4.nc

And now see:

$ python client.py 
Available datasets:
   ocean1.nc Frozen({'two': 2, 'eta_rho': 818, 'xi_rho': 344, 'bath': 8, 'eta_psi': 817, 'xi_psi': 343, 'eta_u': 818, 'xi_u': 343, 'eta_v': 817, 'xi_v': 344})
   ocean2.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})
   ocean3.nc Frozen({'two': 2, 'eta_rho': 818, 'xi_rho': 344, 'bath': 8, 'eta_psi': 817, 'xi_psi': 343, 'eta_u': 818, 'xi_u': 343, 'eta_v': 817, 'xi_v': 344})
   ocean4.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})

Now I will copy ocean2.nc over ocean3.nc in place.

$ cp ocean2.nc ocean3.nc

And we obtain the desired result:

$ python client.py 
Available datasets:
   ocean1.nc Frozen({'two': 2, 'eta_rho': 818, 'xi_rho': 344, 'bath': 8, 'eta_psi': 817, 'xi_psi': 343, 'eta_u': 818, 'xi_u': 343, 'eta_v': 817, 'xi_v': 344})
   ocean2.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})
   ocean3.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})
   ocean4.nc Frozen({'nyp': 929, 'nxp': 721, 'ny': 928, 'nx': 720})

jr3cermak avatar Mar 08 '23 07:03 jr3cermak

@agstephens

I have some code that does this: it dynamically serves catalogued Zarr and NetCDF datasets from STAC, Intake, or any other catalog you write a plugin for.

https://github.com/LimnoTech/Catalog-To-Xpublish

I may move the organization, but searching "Catalog-To-Xpublish" should find it. My approach was to mount an Xpublish server at different endpoints representing a catalog hierarchy. If you don't care about catalog hierarchy, look at my provider_plugin.py for dynamic access.
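For flavor, mounting per-catalog xpublish apps under one FastAPI server could look roughly like the sketch below (the placeholder datasets and paths are mine, not from Catalog-To-Xpublish):

import xarray as xr
import xpublish
from fastapi import FastAPI

# Placeholder dataset mappings; in Catalog-To-Xpublish these come from
# a catalog provider plugin (STAC, Intake, ...) instead.
datasets_a = {"demo": xr.Dataset({"x": ("points", [1, 2, 3])})}
datasets_b = {"demo": xr.Dataset({"y": ("points", [4, 5, 6])})}

root = FastAPI()
for path, datasets in {"/catalog-a": datasets_a, "/catalog-b": datasets_b}.items():
    # Each catalog level gets its own xpublish app mounted at a sub-path.
    root.mount(path, xpublish.Rest(datasets).app)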

xaviernogueira avatar Jun 23 '23 18:06 xaviernogueira