Opening zarr dataset with poor connection leads to NaN chunks
Problem
I am using xarray to open zarr datasets located in an S3 bucket. However, it can happen that the result doesn't contain all the chunks and we get NaNs instead. This is usually linked to a low-bandwidth internet connection combined with requesting a lot of chunks.
More details
In our case (see code below), we started tracking the HTTP calls to understand the problem a bit better (with HTTP tracking software). Three kinds of responses are possible when getting a chunk:
- 200: we get the chunk with the data
- 403: missing data, this is normal as I am dealing with ocean data, so the chunks over the continents don't exist
- no response: there isn't even a response, so the GET request "fails" and we don't have the data.
The latter is a big problem, as we get randomly empty chunks! As a user it is also very annoying to detect.
We also noticed that when using xarray.open_dataset the calls seem to be made all at the same time, which increases the probability of NaN chunks. That's why we tried using xarray.open_mfdataset, since each worker then issues the GET requests for the chunks one by one.
Questions
- Why does xarray.open_dataset send all the requests concurrently? Is it possible to control the number of requests and do some kind of rolling batch gather?
- Is there a way to raise an exception when there is no response from the server? So that at least, as users, we don't have to manually check the data.
- Any ideas on how to solve this problem?
- Maybe this is linked to the zarr library?
To reproduce
This bug is difficult to reproduce. The only way I managed to reproduce it is with a computer tethered to a phone on 3G; with that setup it happens all the time. With a good connection on my computer it never happens. We have had several other reports of this problem, though.
See the two scripts: one with open_dataset:
import xarray as xr
import matplotlib.pyplot as plt
import time
import sys
import logging
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s | %(name)14s | %(levelname)7s | %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
    encoding="utf-8",
    level=logging.ERROR,
)
logging.getLogger("timeloop").setLevel(logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.DEBUG)
logging.getLogger("botocore").setLevel(logging.DEBUG)
logging.getLogger("s3fs").setLevel(logging.DEBUG)
logging.getLogger("fsspec").setLevel(logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.DEBUG)
# logging.getLogger("numba").setLevel(logging.ERROR)
logging.getLogger("s3transfer").setLevel(logging.DEBUG)
start_time = time.time()
print("Starting...")
data = xr.open_dataset(
    "https://s3.waw3-1.cloudferro.com/mdl-arco-geo-012/arco/GLOBAL_ANALYSISFORECAST_PHY_001_024/cmems_mod_glo_phy-thetao_anfc_0.083deg_P1D-m_202211/geoChunked.zarr",
    engine="zarr",
)
print("Dataset opened...")
bla = data.thetao.sel(
    longitude=slice(-170.037309004901026, -70.037309004901026),
    latitude=slice(-80.27257431850789, -40.27257431850789),
    time=slice("2023-03-20T00:00:00", "2023-03-20T00:00:00"),
).sel(elevation=0, method="nearest")
print("Plotting... ")
map = bla.isel(time=0).plot()
#map = data.isel(time=0).plot()
print("Saving image...")
plt.savefig("./bla_fast.png")
print("Total processing time:", (time.time() - start_time))
and the other one with open_mfdataset:
import xarray as xr
import matplotlib.pyplot as plt
import time
import sys
import dask
import logging
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s | %(name)14s | %(levelname)7s | %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
    encoding="utf-8",
    level=logging.ERROR,
)
logging.getLogger("timeloop").setLevel(logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.DEBUG)
logging.getLogger("botocore").setLevel(logging.DEBUG)
logging.getLogger("s3fs").setLevel(logging.DEBUG)
logging.getLogger("fsspec").setLevel(logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.DEBUG)
# logging.getLogger("numba").setLevel(logging.ERROR)
logging.getLogger("s3transfer").setLevel(logging.DEBUG)
start_time = time.time()
with dask.config.set(num_workers=2):
    print("Starting...")
    data = xr.open_mfdataset(
        ["https://s3.waw3-1.cloudferro.com/mdl-arco-geo-012/arco/GLOBAL_ANALYSISFORECAST_PHY_001_024/cmems_mod_glo_phy-thetao_anfc_0.083deg_P1D-m_202211/geoChunked.zarr"],
        engine="zarr",
    )
    # .thetao.sel(longitude=slice(-170.037309004901026, -70.037309004901026),
    #             latitude=slice(-80.27257431850789, -40.27257431850789),
    #             time=slice("2023-03-20T00:00:00", "2023-03-20T00:00:00")).sel(elevation=0, method="nearest")
    print("Dataset opened...")
    bla = data.thetao.sel(
        longitude=slice(-170.037309004901026, -70.037309004901026),
        latitude=slice(-80.27257431850789, -40.27257431850789),
        time=slice("2023-03-20T00:00:00", "2023-03-20T00:00:00"),
    ).sel(elevation=0, method="nearest")
    print("Plotting... ")
    map = bla.isel(time=0).plot()
    # map = data.isel(time=0).plot()
    print("Saving image...")
    plt.savefig("./bla_long.png")
    print("Total processing time:", (time.time() - start_time))
Expected result
The full plot, or a failed run.
Obtained result
(screenshot of the plot with randomly missing chunks)
I suspect fsspec should raise an error here, but isn't. cc @martindurant
Indeed, with the same setup, doing only the fsspec calls concurrently leads to some unexpected errors: empty response, Server disconnected and [Errno 54] Connection reset by peer (I can provide an example snippet if needed).
And @dcherian, they DO raise when I leave the default fsspec behavior (see https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=.cat#fsspec.spec.AbstractFileSystem.cat), so I actually think that it might not be an upstream issue.
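For reference, a minimal sketch of the kind of direct fsspec call being described, assuming the same dataset URL as in the scripts above (the .zgroup key is only an example object in the store):

import fsspec

base = (
    "https://s3.waw3-1.cloudferro.com/mdl-arco-geo-012/arco/"
    "GLOBAL_ANALYSISFORECAST_PHY_001_024/"
    "cmems_mod_glo_phy-thetao_anfc_0.083deg_P1D-m_202211/geoChunked.zarr"
)

fs = fsspec.filesystem("https")

# With the default on_error="raise", a dropped connection or missing object
# surfaces as an exception instead of the key silently disappearing.
blobs = fs.cat([f"{base}/.zgroup"], on_error="raise")
print({key: len(value) for key, value in blobs.items()})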
So I am wondering: do you know where in the process there is an "omit" and we bypass all errors? Is it in xarray? In zarr? In which place?
Thanks 😄
So I am wondering: do you know where in the process there is an "omit" and we bypass all errors?
It is not an omit, but errors can be converted into FileNotFoundError either in fsspec's FSMap, or in zarr's FSStore (these two should do the same thing and are based on each other). zarr assumes NotFound means a file is really missing, and that the data there is all fill_value - this is the design.
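For illustration, a minimal sketch of that design with a local store (assuming zarr-python v2 and a DirectoryStore): a chunk the store cannot return reads back silently as fill_value.

import os
import tempfile
import numpy as np
import zarr

path = os.path.join(tempfile.mkdtemp(), "demo.zarr")
z = zarr.open(path, mode="w", shape=(4, 4), chunks=(2, 2), dtype="f8",
              fill_value=np.nan)
z[:] = 1.0

# Delete one chunk file to simulate a chunk the store cannot return.
os.remove(os.path.join(path, "0.0"))

z2 = zarr.open(path, mode="r")
print(z2[:2, :2])  # the missing chunk reads back as fill_value (NaN), no error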
In some situations it can be hard to tell the difference between something that isn't there and something that is there but won't load. This is one argument for having an external manifest of which files are expected - but then you need to keep this manifest in sync with reality.
I agree that it can be hard to tell the difference between something that isn't there and something that is. Though I am surprised that some errors are not propagated at all.
We made a specific zarr store to be able to retry those errors (and raise them if needed). Here is a small part of the store (I can share the rest with you).
def __contains__(self, key):
    full_key = f"{self._root_path}/{key}"
    try:
        self.client.head_object(Bucket=self._bucket, Key=full_key)
        return True
    except botocore.exceptions.ClientError as e:
        if "404" in str(e) or "403" in str(e):
            return False
        raise
You can see that we are only catching the 404 and 403 errors, which correspond to missing chunks. I was expecting xarray to work in the same fashion.
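For context, a hypothetical sketch of what the matching __getitem__ with retries could look like in such a store (this is not the author's actual code): 404/403 become KeyError so zarr treats the chunk as genuinely missing, while other failures are retried and finally re-raised instead of silently turning into NaNs.

import time
import botocore.exceptions

def __getitem__(self, key):
    full_key = f"{self._root_path}/{key}"
    last_error = None
    for attempt in range(5):
        try:
            response = self.client.get_object(Bucket=self._bucket, Key=full_key)
            return response["Body"].read()
        except botocore.exceptions.ClientError as e:
            if "404" in str(e) or "403" in str(e):
                raise KeyError(key)          # truly missing chunk -> fill_value
            last_error = e                   # other client errors: retry
        except botocore.exceptions.EndpointConnectionError as e:
            last_error = e                   # transient network problem: retry
        time.sleep(2 ** attempt)
    raise last_error                         # surface the failure to the caller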
but errors can be converted into FileNotFoundError either in fsspec's FSMap, or in zarr's FSStore
We should fix this. Connection errors should be distinguishable from a true FileNotFound error though, especially since missing chunk files are valid for Zarr.
cc @jhamman
It is configurable: https://github.com/zarr-developers/zarr-python/blob/main/zarr/storage.py#L1354 (I don't know why this is broader than https://github.com/fsspec/filesystem_spec/blob/master/fsspec/mapping.py#L46 ).
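For reference, a hedged sketch of what that zarr-side configuration looks like when constructing the FSStore directly (assuming zarr-python v2), bypassing xarray for a moment:

import zarr
from zarr.storage import FSStore

url = (
    "https://s3.waw3-1.cloudferro.com/mdl-arco-geo-012/arco/"
    "GLOBAL_ANALYSISFORECAST_PHY_001_024/"
    "cmems_mod_glo_phy-thetao_anfc_0.083deg_P1D-m_202211/geoChunked.zarr"
)

# exceptions controls which exception types FSStore converts into "missing key";
# an empty tuple means nothing gets swallowed and errors propagate to the caller.
store = FSStore(url, mode="r", exceptions=())
group = zarr.open_group(store, mode="r")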
Can it be configured through xarray (through open_dataset for example) directly then?
I guess it would also be interesting to have a retry policy.
Yes, you should be able to put this into the storage_options within the backend_kwargs
It doesn't have any effect unfortunately, as you can see.
@martindurant as Renaud mentions, the storage_options exceptions don't seem to be passed to the ZarrStore. Doing something like backen_kwargs={"exceptions": KeyError} throws an ArgumentError.
I am looking at https://github.com/pydata/xarray/blob/main/xarray/backends/zarr.py#L995 and can't really see how the storage_options or backend_kwargs would be passed to the ZarrStore?
We may need an xarray insider here. I don't know the connection between ZarrStore and zarr's FSStore - too many layers! I would focus on the main open_dataset function (not open_zarr), and try with an exception you specifically don't expect, like ZeroDivisionError, as a test.
backen_kwargs={"exceptions": KeyError}
Just to be sure, there is a typo here, should be "backend_kwargs"
Yes, I think I just typed it wrong in the comment. FYI: the complete error message is quite specific:
TypeError: ZarrBackendEntrypoint.open_dataset() got an unexpected keyword argument 'exceptions'
I am gonna see whether I can follow the nesting.
Ok. So this error is raised in backends/api.py#L572: a few lines above, backend_kwargs are unpacked into kwargs that are provided as keyword arguments (**kwargs) to the open_dataset function of the ZarrBackendEntrypoint. That function has no exceptions argument, hence the error.
However, the storage_options seem to be passed down to ZarrStore.open_group (backends/zarr.py#L1029) and further to zarr.open_group (e.g. backends/zarr.py#L471); they go through a few functions and, once inside the zarr module, should be passed to the FSStore you mentioned earlier (in my version it's in zarr/storage.py#L170).
Are you saying you have a working invocation now? This is a good reason to keep data-load definitions in a catalog, since working these things out every time is tedious.
Yes, using storage_options=dict(exceptions=[]) in open_dataset successfully passes an empty list of exceptions down to the FSStore. But if I throttle my connection, I still end up with no errors but missing chunks.
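For reference, the invocation described here looks roughly like this (same dataset URL as in the scripts above):

import xarray as xr

url = (
    "https://s3.waw3-1.cloudferro.com/mdl-arco-geo-012/arco/"
    "GLOBAL_ANALYSISFORECAST_PHY_001_024/"
    "cmems_mod_glo_phy-thetao_anfc_0.083deg_P1D-m_202211/geoChunked.zarr"
)

# storage_options is forwarded by the zarr backend down to zarr's FSStore,
# so "exceptions": [] reaches the FSStore constructor.
data = xr.open_dataset(url, engine="zarr", storage_options={"exceptions": []})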
I am also not sure I understand the difference between these two:
exceptions : list of Exception subclasses When accessing data, any of these exceptions will be treated as a missing key
missing_exceptions : sequence of Exceptions, optional Exceptions classes to associate with missing files. Passed to `fsspec.mapping.FSMap` constructor.
All I can see is that it should raise a KeyError instead of the exceptions provided by exceptions, but I can't really see where it ignores the timeout or replaces it with NaNs.
It is the zarr internals that treat KeyError as NaN (or another missing value). I'm no longer sure why zarr has a mapper on top of fsspec's mapper, but you can safely set missing_exceptions to empty, and the outer exceptions to probably just (KeyError,).
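A hedged sketch of that suggestion via open_dataset, assuming missing_exceptions is forwarded from storage_options to the FSStore the same way exceptions was above:

import xarray as xr

url = (
    "https://s3.waw3-1.cloudferro.com/mdl-arco-geo-012/arco/"
    "GLOBAL_ANALYSISFORECAST_PHY_001_024/"
    "cmems_mod_glo_phy-thetao_anfc_0.083deg_P1D-m_202211/geoChunked.zarr"
)

data = xr.open_dataset(
    url,
    engine="zarr",
    storage_options={
        # assumption: both keys reach the FSStore like "exceptions" did above
        "missing_exceptions": (),   # fsspec's FSMap converts nothing into "missing"
        "exceptions": (KeyError,),  # only a real KeyError is treated as an absent chunk
    },
)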
Yeah, I think that seems to be the crux. No matter what we input as exceptions, it doesn't raise an error but loses the chunks regardless.
Do we have an MCVE available?
Closing to limit the number of open issues without MCVEs, please feel free to reopen with an MCVE