pangeo-forge-recipes

Hanging during `store_chunk` loop

cisaacstern opened this issue 3 years ago · 72 comments

While manually executing https://github.com/pangeo-forge/staged-recipes/pull/66, I encountered recurring hangs at multiple places within this loop:

https://github.com/pangeo-forge/pangeo-forge-recipes/blob/c4867719c4866ec46436b5d53d95ceb470a6f72e/pangeo_forge_recipes/recipes/xarray_zarr.py#L590-L626

In order to pinpoint the hang, I added these additional logs to xarray_zarr.py. Anecdotally, running with these additional logs indicated that the hang occurred most often at the call to np.asarray, but not exclusively at that line. None of this recipe's source-file variable arrays exceeds much more than ~60 MB, and I was running a Large (12 GB - 16 GB) notebook server on https://us-central1-b.gcp.pangeo.io/hub, so it's hard to imagine it was a memory issue. (Note also that the input-file-to-target-chunk mapping for this recipe is 1:1, and the input files are each ~80 MB in size.)

Hangs within the store_chunk loop arose at unpredictable intervals: usually after writing between 5 and 25 chunks, but sometimes only after writing a few hundred. Again anecdotally, the hangs seemed to become more frequent once I'd reached around the 1500th loop iteration.

KeyboardInterrupting and restarting the store_chunk loop from the hang location (without restarting the kernel) always resolved the issue and allowed the write to continue for another 5-25 writes before hanging again.
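
In practice, restarting from the hang location just meant re-slicing the chunks list from the last successful index, e.g. (index illustrative):

chunks = list(recipe.iter_chunks())

# after a hang at, say, chunk 242: Ctrl-C, then resume from that index
for chunk in chunks[242:]:
    recipe.store_chunk(chunk)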

Ultimately, I restarted the loop like this a few dozen times, and eventually got all 2117 inputs written. During this process, I tried the following, to no avail:

  • switching to https://staging.us-central1-b.gcp.pangeo.io/hub
  • switching back to https://us-central1-b.gcp.pangeo.io/hub, and selectively updating all packages which could be involved in this loop to their latest versions: xarray, numpy, dask, distributed, fsspec, s3fs, and gcsfs
  • removing an explicit target_chunks kwarg from the recipe in https://github.com/pangeo-forge/staged-recipes/pull/66/commits/3b9d3fa151b6e6e12b837af391ed38f159c7fd8d, in case that was somehow redundant with nitems_per_file=1
  • changing the call to np.asarray to var.to_numpy() (which was admittedly unlikely to help, given that xarray implements to_numpy with np.asarray internally, and also because the hang was not exclusively on that line)

This feels like an environment or cloud I/O issue to me, but at this point I'm prepared to believe anything.

I've made a notebook which reproduces the execution scenario here: https://github.com/cisaacstern/pangeo-forge-debug-examples/tree/soda342-ice ... with the caveat that the storage target is an fsspec.LocalFileSystem (since we can't include cloud creds in a public repo). The Binder link in the README does allow the notebook to be run, but it looks like Binder's local disk allotment of 500 MB (?) may fill up before the hang is reproduced.
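
For reference, the local target in that notebook is set up roughly like this (paths illustrative):

import fsspec
from pangeo_forge_recipes.storage import FSSpecTarget

fs = fsspec.filesystem("file")  # i.e., fsspec's LocalFileSystem
target = FSSpecTarget(fs=fs, root_path="/tmp/soda342-ice")  # illustrative path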

cisaacstern avatar Aug 03 '21 18:08 cisaacstern

Anecdotally, running with these additional logs indicated that the hang occurred most often at the call to np.asarray

So this is the point where we are actually reading data. The call stack looks like this: xarray -> dask -> h5py -> fsspec. The problem is probably in the last two. It would be useful to see the full stack trace from your KeyboardInterrupt.

One way around this would be to try with copy_input_to_local_file=True. That would bypass h5py / fsspec and open the file directly from disk.
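
Untested sketch of what that would look like, assuming an XarrayZarrRecipe built from your existing file_pattern:

from pangeo_forge_recipes.recipes import XarrayZarrRecipe

recipe = XarrayZarrRecipe(
    file_pattern,  # your existing FilePattern
    copy_input_to_local_file=True,  # download each input to local disk before opening
)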

rabernat avatar Aug 04 '21 07:08 rabernat

One way around this would be to try with copy_input_to_local_file=True. That would bypass h5py / fsspec and open the file directly from disk.

Good to know. I hadn't been clear on the use case for that feature, but this makes sense.

It would be useful to see the full stack trace from your KeyboardInterrupt.

Here are two representative tracebacks, which I created by rewriting the SODA ICE recipe to a scratch bucket on OSN, waiting for it to hang (which it did), and then KeyboardInterrupting.

In the first case below, I made it from the beginning to input 242 of 2117 before the hang on np.asarray. It looks like we made it through the h5py and gcsfs sections of the call stack and into fsspec.asyn / threading territory ...

Traceback 1
pangeo_forge_recipes.recipes.xarray_zarr - INFO - Storing variable hs chunk time-242 to Zarr region (slice(242, 243, None), slice(None, None, None), slice(None, None, None))
pangeo_forge_recipes.recipes.xarray_zarr - DEBUG - Converting variable mi of 6163200 bytes to `numpy.ndarray`
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-15-833e739e09c6> in <module>
      1 for chunk in chunks:
----> 2     recipe.store_chunk(chunk)

/srv/conda/envs/notebook/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py in store_chunk(chunk_key, target, concat_dim, nitems_per_input, file_pattern, inputs_per_chunk, subset_inputs, concat_dim_chunks, lock_timeout, xarray_concat_kwargs, process_chunk, target_chunks, input_cache, cache_inputs, copy_input_to_local_file, xarray_open_kwargs, delete_input_encoding, process_input, metadata_cache, is_opendap)
    614                         " size which will trigger this warning."
    615                     )
--> 616                 data = np.asarray(
    617                     var.data
    618                 )  # TODO: can we buffer large data rather than loading it all?

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs)
   1430 
   1431     def __array__(self, dtype=None, **kwargs):
-> 1432         x = self.compute()
   1433         if dtype and x.dtype != dtype:
   1434             x = x.astype(dtype)

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    277         dask.base.compute
    278         """
--> 279         (result,) = compute(self, traverse=False, **kwargs)
    280         return result
    281 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    559         postcomputes.append(x.__dask_postcompute__())
    560 
--> 561     results = schedule(dsk, keys, **kwargs)
    562     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    563 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    526     """
    527     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 528     return get_async(apply_sync, 1, dsk, keys, **kwargs)
    529 
    530 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    470             # Seed initial tasks into the thread pool
    471             while state["ready"] and len(state["running"]) < num_workers:
--> 472                 fire_task()
    473 
    474             # Main loop, wait on tasks to finish, insert new ones

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in fire_task()
    455                 )
    456                 # Submit
--> 457                 apply_async(
    458                     execute_task,
    459                     args=(

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in apply_sync(func, args, kwds, callback)
    515 def apply_sync(func, args=(), kwds={}, callback=None):
    516     """ A naive synchronous version of apply_async """
--> 517     res = func(*args, **kwds)
    518     if callback is not None:
    519         callback(res)

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    225         failed = False
    226     except BaseException as e:
--> 227         result = pack_exception(e, dumps)
    228         failed = True
    229     return key, result, failed

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/array/core.py in getter(a, b, asarray, lock)
    104         c = a[b]
    105         if asarray:
--> 106             c = np.asarray(c)
    107     finally:
    108         if lock:

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    352 
    353     def __array__(self, dtype=None):
--> 354         return np.asarray(self.array, dtype=dtype)
    355 
    356     def __getitem__(self, key):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    546 
    547     def __array__(self, dtype=None):
--> 548         self._ensure_cached()
    549         return np.asarray(self.array, dtype=dtype)
    550 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in _ensure_cached(self)
    543     def _ensure_cached(self):
    544         if not isinstance(self.array, NumpyIndexingAdapter):
--> 545             self.array = NumpyIndexingAdapter(np.asarray(self.array))
    546 
    547     def __array__(self, dtype=None):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    516 
    517     def __array__(self, dtype=None):
--> 518         return np.asarray(self.array, dtype=dtype)
    519 
    520     def __getitem__(self, key):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    417     def __array__(self, dtype=None):
    418         array = as_indexable(self.array)
--> 419         return np.asarray(array[self.key], dtype=None)
    420 
    421     def transpose(self, order):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/coding/variables.py in __array__(self, dtype)
     68 
     69     def __array__(self, dtype=None):
---> 70         return self.func(self.array)
     71 
     72     def __repr__(self):

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/coding/variables.py in _apply_mask(data, encoded_fill_values, decoded_fill_value, dtype)
    135 ) -> np.ndarray:
    136     """Mask all matching values in a NumPy arrays."""
--> 137     data = np.asarray(data, dtype=dtype)
    138     condition = False
    139     for fv in encoded_fill_values:

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    417     def __array__(self, dtype=None):
    418         array = as_indexable(self.array)
--> 419         return np.asarray(array[self.key], dtype=None)
    420 
    421     def transpose(self, order):

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in __getitem__(self, key)
     46 
     47     def __getitem__(self, key):
---> 48         return indexing.explicit_indexing_adapter(
     49             key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
     50         )

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    708     """
    709     raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 710     result = raw_indexing_method(raw_key.tuple)
    711     if numpy_indices.tuple:
    712         # index the loaded np.ndarray

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in _getitem(self, key)
     56         with self.datastore.lock:
     57             array = self.get_array(needs_lock=False)
---> 58             return array[key]
     59 
     60 

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5netcdf/core.py in __getitem__(self, key)
    144 
    145     def __getitem__(self, key):
--> 146         return self._h5ds[key]
    147 
    148     def __setitem__(self, key, value):

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5py/_hl/dataset.py in __getitem__(self, args, new_dtype)
    785         mspace = h5s.create_simple(selection.mshape)
    786         fspace = selection.id
--> 787         self.id.read(mspace, fspace, arr, mtype, dxpl=self._dxpl)
    788 
    789         # Patch up the output for NumPy

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5d.pyx in h5py.h5d.DatasetID.read()

h5py/_proxy.pyx in h5py._proxy.dset_rw()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in readinto(self, b)
   1491         """
   1492         out = memoryview(b).cast("B")
-> 1493         data = self.read(out.nbytes)
   1494         out[: len(data)] = data
   1495         return len(data)

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in read(self, length)
   1481             # don't even bother calling fetch
   1482             return b""
-> 1483         out = self.cache._fetch(self.loc, self.loc + length)
   1484         self.loc += len(out)
   1485         return out

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/caching.py in _fetch(self, start, end)
    151             part = b""
    152         end = min(self.size, end + self.blocksize)
--> 153         self.cache = self.fetcher(start, end)  # new block replaces old
    154         self.start = start
    155         self.end = self.start + len(self.cache)

/srv/conda/envs/notebook/lib/python3.8/site-packages/gcsfs/core.py in _fetch_range(self, start, end)
   1291             head = None
   1292         try:
-> 1293             _, data = self.gcsfs.call("GET", self.details["mediaLink"], headers=head)
   1294             return data
   1295         except RuntimeError as e:

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
     86     def wrapper(*args, **kwargs):
     87         self = obj or args[0]
---> 88         return sync(self.loop, func, *args, **kwargs)
     89 
     90     return wrapper

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py in sync(loop, func, timeout, *args, **kwargs)
     57     while True:
     58         # this loops allows thread to get interrupted
---> 59         if event.wait(1):
     60             break
     61         if timeout is not None:

/srv/conda/envs/notebook/lib/python3.8/threading.py in wait(self, timeout)
    556             signaled = self._flag
    557             if not signaled:
--> 558                 signaled = self._cond.wait(timeout)
    559             return signaled
    560 

/srv/conda/envs/notebook/lib/python3.8/threading.py in wait(self, timeout)
    304             else:
    305                 if timeout > 0:
--> 306                     gotit = waiter.acquire(True, timeout)
    307                 else:
    308                     gotit = waiter.acquire(False)

KeyboardInterrupt: 

For an n=2 sample size, I restarted the store_chunk loop from the first hang location, and made it another ~250 iterations, to input 486, before hitting what appears to be the same hang scenario again:

Traceback 2
pangeo_forge_recipes.recipes.xarray_zarr - INFO - Storing variable hs chunk time-486 to Zarr region (slice(486, 487, None), slice(None, None, None), slice(None, None, None))
pangeo_forge_recipes.recipes.xarray_zarr - DEBUG - Converting variable mi of 6163200 bytes to `numpy.ndarray`
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-9-e9838d599281> in <module>
      1 for chunk in chunks[242:]:
----> 2     recipe.store_chunk(chunk)

/srv/conda/envs/notebook/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py in store_chunk(chunk_key, target, concat_dim, nitems_per_input, file_pattern, inputs_per_chunk, subset_inputs, concat_dim_chunks, lock_timeout, xarray_concat_kwargs, process_chunk, target_chunks, input_cache, cache_inputs, copy_input_to_local_file, xarray_open_kwargs, delete_input_encoding, process_input, metadata_cache, is_opendap)
    614                         " size which will trigger this warning."
    615                     )
--> 616                 data = np.asarray(
    617                     var.data
    618                 )  # TODO: can we buffer large data rather than loading it all?

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs)
   1430 
   1431     def __array__(self, dtype=None, **kwargs):
-> 1432         x = self.compute()
   1433         if dtype and x.dtype != dtype:
   1434             x = x.astype(dtype)

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    277         dask.base.compute
    278         """
--> 279         (result,) = compute(self, traverse=False, **kwargs)
    280         return result
    281 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    559         postcomputes.append(x.__dask_postcompute__())
    560 
--> 561     results = schedule(dsk, keys, **kwargs)
    562     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    563 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    526     """
    527     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 528     return get_async(apply_sync, 1, dsk, keys, **kwargs)
    529 
    530 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    470             # Seed initial tasks into the thread pool
    471             while state["ready"] and len(state["running"]) < num_workers:
--> 472                 fire_task()
    473 
    474             # Main loop, wait on tasks to finish, insert new ones

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in fire_task()
    455                 )
    456                 # Submit
--> 457                 apply_async(
    458                     execute_task,
    459                     args=(

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in apply_sync(func, args, kwds, callback)
    515 def apply_sync(func, args=(), kwds={}, callback=None):
    516     """ A naive synchronous version of apply_async """
--> 517     res = func(*args, **kwds)
    518     if callback is not None:
    519         callback(res)

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    225         failed = False
    226     except BaseException as e:
--> 227         result = pack_exception(e, dumps)
    228         failed = True
    229     return key, result, failed

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/array/core.py in getter(a, b, asarray, lock)
    104         c = a[b]
    105         if asarray:
--> 106             c = np.asarray(c)
    107     finally:
    108         if lock:

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    352 
    353     def __array__(self, dtype=None):
--> 354         return np.asarray(self.array, dtype=dtype)
    355 
    356     def __getitem__(self, key):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    546 
    547     def __array__(self, dtype=None):
--> 548         self._ensure_cached()
    549         return np.asarray(self.array, dtype=dtype)
    550 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in _ensure_cached(self)
    543     def _ensure_cached(self):
    544         if not isinstance(self.array, NumpyIndexingAdapter):
--> 545             self.array = NumpyIndexingAdapter(np.asarray(self.array))
    546 
    547     def __array__(self, dtype=None):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    516 
    517     def __array__(self, dtype=None):
--> 518         return np.asarray(self.array, dtype=dtype)
    519 
    520     def __getitem__(self, key):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    417     def __array__(self, dtype=None):
    418         array = as_indexable(self.array)
--> 419         return np.asarray(array[self.key], dtype=None)
    420 
    421     def transpose(self, order):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/coding/variables.py in __array__(self, dtype)
     68 
     69     def __array__(self, dtype=None):
---> 70         return self.func(self.array)
     71 
     72     def __repr__(self):

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/coding/variables.py in _apply_mask(data, encoded_fill_values, decoded_fill_value, dtype)
    135 ) -> np.ndarray:
    136     """Mask all matching values in a NumPy arrays."""
--> 137     data = np.asarray(data, dtype=dtype)
    138     condition = False
    139     for fv in encoded_fill_values:

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    417     def __array__(self, dtype=None):
    418         array = as_indexable(self.array)
--> 419         return np.asarray(array[self.key], dtype=None)
    420 
    421     def transpose(self, order):

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in __getitem__(self, key)
     46 
     47     def __getitem__(self, key):
---> 48         return indexing.explicit_indexing_adapter(
     49             key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
     50         )

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    708     """
    709     raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 710     result = raw_indexing_method(raw_key.tuple)
    711     if numpy_indices.tuple:
    712         # index the loaded np.ndarray

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in _getitem(self, key)
     56         with self.datastore.lock:
     57             array = self.get_array(needs_lock=False)
---> 58             return array[key]
     59 
     60 

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5netcdf/core.py in __getitem__(self, key)
    144 
    145     def __getitem__(self, key):
--> 146         return self._h5ds[key]
    147 
    148     def __setitem__(self, key, value):

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5py/_hl/dataset.py in __getitem__(self, args, new_dtype)
    760         mspace = h5s.create_simple(selection.mshape)
    761         fspace = selection.id
--> 762         self.id.read(mspace, fspace, arr, mtype, dxpl=self._dxpl)
    763 
    764         # Patch up the output for NumPy

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5d.pyx in h5py.h5d.DatasetID.read()

h5py/_proxy.pyx in h5py._proxy.dset_rw()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in readinto(self, b)
   1491         """
   1492         out = memoryview(b).cast("B")
-> 1493         data = self.read(out.nbytes)
   1494         out[: len(data)] = data
   1495         return len(data)

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in read(self, length)
   1481             # don't even bother calling fetch
   1482             return b""
-> 1483         out = self.cache._fetch(self.loc, self.loc + length)
   1484         self.loc += len(out)
   1485         return out

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/caching.py in _fetch(self, start, end)
    151             part = b""
    152         end = min(self.size, end + self.blocksize)
--> 153         self.cache = self.fetcher(start, end)  # new block replaces old
    154         self.start = start
    155         self.end = self.start + len(self.cache)

/srv/conda/envs/notebook/lib/python3.8/site-packages/gcsfs/core.py in _fetch_range(self, start, end)
   1291             head = None
   1292         try:
-> 1293             _, data = self.gcsfs.call("GET", self.details["mediaLink"], headers=head)
   1294             return data
   1295         except RuntimeError as e:

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
     86     def wrapper(*args, **kwargs):
     87         self = obj or args[0]
---> 88         return sync(self.loop, func, *args, **kwargs)
     89 
     90     return wrapper

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py in sync(loop, func, timeout, *args, **kwargs)
     57     while True:
     58         # this loops allows thread to get interrupted
---> 59         if event.wait(1):
     60             break
     61         if timeout is not None:

/srv/conda/envs/notebook/lib/python3.8/threading.py in wait(self, timeout)
    556             signaled = self._flag
    557             if not signaled:
--> 558                 signaled = self._cond.wait(timeout)
    559             return signaled
    560 

/srv/conda/envs/notebook/lib/python3.8/threading.py in wait(self, timeout)
    304             else:
    305                 if timeout > 0:
--> 306                     gotit = waiter.acquire(True, timeout)
    307                 else:
    308                     gotit = waiter.acquire(False)

KeyboardInterrupt: 

In both of these cases, the hang was on variable mi, but I believe that's coincidental. Based on my prior experience, it seemed the hang could occur on any of the data variables.

cisaacstern avatar Aug 05 '21 18:08 cisaacstern

Feel free to try with https://github.com/intake/filesystem_spec/pull/694; if that helps, it would mean that h5py objects are being garbage collected during IO and trying to grab a lock at that point.

Also, you can call https://github.com/intake/filesystem_spec/blob/master/fsspec/asyn.py#L688 to reset coroutines, or https://github.com/dask/distributed/pull/4726 to cause a cancel-and-retry on the hanging dask task.

martindurant avatar Aug 05 '21 18:08 martindurant

Feel free to try with intake/filesystem_spec#694

This didn't solve it. Hit a hang with the same traceback as above on input 245 of 2117.

Also, you can call https://github.com/intake/filesystem_spec/blob/master/fsspec/asyn.py#L688 to reset coroutines, or dask/distributed#4726 to cause a cancel-and-retry on the hanging dask task.

What would this look like in practice? Wrapping

https://github.com/pangeo-forge/pangeo-forge-recipes/blob/c4867719c4866ec46436b5d53d95ceb470a6f72e/pangeo_forge_recipes/recipes/xarray_zarr.py#L616-L617

with some kind of try/except timeout, and then calling one of these methods as a fallback if the timeout is exceeded?
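
I.e., something like this rough sketch, where reset_loop() is a placeholder for whichever of those two reset mechanisms we'd actually call:

import signal

import numpy as np

class ChunkTimeout(Exception):
    pass

def _alarm_handler(signum, frame):
    raise ChunkTimeout

signal.signal(signal.SIGALRM, _alarm_handler)

def asarray_with_retry(var, timeout=60, max_retries=3):
    for attempt in range(max_retries):
        signal.alarm(timeout)  # raise ChunkTimeout if this attempt hangs
        try:
            return np.asarray(var.data)
        except ChunkTimeout:
            reset_loop()  # placeholder: fsspec coroutine reset or dask cancel-and-retry
        finally:
            signal.alarm(0)  # disarm on success or failure
    raise ChunkTimeout(f"np.asarray still hanging after {max_retries} attempts")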

cisaacstern avatar Aug 05 '21 21:08 cisaacstern

The gridMET recipe is also currently blocked by this; ref https://github.com/pangeo-forge/staged-recipes/pull/79.

sharkinsspatial avatar Sep 14 '21 19:09 sharkinsspatial

This issue is being reported consistently and threatens to be a major stumbling point for a smooth public launch. It is still blocking https://github.com/pangeo-forge/staged-recipes/pull/68#issuecomment-897866695. And today, in manually re-running portions of https://github.com/pangeo-forge/staged-recipes/pull/52, I ran into what appears to be the same issue:

Traceback
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-17-0bb01730da6f> in <module>
      1 for chunk in rec.iter_chunks():
----> 2     rec.store_chunk(chunk)

/srv/conda/envs/notebook/lib/python3.8/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py in store_chunk(chunk_key, target, concat_dim, nitems_per_input, file_pattern, inputs_per_chunk, subset_inputs, concat_dim_chunks, lock_timeout, xarray_concat_kwargs, process_chunk, target_chunks, input_cache, cache_inputs, copy_input_to_local_file, xarray_open_kwargs, delete_input_encoding, process_input, metadata_cache)
    605                         " size which will trigger this warning."
    606                     )
--> 607                 data = np.asarray(
    608                     var.data
    609                 )  # TODO: can we buffer large data rather than loading it all?

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs)
   1430 
   1431     def __array__(self, dtype=None, **kwargs):
-> 1432         x = self.compute()
   1433         if dtype and x.dtype != dtype:
   1434             x = x.astype(dtype)

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    277         dask.base.compute
    278         """
--> 279         (result,) = compute(self, traverse=False, **kwargs)
    280         return result
    281 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    559         postcomputes.append(x.__dask_postcompute__())
    560 
--> 561     results = schedule(dsk, keys, **kwargs)
    562     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    563 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    526     """
    527     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 528     return get_async(apply_sync, 1, dsk, keys, **kwargs)
    529 
    530 

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    470             # Seed initial tasks into the thread pool
    471             while state["ready"] and len(state["running"]) < num_workers:
--> 472                 fire_task()
    473 
    474             # Main loop, wait on tasks to finish, insert new ones

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in fire_task()
    455                 )
    456                 # Submit
--> 457                 apply_async(
    458                     execute_task,
    459                     args=(

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in apply_sync(func, args, kwds, callback)
    515 def apply_sync(func, args=(), kwds={}, callback=None):
    516     """ A naive synchronous version of apply_async """
--> 517     res = func(*args, **kwds)
    518     if callback is not None:
    519         callback(res)

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    225         failed = False
    226     except BaseException as e:
--> 227         result = pack_exception(e, dumps)
    228         failed = True
    229     return key, result, failed

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/array/core.py in getter(a, b, asarray, lock)
    104         c = a[b]
    105         if asarray:
--> 106             c = np.asarray(c)
    107     finally:
    108         if lock:

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    352 
    353     def __array__(self, dtype=None):
--> 354         return np.asarray(self.array, dtype=dtype)
    355 
    356     def __getitem__(self, key):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    546 
    547     def __array__(self, dtype=None):
--> 548         self._ensure_cached()
    549         return np.asarray(self.array, dtype=dtype)
    550 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in _ensure_cached(self)
    543     def _ensure_cached(self):
    544         if not isinstance(self.array, NumpyIndexingAdapter):
--> 545             self.array = NumpyIndexingAdapter(np.asarray(self.array))
    546 
    547     def __array__(self, dtype=None):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    516 
    517     def __array__(self, dtype=None):
--> 518         return np.asarray(self.array, dtype=dtype)
    519 
    520     def __getitem__(self, key):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    417     def __array__(self, dtype=None):
    418         array = as_indexable(self.array)
--> 419         return np.asarray(array[self.key], dtype=None)
    420 
    421     def transpose(self, order):

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in __getitem__(self, key)
     46 
     47     def __getitem__(self, key):
---> 48         return indexing.explicit_indexing_adapter(
     49             key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
     50         )

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    708     """
    709     raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 710     result = raw_indexing_method(raw_key.tuple)
    711     if numpy_indices.tuple:
    712         # index the loaded np.ndarray

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in _getitem(self, key)
     56         with self.datastore.lock:
     57             array = self.get_array(needs_lock=False)
---> 58             return array[key]
     59 
     60 

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5netcdf/core.py in __getitem__(self, key)
    144 
    145     def __getitem__(self, key):
--> 146         return self._h5ds[key]
    147 
    148     def __setitem__(self, key, value):

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5py/_hl/dataset.py in __getitem__(self, args, new_dtype)
    698         if self._fast_read_ok and (new_dtype is None):
    699             try:
--> 700                 return self._fast_reader.read(args)
    701             except TypeError:
    702                 pass  # Fall back to Python read pathway below

h5py/_selector.pyx in h5py._selector.Reader.read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in readinto(self, b)
   1493         """
   1494         out = memoryview(b).cast("B")
-> 1495         data = self.read(out.nbytes)
   1496         out[: len(data)] = data
   1497         return len(data)

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in read(self, length)
   1483             # don't even bother calling fetch
   1484             return b""
-> 1485         out = self.cache._fetch(self.loc, self.loc + length)
   1486         self.loc += len(out)
   1487         return out

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/caching.py in _fetch(self, start, end)
    151             part = b""
    152         end = min(self.size, end + self.blocksize)
--> 153         self.cache = self.fetcher(start, end)  # new block replaces old
    154         self.start = start
    155         self.end = self.start + len(self.cache)

/srv/conda/envs/notebook/lib/python3.8/site-packages/gcsfs/core.py in _fetch_range(self, start, end)
   1291             head = None
   1292         try:
-> 1293             _, data = self.gcsfs.call("GET", self.details["mediaLink"], headers=head)
   1294             return data
   1295         except RuntimeError as e:

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
     86     def wrapper(*args, **kwargs):
     87         self = obj or args[0]
---> 88         return sync(self.loop, func, *args, **kwargs)
     89 
     90     return wrapper

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py in sync(loop, func, timeout, *args, **kwargs)
     57     while True:
     58         # this loops allows thread to get interrupted
---> 59         if event.wait(1):
     60             break
     61         if timeout is not None:

/srv/conda/envs/notebook/lib/python3.8/threading.py in wait(self, timeout)
    556             signaled = self._flag
    557             if not signaled:
--> 558                 signaled = self._cond.wait(timeout)
    559             return signaled
    560 

/srv/conda/envs/notebook/lib/python3.8/threading.py in wait(self, timeout)
    304             else:
    305                 if timeout > 0:
--> 306                     gotit = waiter.acquire(True, timeout)
    307                 else:
    308                     gotit = waiter.acquire(False)

KeyboardInterrupt: 

As noted in https://github.com/pangeo-forge/staged-recipes/pull/68#issuecomment-923503888, https://github.com/pangeo-forge/pangeo-forge-recipes/issues/181 presents one longer-term solution. Pinpointing the upstream issue would be very useful.

cisaacstern avatar Sep 21 '21 02:09 cisaacstern

😩 I agree this is a major blocker.

We should work hard to find a minimal reproducer (without Pangeo Forge) that we can take upstream to fsspec or h5py.

As a first step, can we pick a single file which experiences this problem and place it in a prominent public location?

rabernat avatar Sep 21 '21 12:09 rabernat

Here's a minimal candidate reproducer, which I hoped might (but actually does not) trigger the problem.

import h5py
import fsspec
import numpy as np

url = "https://support.hdfgroup.org/ftp/HDF5/examples/files/exbyapi/h5ex_d_chunk.h5"

for n in range(100):
    # fsspec.open() yields a file-like object that can be handed straight to h5py
    with fsspec.open(url) as fp:
        h5file = h5py.File(fp, "r")
        _ = np.asarray(h5file["DS1"])

The sample file was taken from the official HDF5 website.

rabernat avatar Sep 21 '21 13:09 rabernat

Maybe if you do this within a dask local cluster?

@cisaacstern, were you using distributed or the simple threaded scheduler? It seems to be the latter. In that case, as a variation on the garbage-collection idea, could you try simply adding gc.disable() before running the job?
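
For clarity, that's just the standard-library switch, flipped once before the loop:

import gc

gc.disable()  # turn off automatic (cyclic) garbage collection process-wide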

martindurant avatar Sep 21 '21 14:09 martindurant

I think I have a reproducer for this using xarray (rather than h5py directly), modeled after the store_chunk loop. I've followed this SO post to set up a timeout exception that detects when the loop is hanging. An average iteration takes ~3 seconds, so I figured 60 seconds was a reasonable timeout.

A note on https://github.com/pangeo-forge/pangeo-forge-recipes/issues/177#issuecomment-923984937: I'm not aware that this particular issue has ever occurred over HTTP. Every time I've seen it, it has involved reading from the GCS cache, and the related Dask issue https://github.com/dask/dask/issues/7547#issuecomment-872563337 concerns reads from S3. For this example, I'm using the cached files for https://github.com/pangeo-forge/staged-recipes/pull/66; problems reading from this cache were the reason this issue was opened initially. The bucket is publicly accessible, as shown in the code below.

Perhaps we can use this as a basis for a more minimal reproducer using only gcsfs, h5py, and numpy.

import signal
import gcsfs
import numpy as np
import xarray as xr
from tqdm import tqdm

gcs = gcsfs.GCSFileSystem(anon=True)
cache_base = "pangeo-forge-us-central1/pangeo-forge-cache"
paths = gcs.ls(f"{cache_base}/soda342/5day_ice")


def read_data(n):
    with gcs.open(paths[n]) as ofile:
        with xr.open_dataset(ofile) as ds:
            for _, var_coded in ds.variables.items():
                var = xr.backends.zarr.encode_zarr_variable(var_coded)
                _ = np.asarray(var.data)


def handler(signum, frame):
    raise Exception

signal.signal(signal.SIGALRM, handler)
timeout = 60

for n in tqdm(range(len(paths))):
    signal.alarm(timeout)  # re-arm the alarm for each iteration
    try:
        read_data(n)
    except Exception:
        print(f"Iteration {n} exceeded {timeout} seconds.")
    finally:
        signal.alarm(0)  # disarm once the iteration completes
1%|          | 23/2117 [00:59<1:41:35,  2.91s/it]
Iteration 22 exceeded 60 seconds.
Traceback
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-1-77edb95f5ddb> in <module>
     28 for n in tqdm(range(len(paths))):
     29     try:
---> 30         read_data(n)
     31     except Exception:
     32         print(f"Iteration {n} exceeded {timeout} seconds.")

<ipython-input-1-77edb95f5ddb> in read_data(n)
     13 def read_data(n):
     14     with gcs.open(paths[n]) as ofile:
---> 15         with xr.open_dataset(ofile) as ds:
     16             for _, var_coded in ds.variables.items():
     17                 var = xr.backends.zarr.encode_zarr_variable(var_coded)

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, backend_kwargs, *args, **kwargs)
    495 
    496     overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 497     backend_ds = backend.open_dataset(
    498         filename_or_obj,
    499         drop_variables=drop_variables,

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, format, group, lock, invalid_netcdf, phony_dims, decode_vlen_strings)
    384         store_entrypoint = StoreBackendEntrypoint()
    385 
--> 386         ds = store_entrypoint.open_dataset(
    387             store,
    388             mask_and_scale=mask_and_scale,

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/store.py in open_dataset(self, store, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta)
     25         encoding = store.get_encoding()
     26 
---> 27         vars, attrs, coord_names = conventions.decode_cf_variables(
     28             vars,
     29             attrs,

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/conventions.py in decode_cf_variables(variables, attributes, concat_characters, mask_and_scale, decode_times, decode_coords, drop_variables, use_cftime, decode_timedelta)
    510             and stackable(v.dims[-1])
    511         )
--> 512         new_vars[k] = decode_cf_variable(
    513             k,
    514             v,

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/conventions.py in decode_cf_variable(name, var, concat_characters, mask_and_scale, decode_times, decode_endianness, stack_char_dim, use_cftime, decode_timedelta)
    358         var = times.CFTimedeltaCoder().decode(var, name=name)
    359     if decode_times:
--> 360         var = times.CFDatetimeCoder(use_cftime=use_cftime).decode(var, name=name)
    361 
    362     dimensions, data, attributes, encoding = variables.unpack_for_decoding(var)

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/coding/times.py in decode(self, variable, name)
    525             units = pop_to(attrs, encoding, "units")
    526             calendar = pop_to(attrs, encoding, "calendar")
--> 527             dtype = _decode_cf_datetime_dtype(data, units, calendar, self.use_cftime)
    528             transform = partial(
    529                 decode_cf_datetime,

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/coding/times.py in _decode_cf_datetime_dtype(data, units, calendar, use_cftime)
    143     values = indexing.ImplicitToExplicitIndexingAdapter(indexing.as_indexable(data))
    144     example_value = np.concatenate(
--> 145         [first_n_items(values, 1) or [0], last_item(values) or [0]]
    146     )
    147 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/formatting.py in first_n_items(array, n_desired)
     70         indexer = _get_indexer_at_least_n_items(array.shape, n_desired, from_end=False)
     71         array = array[indexer]
---> 72     return np.asarray(array).flat[:n_desired]
     73 
     74 

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    352 
    353     def __array__(self, dtype=None):
--> 354         return np.asarray(self.array, dtype=dtype)
    355 
    356     def __getitem__(self, key):

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/coding/variables.py in __array__(self, dtype)
     68 
     69     def __array__(self, dtype=None):
---> 70         return self.func(self.array)
     71 
     72     def __repr__(self):

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/coding/variables.py in _apply_mask(data, encoded_fill_values, decoded_fill_value, dtype)
    135 ) -> np.ndarray:
    136     """Mask all matching values in a NumPy arrays."""
--> 137     data = np.asarray(data, dtype=dtype)
    138     condition = False
    139     for fv in encoded_fill_values:

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    417     def __array__(self, dtype=None):
    418         array = as_indexable(self.array)
--> 419         return np.asarray(array[self.key], dtype=None)
    420 
    421     def transpose(self, order):

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in __getitem__(self, key)
     46 
     47     def __getitem__(self, key):
---> 48         return indexing.explicit_indexing_adapter(
     49             key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
     50         )

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    708     """
    709     raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 710     result = raw_indexing_method(raw_key.tuple)
    711     if numpy_indices.tuple:
    712         # index the loaded np.ndarray

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in _getitem(self, key)
     56         with self.datastore.lock:
     57             array = self.get_array(needs_lock=False)
---> 58             return array[key]
     59 
     60 

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5netcdf/core.py in __getitem__(self, key)
    144 
    145     def __getitem__(self, key):
--> 146         return self._h5ds[key]
    147 
    148     def __setitem__(self, key, value):

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5py/_hl/dataset.py in __getitem__(self, args, new_dtype)
    698         if self._fast_read_ok and (new_dtype is None):
    699             try:
--> 700                 return self._fast_reader.read(args)
    701             except TypeError:
    702                 pass  # Fall back to Python read pathway below

h5py/_selector.pyx in h5py._selector.Reader.read()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in readinto(self, b)
   1493         """
   1494         out = memoryview(b).cast("B")
-> 1495         data = self.read(out.nbytes)
   1496         out[: len(data)] = data
   1497         return len(data)

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in read(self, length)
   1483             # don't even bother calling fetch
   1484             return b""
-> 1485         out = self.cache._fetch(self.loc, self.loc + length)
   1486         self.loc += len(out)
   1487         return out

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/caching.py in _fetch(self, start, end)
    151             part = b""
    152         end = min(self.size, end + self.blocksize)
--> 153         self.cache = self.fetcher(start, end)  # new block replaces old
    154         self.start = start
    155         self.end = self.start + len(self.cache)

/srv/conda/envs/notebook/lib/python3.8/site-packages/gcsfs/core.py in _fetch_range(self, start, end)
   1291             head = None
   1292         try:
-> 1293             _, data = self.gcsfs.call("GET", self.details["mediaLink"], headers=head)
   1294             return data
   1295         except RuntimeError as e:

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
     86     def wrapper(*args, **kwargs):
     87         self = obj or args[0]
---> 88         return sync(self.loop, func, *args, **kwargs)
     89 
     90     return wrapper

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py in sync(loop, func, timeout, *args, **kwargs)
     57     while True:
     58         # this loops allows thread to get interrupted
---> 59         if event.wait(1):
     60             break
     61         if timeout is not None:

/srv/conda/envs/notebook/lib/python3.8/threading.py in wait(self, timeout)
    556             signaled = self._flag
    557             if not signaled:
--> 558                 signaled = self._cond.wait(timeout)
    559             return signaled
    560 

/srv/conda/envs/notebook/lib/python3.8/threading.py in wait(self, timeout)
    304             else:
    305                 if timeout > 0:
--> 306                     gotit = waiter.acquire(True, timeout)
    307                 else:
    308                     gotit = waiter.acquire(False)

KeyboardInterrupt: 

cisaacstern avatar Sep 21 '21 19:09 cisaacstern

were you using distributed or the simple threaded scheduler? It seems to be the latter? In that case, as a variation on the garbage collection idea, could you try simply adding gc.disable() before running the job?

@martindurant, yes I was on a single thread, and anecdotally gc.disable() did not resolve the issue.

cisaacstern avatar Sep 21 '21 19:09 cisaacstern

OK, so explicitly it's the mixture of xarray/h5py/fsspec/threads. If gc doesn't make a difference, then I am once again at a loss.

martindurant avatar Sep 21 '21 19:09 martindurant

Perhaps we can use this as a basis for a more minimal reproducer using only gcsfs, h5py, and numpy.

Here we go! Seemingly the same issue, but without xarray:

import signal
import gcsfs
import h5py
import numpy as np
from tqdm import tqdm

gcs = gcsfs.GCSFileSystem(anon=True)
cache_base = "pangeo-forge-us-central1/pangeo-forge-cache"
paths = gcs.ls(f"{cache_base}/soda342/5day_ice")

def read_data(n):
    with gcs.open(paths[n]) as ofile:
        with ofile as fp:
            h5file = h5py.File(fp)
            for var_name in h5file.keys():
                _ = np.asarray(h5file[var_name])

def handler(signum, frame):
    raise Exception

signal.signal(signal.SIGALRM, handler)
timeout = 60
signal.alarm(timeout)

for n in tqdm(range(len(paths))):
    try:
        read_data(n)
    except Exception as e:
        raise ValueError(
            f"Iteration {n} exceeded {timeout} seconds."
        ) from e
1%|▏         | 27/2117 [00:59<1:17:24,  2.22s/it]
Traceback
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-1-68fa8848d30b> in <module>
     26     try:
---> 27         read_data(n)
     28     except Exception as e:

<ipython-input-1-68fa8848d30b> in read_data(n)
     15             for var_name in h5file.keys():
---> 16                 _ = np.asarray(h5file[var_name])
     17 

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5py/_hl/group.py in __getitem__(self, name)
    304         elif isinstance(name, (bytes, str)):
--> 305             oid = h5o.open(self.id, self._e(name), lapl=self._lapl)
    306         else:

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5o.pyx in h5py.h5o.open()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in readinto(self, b)
   1494         out = memoryview(b).cast("B")
-> 1495         data = self.read(out.nbytes)
   1496         out[: len(data)] = data

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in read(self, length)
   1484             return b""
-> 1485         out = self.cache._fetch(self.loc, self.loc + length)
   1486         self.loc += len(out)

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/caching.py in _fetch(self, start, end)
    152         end = min(self.size, end + self.blocksize)
--> 153         self.cache = self.fetcher(start, end)  # new block replaces old
    154         self.start = start

/srv/conda/envs/notebook/lib/python3.8/site-packages/gcsfs/core.py in _fetch_range(self, start, end)
   1292         try:
-> 1293             _, data = self.gcsfs.call("GET", self.details["mediaLink"], headers=head)
   1294             return data

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
     87         self = obj or args[0]
---> 88         return sync(self.loop, func, *args, **kwargs)
     89 

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/asyn.py in sync(loop, func, timeout, *args, **kwargs)
     58         # this loops allows thread to get interrupted
---> 59         if event.wait(1):
     60             break

/srv/conda/envs/notebook/lib/python3.8/threading.py in wait(self, timeout)
    557             if not signaled:
--> 558                 signaled = self._cond.wait(timeout)
    559             return signaled

/srv/conda/envs/notebook/lib/python3.8/threading.py in wait(self, timeout)
    305                 if timeout > 0:
--> 306                     gotit = waiter.acquire(True, timeout)
    307                 else:

<ipython-input-1-68fa8848d30b> in handler(signum, frame)
     18 def handler(signum, frame):
---> 19     raise Exception
     20 

Exception: 

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
<ipython-input-1-68fa8848d30b> in <module>
     27         read_data(n)
     28     except Exception as e:
---> 29         raise ValueError(
     30             f"Iteration {n} exceeded {timeout} seconds."
     31         ) from e

ValueError: Iteration 27 exceeded 60 seconds.

cisaacstern avatar Sep 21 '21 19:09 cisaacstern

OK, this is great progress. Let's keep boiling it down. Here are some questions.

  • You're still looping through a bunch of files. Can we reproduce the problem with a single, specific file?
    • If YES:
      • Is there a problem with the file itself? Is this error related to corrupted data? A big problem with our caching strategy is that we don't use checksums for the inputs (most data providers don't provide them unfortunately), so this is hard to verify. Ideally we would go back upstream to the original download location and re-download the file, calculate the md5 checksum, etc.
      • If the file is not corrupted, is the error intermittent? Can we open / read / close the same file over and over and reproduce the same problem?
    • If NO:
      • What is the minimum number of files required to reproduce the problem?
      • Can we copy the same file to different filenames and reproduce the problem?
  • Is the error related to the fsspec implementation we are using? We can try this by putting the same problematic file[s] (identified in the previous step) in multiple locations. Right now you are using gcsfs. Let's add (see the sketch below)
    • HTTP. Place the file[s] in a public GCS bucket and access them via the http endpoint. Open with the HTTPFileSystem.
    • S3. Move the files to s3 and try with s3fs.
    • Regular files: put them on a local disk and use LocalFileSystem.

Ultimately the goal of this will be to raise an issue either in h5py or one of the fsspec repos. The simpler and shorter we can make the example, the more likely we are to get traction there. At minimum, a reproducer that uses public data is a must.
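
For instance, the checksum and filesystem-swap tests could look roughly like this (a sketch: the candidate URLs are placeholders for wherever the suspect file gets copied, and the h5py read mirrors the reproducer above):

import hashlib

import fsspec
import h5py
import numpy as np

# Placeholder locations -- the same suspect file copied to each backend.
candidates = [
    "gs://some-bucket/suspect_file.nc",                            # gcsfs
    "s3://some-bucket/suspect_file.nc",                            # s3fs
    "https://storage.googleapis.com/some-bucket/suspect_file.nc",  # HTTPFileSystem
    "/tmp/suspect_file.nc",                                        # LocalFileSystem
]

def md5(url, blocksize=2**20):
    # Compare against a checksum of a fresh download from the original
    # provider, to rule out a corrupted cache copy.
    h = hashlib.md5()
    with fsspec.open(url, mode="rb") as fp:
        while block := fp.read(blocksize):
            h.update(block)
    return h.hexdigest()

def read_all_vars(url):
    with fsspec.open(url, mode="rb") as fp:
        h5file = h5py.File(fp)
        for var_name in h5file.keys():
            _ = np.asarray(h5file[var_name])

for url in candidates:
    print(url, md5(url))
    for attempt in range(100):  # the hang is intermittent, so hammer each backend
        read_all_vars(url)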

rabernat avatar Sep 21 '21 20:09 rabernat

Also, one final idea: can we try this with an fsspec / gcsfs version that predates to the big async refactor? That would help us narrow down whether this is async related.

rabernat avatar Sep 21 '21 20:09 rabernat

That would help us narrow down whether this is async related.

Worth checking, certainly, but I think we're pretty sure it is async related. Note that I don't think we've yet tried calling fsspec.asyn._dump_running_tasks at the time of a hang (via KeyboardInterrupt, for example) to see what is hung.
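
Concretely, that would look something like this (a sketch: _dump_running_tasks is a private fsspec helper whose signature may vary by version, and the stock-asyncio fallback assumes the filesystem's .loop attribute; gcs, paths, and read_data are from the reproducer above):

import asyncio

import fsspec.asyn

try:
    for n in range(len(paths)):
        read_data(n)
except KeyboardInterrupt:
    # ask fsspec which coroutines are still pending on its dedicated IO loop
    fsspec.asyn._dump_running_tasks()
    # or, approximately, with stock asyncio against the filesystem's loop:
    for task in asyncio.all_tasks(gcs.loop):
        if not task.done():
            task.print_stack()
    raise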

martindurant avatar Sep 21 '21 20:09 martindurant

I think we're pretty sure it is async related.

Is there a way to completely disable the async layer of fsspec implementations and just use purely synchronous functions?

rabernat avatar Sep 21 '21 20:09 rabernat

Is there a way to completely disable the async layer of fsspec

Absolutely not - the backends call libraries that are implemented as pure async code.

Two other things that may be worth trying for the sake of elimination (both sketched below):

  • running with dask's "sync" scheduler
  • running without dask but using ordinary threads
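
For instance (a sketch: recipe.to_dask() refers to this project's dask-execution helper and is assumed available here; read_data and paths come from the reproducer above):

import dask
from concurrent.futures import ThreadPoolExecutor

# 1. dask, but forcing the single-threaded "sync" scheduler:
delayed = recipe.to_dask()
with dask.config.set(scheduler="synchronous"):
    delayed.compute()

# 2. no dask at all -- ordinary threads driving the reproducer's read loop:
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(read_data, range(len(paths))))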

martindurant avatar Sep 21 '21 20:09 martindurant

Before trying more library combinations, I recommend simplifying the reproducer further as outlined in https://github.com/pangeo-forge/pangeo-forge-recipes/issues/177#issuecomment-924354396.

rabernat avatar Sep 21 '21 20:09 rabernat

Before trying more library combinations, I recommend simplifying the reproducer further as outlined in #177 (comment).

...now questioning if my use of signal in https://github.com/pangeo-forge/pangeo-forge-recipes/issues/177#issuecomment-924337789 may be incorrectly setting a 60-second timeout on the whole loop, rather than a per-iteration timeout. I think this is probably what's happened, but I'm still relatively confident that the general approach outlined there will reproduce the error. I'll now:

  • [x] correct the signal usage mistake to set a per-iteration timeout (see the sketch after this list), then ensure the multi-file h5py example does indeed hang as expected
  • [x] simplify the reproducer as proposed
  • [ ] check the simplified reproducer against the various solutions/debugging pathways proposed above
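
For reference, the corrected version re-arms the alarm inside the loop, giving a true per-iteration timeout (a sketch built directly on the reproducer above):

signal.signal(signal.SIGALRM, handler)
timeout = 60

for n in tqdm(range(len(paths))):
    signal.alarm(timeout)   # re-arm the alarm for *this* iteration
    try:
        read_data(n)
    except Exception as e:
        raise ValueError(f"Iteration {n} exceeded {timeout} seconds.") from e
    finally:
        signal.alarm(0)     # cancel any pending alarm once the read returns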

cisaacstern avatar Sep 21 '21 21:09 cisaacstern

Thanks for continuing to push on this Charles. I think we need to get to the bottom of the issue with a minimal reproducer.

In the meantime, I plan to work this week on implementing the "reference" reading approach.

rabernat avatar Sep 27 '21 14:09 rabernat

I think we need to get to the bottom of the issue with a minimal reproducer.

Sounds good. When I left off with it last week, I was beginning to doubt that it's reproducible with h5py alone (contrary to my comments above), and was starting to think it was somehow connected to xarray being in the call stack. That will require further investigation to prove, though. I'll keep pushing on it.

cisaacstern avatar Sep 27 '21 14:09 cisaacstern

Some more debugging information, albeit no solutions (yet). High-level observations first, followed by a link to a reproducer Gist with much more detail. First off, I was mistaken when, earlier in this thread, I said that this issue was reproducible with h5py alone. That earlier misconception was due to an error in my reproducer code; after fixing that mistake, I can't get the hang with h5py directly. It seems having xarray in the mix is relevant... which means a fix may be possible in xarray itself?

To the specific points raised in https://github.com/pangeo-forge/pangeo-forge-recipes/issues/177#issuecomment-924354396:

Can we reproduce the problem with a single, specific file?

Yes, with the (big) caveat that which specific file this is appears to change from day to day. Typically, on a given day, a specific file in the GCS cache will cause consistent hangs on opening. But the next day it is likely to be a different file (or files).

Generally the file will not hang on a one-off attempt to open it. It will typically hang when opened as part of a loop, either preceded by other files or preceded only by prior attempts to open the same file.

Is there a problem with the file itself? Is this error related to corrupted data?

No. The file itself is not corrupted.

If the file is not corrupted, is the error intermittent? Can we open / read / close the same file over and over and reproduce the same problem?

Yes, this is definitely intermittent, both in terms of which file hangs (it changes daily) and which iterative attempt to open that specific file causes the hang.

Is the error related to the fsspec implementation we are using?

Yes. So far I have tested GCSFileSystem and LocalFileSystem. The problem is clearly reproducible with the former but not the latter. I have not tried S3 or HTTP yet.

This gist documents the above in greater detail:

https://gist.github.com/cisaacstern/6b6cdd4e29024f6cff3d4e3b8fe0d9db

It concludes with a call to fsspec.asyn._dump_running_tasks, as suggested by Martin. I rewrote that function as follows, however, to make it easier to pretty-print the output as part of an error message:

https://github.com/cisaacstern/filesystem_spec/compare/master..dump-running-tasks

cisaacstern avatar Oct 01 '21 01:10 cisaacstern

So there are two tasks apparently waiting. One is GCS's call, which is in fsspec.asyn._runner, waiting at line 25

        result[0] = await coro

and the other is, apparently, aiohttp doing a DNS lookup. It is not surprising that the fsspec call should be waiting on something in aiohttp, but I would have expected some intermediate tasks too. I can find one report of _resolve_host() being associated with a race and coroutine cancelling, but not with hanging.
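
For orientation, the bridge between the calling thread and the IO loop works roughly like this (a schematic paraphrase of what the traceback shows, not fsspec's exact source):

import asyncio
import threading

def sync(loop, coro):
    result = [None]
    event = threading.Event()

    async def _runner():
        result[0] = await coro   # <- the pending task seen in the dump
        event.set()

    # hand the coroutine to the dedicated IO-loop thread...
    asyncio.run_coroutine_threadsafe(_runner(), loop)
    # ...and poll in 1 s slices so the calling thread stays interruptible
    while not event.wait(1):
        pass
    return result[0]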

Another mystery is that result contains more than None (it is passed in as [None] in sync()), although we're stuck on the only line which could have changed the value - and only once it completed (but it is still pending). CIMultiDict is an aiohttp object.

Have multiple runs resulted in similar traces?

martindurant avatar Oct 01 '21 02:10 martindurant

Have multiple runs resulted in similar traces?

I've gathered further details from 50 individual hangs ("hang occurrences"). This is the notebook used to create them, the last cell of which prints just the final line of the traceback for each hanging task in each of the 50 hang occurrences. The full output of fsspec.asyn._dump_running_tasks (which includes a full traceback, plus other information) for 50 hang occurrences would've been inconvenient to read in the notebook, so I dumped those details to this text file. Note that I made a few further tweaks to fsspec.asyn._dump_running_tasks this time around; this is my current version used to produce these results (further explanation of these tweaks below).

I would have expected some intermediate tasks too

Based on the summary printed from the last cell of the notebook, it appears that roughly 60% of the time, the GCS call from fsspec.asyn._runner (waiting at line 25) is the only hanging task. The other ~40% of the time, that task is waiting along with one other task. There never appear to be more than two tasks waiting.

there are two tasks apparent waiting, the one is GSC's call, ... the other is, apparently aiohttp doing a DNS lookup.

Based on the present results, in the ~40% of cases when there is a second task waiting (in addition to the GCS call), it is one of the following:

  • /aiohttp/client_reqrep.py, line 572, in write_bytes async def write_bytes(
  • /aiohttp/client_reqrep.py, line 591, in write_bytes await writer.write(chunk) # type: ignore
  • /aiohttp/connector.py, line 865, in _resolve_host addrs = await self._resolver.resolve(host, port, family=self._family)

The third of these, the DNS lookup task Martin noted in https://github.com/pangeo-forge/pangeo-forge-recipes/issues/177#issuecomment-931838190, is actually the least common of the three, occurring only once during this 50-occurrence test.

The list comprehension which serves as input for fsspec.asyn._dump_running_tasks apparently filters out completed async tasks; however, in a notable percentage of the supposedly 2-task cases here, the task which is not the GCS call is, in fact, already completed (and therefore has no traceback). This is why my current version of fsspec.asyn._dump_running_tasks only generates a full output dictionary if task._coro.cr_frame is not None.
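
In code, that extra guard amounts to something like the following (a sketch; task._coro is a private asyncio.Task attribute, mirrored here because the tweak above uses it):

import asyncio

def dumpable_tasks(loop):
    # skip tasks whose coroutine frame is already gone -- they have
    # finished and therefore have no traceback to report
    return [
        t for t in asyncio.all_tasks(loop)
        if not t.done() and t._coro.cr_frame is not None
    ]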

Another mystery is that result contains more than None

This was my mistake. On some early tests of this code, the value of result was returned as a 2-tuple containing a CIMultiDict plus a long byte string, the latter of which nearly crashed the notebook when printed to stdout. I then incorrectly assumed this would always be the return type, and defaulted result to a 2-tuple containing the strings ("CIMultiDictProxy(...)", "Byte string redacted here.") to spare stdout. This turns out to have been an incorrect assumption. Note that my current version of _dump_running_tasks corrects this error by printing just the type of whatever the result value is, and for the present 50-occurrence test, it was always None; see, for example, here.

cisaacstern avatar Oct 01 '21 23:10 cisaacstern

for the present 50 occurrence test, it was always None

I tried to follow that path a little - glad it's not the case, as I was really puzzled!

martindurant avatar Oct 02 '21 01:10 martindurant

xref https://github.com/pangeo-forge/staged-recipes/pull/68#issuecomment-897866695 via @sharkinsspatial for a possibly related issue with writing (not reading)

cisaacstern avatar Oct 08 '21 17:10 cisaacstern

Noting that some earlier attempts at manually calling garbage collection appeared not to work; but now that we have the reproducers from https://github.com/pangeo-forge/pangeo-forge-recipes/issues/177#issuecomment-932633607 and https://github.com/pangeo-forge/pangeo-forge-recipes/issues/177#issuecomment-931813029, it may be worth revisiting that approach.

See https://github.com/dask/dask/issues/7547#issuecomment-906939846 (@martindurant has referenced this previously, and I was reminded of it today by @sharkinsspatial)

xref https://github.com/pangeo-forge/pangeo-forge-recipes/issues/177#issuecomment-924021336
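
A quick way to revisit that against the minimal reproducer (a sketch of the pattern from the dask thread: disable automatic collection, then collect only at controlled points; read_data and paths are from the reproducer above):

import gc

gc.disable()               # no automatic collection during the read loop
try:
    for n in range(len(paths)):
        read_data(n)
        if n % 50 == 0:
            gc.collect()   # collect deterministically, between reads
finally:
    gc.enable()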

cisaacstern avatar Oct 08 '21 17:10 cisaacstern

@rabernat did you intend to close this via #218?

cisaacstern avatar Oct 15 '21 20:10 cisaacstern

Not really, no. It's more of a hope that #218 will close this.

rabernat avatar Oct 15 '21 20:10 rabernat