
pandas.errors.InvalidIndexError is raised in some runs when using chunks and map_blocks()

Open lumbric opened this issue 2 years ago • 1 comments

What is your issue?

I'm doing a lengthy computation, which involves hundreds of GB of data using chunks and map_blocks() so that things fit into RAM and can be done in parallel. From time to time, the following error is raised:

pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects

The line where this takes place looks pretty harmless:

x = a * b.sel(c=d.c)

It's a line inside the function func which is passed to a map_blocks() call. In this case a and b are xr.DataArray or xr.Dataset objects captured from the outer scope, and d is the parameter obj for map_blocks().

That means the next line up in the traceback looks like this:

xr.map_blocks(
    lambda d: worker(d).compute().chunk({"time": None}),
    d,
    template=template)

I guess it's some kind of race condition, since it's not 100% reproducible, but I have no idea how to further investigate the issue to create a proper bug report or fix my code.

Do you have any hint how I could continue building a minimal example or so in such a case? What does the error message want to tell me?

lumbric avatar Jul 22 '22 14:07 lumbric

Hi @lumbric - thanks for the report. Though it's often challenging when coming from complicated workflows, getting to an MCVE is really the only way to sort out what is going wrong here.

A few suggestions that will hopefully help you get there:

  • Try running your calculation using the synchronous dask scheduler. Dask's graph execution order is not always deterministic, which can make errors look non-reproducible when they are not.
  • Try reconstructing a similar workflow with dummy data.
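Both suggestions can be combined in one small script. The following is only a sketch under assumptions: the array shapes, the chunk size, and the body of worker are made up, and the names a, b, c and d merely mirror the snippet from the report. It rebuilds a similar map_blocks() workflow on dummy data and computes it with the synchronous scheduler.

```python
import numpy as np
import xarray as xr

rng = np.random.default_rng(0)

# Hypothetical dummy data; names mirror the snippet in the report.
c = np.arange(20)
time = np.arange(8)
a = xr.DataArray(rng.random(20), dims="c", coords={"c": c})
b = xr.DataArray(rng.random(20), dims="c", coords={"c": c})
d = xr.DataArray(
    rng.random((8, 20)),
    dims=("time", "c"),
    coords={"time": time, "c": c},
).chunk({"c": 5})


def worker(block):
    # a and b come from the enclosing scope, as described in the report.
    # Multiplying by block keeps the output shape equal to the template.
    return block * (a * b.sel(c=block.c))


result = xr.map_blocks(worker, d, template=d)

# Run every task in the calling thread, so execution order is deterministic.
computed = result.compute(scheduler="synchronous")
```

If the error only shows up with the default threaded scheduler and never with this one, that points towards a threading problem rather than bad data.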

FWIW, the error you are getting tells me that you have an Index/Coordinate with duplicate values in it. Are you sure that is not possible?
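One way to check for duplicates directly on the pandas index is sketched below. The helper report_duplicates is a hypothetical name, not part of pandas or xarray. For an xarray object, the index of a dimension c should be available as obj.indexes["c"].

```python
import pandas as pd


def report_duplicates(index: pd.Index) -> pd.Index:
    """Return the labels that occur more than once in ``index``."""
    return index[index.duplicated(keep=False)].unique()


clean = pd.Index([10, 20, 30])
dirty = pd.Index([10, 20, 20, 30])

print(clean.is_unique, list(report_duplicates(clean)))  # True []
print(dirty.is_unique, list(report_duplicates(dirty)))  # False [20]
```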

jhamman avatar Jul 22 '22 15:07 jhamman

Thanks a lot for your quick reply and your helpful hints!

In the meantime I have verified that d.c is unique, i.e. np.unique(d.c).size == d.c.size

Unfortunately I was not able to reproduce the error often enough lately to test it with the synchronous scheduler nor to create a smaller synthetic example which reproduces the problem. One run takes about an hour until the exception occurs (or not), which makes things hard to debug. But I will continue trying and keep this ticket updated.

Any further suggestions very welcome :) Thanks a lot!

lumbric avatar Aug 19 '22 10:08 lumbric

Thanks, please reopen if you can construct a reproducible example.

dcherian avatar Aug 19 '22 14:08 dcherian

Not sure what changed, but now I do get the same error also with my small and synthetic test data. This way I was able to debug a bit further. I am pretty sure this is a bug in xarray or pandas.

I think something in pandas.core.indexes.base.Index is not thread-safe. At least this seems to be the place of the race condition.

I can create a new ticket, if you prefer, but since I am not sure in which project, I will continue to collect information here. Unfortunately I have not yet managed to create a minimal example as this is quite tricky with timing issues.
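A possible starting point for a minimal example that needs neither xarray nor dask is sketched below. It is an assumption that this exercises the same code path: several threads call get_indexer() at the same moment on a fresh index whose internal state has not been populated yet. The function name and its parameters are made up for illustration, and depending on the pandas version and the timing it may well report zero errors.

```python
import threading

import numpy as np
import pandas as pd
from pandas.errors import InvalidIndexError


def count_spurious_errors(n_trials=20, n_threads=8, size=3879):
    """Count InvalidIndexError raised by concurrent lookups on unique indexes."""
    target = np.arange(size)
    errors = []

    def lookup(index, barrier):
        barrier.wait()  # release all threads at roughly the same moment
        try:
            index.get_indexer(target)
        except InvalidIndexError as exc:
            errors.append(exc)

    for _ in range(n_trials):
        # A fresh index per trial, so nothing has been cached on it yet.
        index = pd.Index(np.arange(size))
        barrier = threading.Barrier(n_threads)
        threads = [
            threading.Thread(target=lookup, args=(index, barrier))
            for _ in range(n_threads)
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    return len(errors)


if __name__ == "__main__":
    print("spurious InvalidIndexError count:", count_spurious_errors())
```

Every index built here is unique by construction, so any non-zero count would be an error raised for a valid index.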

Additional debugging print and proof of race condition

If I add the following debugging print to the pandas code:

--- /tmp/base.py        2022-09-12 16:35:53.739971953 +0200
+++ /home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/pandas/core/indexes/base.py      2022-09-12 16:35:58.864144801 +0200
@@ -3718,7 +3718,6 @@
         self._check_indexing_method(method, limit, tolerance)
 
         if not self._index_as_unique:
+            print("Original: ", len(self), ", length of set:", len(set(self)))
             raise InvalidIndexError(self._requires_unique_msg)
 
         if len(target) == 0:

...I get the following output:

Original:  3879 , length of set: 3879

So the index seems to be unique, but self.is_unique is False for some reason (note that self._index_as_unique and self.is_unique are equivalent in this case).

To confirm that the race condition is at this point, we wait for 1s and then check again for uniqueness:

--- /tmp/base.py        2022-09-12 16:35:53.739971953 +0200
+++ /home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/pandas/core/indexes/base.py      2022-09-12 16:35:58.864144801 +0200
@@ -3718,7 +3718,10 @@
         self._check_indexing_method(method, limit, tolerance)
 
         if not self._index_as_unique:
+            if not self.is_unique:
+                import time
+                time.sleep(1)
+                print("now unique?", self.is_unique)
             raise InvalidIndexError(self._requires_unique_msg)

This outputs:

now unique? True
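The same two checks can be done without editing the installed pandas sources, by wrapping the lookup in user code. This is only a sketch: get_indexer_with_diagnostics is a hypothetical helper, and it would have to be called at a place where the index is accessible, which is not the case for the sel() call inside xarray.

```python
import time

import pandas as pd
from pandas.errors import InvalidIndexError


def get_indexer_with_diagnostics(index, target, delay=1.0):
    """Call index.get_indexer(target) and print diagnostics if it fails."""
    try:
        return index.get_indexer(target)
    except InvalidIndexError:
        # Same information as the first debugging print above.
        print("Original: ", len(index), ", length of set:", len(set(index)))
        # Same re-check as the second patch: wait, then ask again.
        time.sleep(delay)
        print("now unique?", index.is_unique)
        raise


positions = get_indexer_with_diagnostics(pd.Index([3, 1, 2]), [2, 3])
print(list(positions))  # [2, 0]
```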

Traceback

Traceback (most recent call last):
  File "scripts/my_script.py", line 57, in <module>
    main()
  File "scripts/my_script.py", line 48, in main
    my_function(
  File "/home/lumbric/my_project/src/calculations.py", line 136, in my_function
    result = result.compute()
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/dataarray.py", line 947, in compute
    return new.load(**kwargs)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/dataarray.py", line 921, in load
    ds = self._to_temp_dataset().load(**kwargs)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/dataset.py", line 861, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/threaded.py", line 81, in get
    results = get_async(
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/local.py", line 508, in get_async
    raise_exception(exc, tb)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/local.py", line 316, in reraise
    raise exc
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/local.py", line 221, in execute_task
    result = _execute_task(task, data)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/parallel.py", line 285, in _wrapper
    result = func(*converted_args, **kwargs)
  File "/home/lumbric/some_project/src/calculations.py", line 100, in <lambda>
    lambda input_data: worker(input_data).compute().chunk({"time": None}),
  File "/home/lumbric/some_project/src/calculations.py", line 69, in worker
    raise e
  File "/home/lumbric/some_project/src/calculations.py", line 60, in worker
    out = some_data * some_other_data.sel(some_dimension=input_data.some_dimension)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/dataarray.py", line 1329, in sel
    ds = self._to_temp_dataset().sel(
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/dataset.py", line 2502, in sel
    pos_indexers, new_indexes = remap_label_indexers(
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/coordinates.py", line 421, in remap_label_indexers
    pos_indexers, new_indexes = indexing.remap_label_indexers(
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/indexing.py", line 121, in remap_label_indexers
    idxr, new_idx = index.query(labels, method=method, tolerance=tolerance)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/indexes.py", line 245, in query
    indexer = get_indexer_nd(self.index, label, method, tolerance)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/indexes.py", line 142, in get_indexer_nd
    flat_indexer = index.get_indexer(flat_labels, method=method, tolerance=tolerance)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 3722, in get_indexer
    raise InvalidIndexError(self._requires_unique_msg)
pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects

Workaround

The issue does not occur if I use the synchronous dask scheduler by adding at the very beginning of my script:

dask.config.set(scheduler='single-threaded')
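The setting does not have to be global. As a small sketch with made-up dummy data, dask.config.set() can also be used as a context manager, so that only the computation inside the block runs single-threaded:

```python
import dask
import dask.array as da

# Dummy data standing in for the real computation.
data = da.ones((1000,), chunks=100)

# Only computations started inside this block use the single-threaded scheduler.
with dask.config.set(scheduler="single-threaded"):
    total = data.sum().compute()

print(total)
```

This keeps other parts of a script on the default threaded scheduler, at the cost of losing parallelism for the part that hits the error.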

Environment

INSTALLED VERSIONS
------------------
commit: None
python: 3.8.13 | packaged by conda-forge | (default, Mar 25 2022, 06:04:10) [GCC 10.3.0]
python-bits: 64
OS: Linux
OS-release: 5.4.0-124-generic
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.12.1
libnetcdf: 4.8.1

xarray: 2022.3.0
pandas: 1.4.2
numpy: 1.22.4
scipy: 1.8.1
netCDF4: 1.5.8
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: None
cftime: 1.6.0
nc_time_axis: None
PseudoNetCDF: None
rasterio: 1.2.10
cfgrib: None
iris: None
bottleneck: None
dask: 2022.05.2
distributed: 2022.5.2
matplotlib: 3.5.2
cartopy: None
seaborn: 0.11.2
numbagg: None
fsspec: 2022.5.0
cupy: None
pint: None
sparse: None
setuptools: 62.3.2
pip: 22.1.2
conda: 4.12.0
pytest: 7.1.2
IPython: 8.4.0
sphinx: None

lumbric avatar Sep 12 '22 14:09 lumbric

I think these are the values of the index. The values seem to be unique and monotonic.

lumbric avatar Sep 12 '22 15:09 lumbric