xpublish
Return /air/mean as zarr: what's the best strategy to implement the routes?
I really like the approach you implemented with:
import xarray as xr
import xpublish  # importing xpublish registers the `.rest` accessor on datasets
from fastapi import APIRouter, Depends, HTTPException
from xpublish.dependencies import get_dataset

myrouter = APIRouter()

@myrouter.get("/{var_name}/mean")
def get_mean(var_name: str, dataset: xr.Dataset = Depends(get_dataset)):
    # Return a 404 if the requested variable is not in the served dataset
    if var_name not in dataset.variables:
        raise HTTPException(
            status_code=404, detail=f"Variable '{var_name}' not found in dataset"
        )
    return float(dataset[var_name].mean())

# `ds` is the xarray.Dataset being served
ds.rest(routers=[myrouter])
ds.rest.serve()
The example above returns a float. What I'd like to do is to implement API endpoints for a derived dataset (e.g. spatial subset) served as zarr, let's say:
/datasets/{dataset_id}/{variable}/processes/position:aggregate-time/.zmetadata
/datasets/{dataset_id}/{variable}/processes/position:aggregate-time/.zgroup
/datasets/{dataset_id}/{variable}/processes/position:aggregate-time/.zattrs
/datasets/{dataset_id}/{variable}/processes/position:aggregate-time/{var}/{chunk}
The client would then do something like
curl -X 'GET' \
'http://0.0.0.0:9001/datasets/no2/tropospheric_no2_column_number_density/processes/position:aggregate-time/.zmetadata?location=2.12%2C48.75%2C2.52%2C48.99&function=mean&datetime=2018-05-01T00%3A00%3A00%2F2018-06-01T00%3A00%3A00' \
-H 'accept: application/json'
or
from fsspec.implementations.http import HTTPFileSystem

fs = HTTPFileSystem()
http_map = fs.get_mapper('http://0.0.0.0:9001/datasets/no2/tropospheric_no2_column_number_density/processes/position:aggregate-time/.zmetadata?location=2.12%2C48.75%2C2.52%2C48.99&function=mean&datetime=2018-05-01T00%3A00%3A00%2F2018-06-01T00%3A00%3A00')
What would be the best approach to implement this with xpublish? Any suggestions would be appreciated.
Do you need to expose any aggregation parameters as API endpoint parameters?
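For reference, the query string in the requests above carries three parameters (`location`, `function`, `datetime`). Here is a minimal sketch of how a server could decode them, using only the Python standard library. The helper name `parse_aggregation_query` and the reading of `location` as (min lon, min lat, max lon, max lat) are assumptions, not something the thread or xpublish defines.

```python
from datetime import datetime
from urllib.parse import parse_qs, urlsplit

url = (
    "http://0.0.0.0:9001/datasets/no2/tropospheric_no2_column_number_density"
    "/processes/position:aggregate-time/.zmetadata"
    "?location=2.12%2C48.75%2C2.52%2C48.99&function=mean"
    "&datetime=2018-05-01T00%3A00%3A00%2F2018-06-01T00%3A00%3A00"
)


def parse_aggregation_query(request_url: str) -> dict:
    """Decode the percent-encoded query string into typed parameters.

    Hypothetical helper for illustration only.
    """
    query = parse_qs(urlsplit(request_url).query)  # parse_qs also percent-decodes
    # Assumption: the four numbers are min lon, min lat, max lon, max lat.
    bbox = tuple(float(value) for value in query["location"][0].split(","))
    # The time range is written as "start/end" in ISO 8601 format.
    start, end = query["datetime"][0].split("/")
    return {
        "bbox": bbox,
        "function": query["function"][0],
        "start": datetime.fromisoformat(start),
        "end": datetime.fromisoformat(end),
    }


params = parse_aggregation_query(url)
```

In a FastAPI route these values would more likely arrive as declared query parameters; the sketch only makes explicit what the encoded URL contains.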
If you don't need to, the simplest way is to compute the aggregated datasets (maybe lazily) before serving them with xpublish.
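A minimal sketch of that pre-computation, assuming a synthetic dataset with a variable named `air` and a hypothetical dataset ID `air-mean` (neither comes from the thread). The data here is NumPy-backed, so the mean is computed eagerly; with a dask-backed dataset the same call would stay lazy until chunks are requested.

```python
import numpy as np
import xarray as xr

# Synthetic stand-in for a real dataset (names and shapes are illustrative).
times = np.arange("2018-05-01", "2018-05-05", dtype="datetime64[D]")
ds = xr.Dataset(
    {"air": (("time", "lat", "lon"), np.arange(24.0).reshape(4, 2, 3))},
    coords={"time": times, "lat": [48.7, 48.9], "lon": [2.1, 2.3, 2.5]},
)

# Aggregate once, up front, with a fixed choice of dimension and function.
air_mean = ds[["air"]].mean(dim="time")

# Hypothetical dataset IDs: each entry is served as its own dataset.
datasets_to_serve = {"air": ds, "air-mean": air_mean}

# Serving is left commented out because it blocks the process:
# import xpublish
# xpublish.Rest(datasets_to_serve).serve()
```

The trade-off is that every aggregation has to be decided before the server starts, which is why this only fits when no aggregation parameter is exposed to the client.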
Otherwise, I think you could do something like this, although this feels a bit hacky and it applies to all endpoints of the application (not just zarr endpoints):
import xarray as xr
import xpublish
from xpublish.dependencies import get_dataset

datasets_to_serve = {...}
rest = xpublish.Rest(datasets_to_serve)

# Keep a reference to the dependency that xpublish installed for `get_dataset`
get_actual_dataset = rest.app.dependency_overrides[get_dataset]

def get_aggregated_dataset(dataset_id: str, dim: str = 'time'):
    ds = get_actual_dataset(dataset_id)
    # call aggregate function using `dim`
    # (maybe use xpublish's cache to avoid computing the aggregated dataset each time)
    aggregated = ...
    return aggregated

# Every endpoint that depends on `get_dataset` now receives the aggregated dataset
rest.app.dependency_overrides[get_dataset] = get_aggregated_dataset
rest.serve()
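The comment in the snippet above suggests caching the aggregated dataset. As a rough illustration of that idea, here is a sketch that uses `functools.lru_cache` from the standard library rather than xpublish's own cache; `compute_aggregate` is a hypothetical stand-in for the real aggregation, and it returns a string only so that the example runs without any data.

```python
from functools import lru_cache

# Counts how often the (expensive) aggregation actually runs.
call_count = 0


def compute_aggregate(dataset_id: str, dim: str) -> str:
    """Hypothetical placeholder for the real xarray aggregation."""
    global call_count
    call_count += 1
    return f"{dataset_id} aggregated over {dim}"


@lru_cache(maxsize=32)
def cached_aggregate(dataset_id: str, dim: str = "time") -> str:
    # Results are keyed on the (dataset_id, dim) arguments.
    return compute_aggregate(dataset_id, dim)


first = cached_aggregate("no2", "time")
second = cached_aggregate("no2", "time")  # same arguments: served from the cache
other = cached_aggregate("no2", "lat")    # new arguments: computed again
```

This matters for zarr access in particular, since a client requests `.zmetadata` and then each chunk separately, so the same parameters reach the dependency many times. All parameters that affect the result need to be part of the cache key.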
@benbovy, I do need to expose the aggregation parameter as an API endpoint parameter, so I'll look at what you propose and come back with feedback. Thanks for your time and the code snippet!
@fabricebrito - wondering if you have an update here or if we should close this issue?