xarray
pandas.errors.InvalidIndexError is raised in some runs when using chunks and map_blocks()
What is your issue?
I'm doing a lengthy computation involving hundreds of GB of data, using chunks and `map_blocks()` so that things fit into RAM and can run in parallel. From time to time, the following error is raised:

```
pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects
```

The line where this takes place looks pretty harmless:

```python
x = a * b.sel(c=d.c)
```

It's a line inside the function `func`, which is passed to a `map_blocks()` call. In this case `a` and `b` are `xr.DataArray` or `xr.Dataset` objects captured from the outer scope, and `d` is the parameter `obj` of `map_blocks()`.
That means the corresponding line in the traceback looks like this:

```python
xr.map_blocks(
    lambda d: worker(d).compute().chunk({"time": None}),
    d,
    template=template,
)
```
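For reference, here is a minimal runnable sketch of the same pattern with dummy data (the names and values are placeholders, not the real workflow):

```python
import numpy as np
import xarray as xr

# dummy stand-ins for the real arrays; `d` is chunked so that
# map_blocks() processes it block by block
d = xr.DataArray(np.arange(6.0), dims="c", coords={"c": np.arange(6)}).chunk({"c": 3})
b = xr.DataArray(np.arange(6.0) * 10.0, dims="c", coords={"c": np.arange(6)})

def func(block):
    # same shape of operation as the failing line: x = a * b.sel(c=d.c)
    return 2.0 * b.sel(c=block.c)

out = xr.map_blocks(func, d, template=d).compute()
print(out.values)
```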
I guess it's some kind of race condition, since it's not 100% reproducible, but I have no idea how to investigate further in order to create a proper bug report or fix my code.
Do you have any hint how I could go about building a minimal example in such a case? What is the error message trying to tell me?
Hi @lumbric - thanks for the report. Though it's often challenging when coming from complicated workflows, getting to an MCVE is really the only way to sort out what is going wrong here.
A few suggestions that will hopefully help you get there:
- Try running your calculation using the synchronous dask scheduler. Dask's graph execution order is not always deterministic, which sometimes leads to the conclusion that errors are not reproducible.
- Try reconstructing a similar workflow with dummy data.
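The first suggestion takes one line near the top of the script; any dask computation then runs deterministically in the main thread (a sketch using a trivial dask array as stand-in for the real workload):

```python
import dask
import dask.array as da

# force single-threaded execution so runs are deterministic and
# thread races cannot occur
dask.config.set(scheduler="synchronous")

x = da.ones((4, 4), chunks=2)
print(x.sum().compute())  # 16.0
```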
FWIW, the error you are getting tells me that you have an Index/Coordinate with duplicate values in it. Are you sure that is not possible?
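A minimal illustration of what normally triggers this message, in pure pandas (this is the check that xarray's `.sel()` ends up hitting internally):

```python
import pandas as pd

idx = pd.Index([0, 0, 1])      # note the duplicated label 0
try:
    idx.get_indexer([0, 1])    # label-based lookup, as .sel() does internally
except pd.errors.InvalidIndexError as err:
    print(err)  # Reindexing only valid with uniquely valued Index objects
```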
Thanks a lot for your quick reply and your helpful hints!
In the meantime I have verified that `d.c` is unique, i.e. `np.unique(d.c).size == d.c.size`.
Unfortunately I have not been able to reproduce the error often enough lately to test it with the synchronous scheduler, nor to create a smaller synthetic example that reproduces the problem. One run takes about an hour until the exception occurs (or doesn't), which makes things hard to debug. But I will keep trying and keep this ticket updated.
Any further suggestions are very welcome :) Thanks a lot!
Thanks, please reopen if you can construct a reproducible example.
Not sure what changed, but now I get the same error with my small synthetic test data as well. That allowed me to debug a bit further. I am pretty sure this is a bug in xarray or pandas.
I think something in `pandas.core.indexes.base.Index` is not thread-safe; at least this seems to be the location of the race condition.
I can create a new ticket if you prefer, but since I am not sure which project it belongs to, I will continue collecting information here. Unfortunately I have not yet managed to create a minimal example, as this is quite tricky with timing issues.
**Additional debugging print and proof of race condition**
If I add the following debugging print to the pandas code:

```diff
--- /tmp/base.py	2022-09-12 16:35:53.739971953 +0200
+++ /home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/pandas/core/indexes/base.py	2022-09-12 16:35:58.864144801 +0200
@@ -3718,7 +3718,8 @@
         self._check_indexing_method(method, limit, tolerance)

         if not self._index_as_unique:
+            print("Original: ", len(self), ", length of set:", len(set(self)))
             raise InvalidIndexError(self._requires_unique_msg)

         if len(target) == 0:
```
...I get the following output:

```
Original:  3879 , length of set: 3879
```

So the index seems to be unique, but `self.is_unique` is `False` for some reason (note that `self._index_as_unique` and `self.is_unique` are the same in this case).
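For context, `Index.is_unique` is a lazily cached property: it is computed via the index engine on first access and then memoized, which is exactly the kind of unsynchronized lazy initialization where a race between threads is plausible. A sketch of the caching behaviour (`_cache` is a pandas internal and may differ between versions):

```python
import pandas as pd

idx = pd.Index([0, 1, 2])
print("is_unique" in idx._cache)   # False: not computed yet
print(idx.is_unique)               # True; first access computes and caches it
print("is_unique" in idx._cache)   # True: subsequent reads hit the cache
```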
To confirm that the race condition occurs at this point, we wait for 1 s and then check for uniqueness again:

```diff
--- /tmp/base.py	2022-09-12 16:35:53.739971953 +0200
+++ /home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/pandas/core/indexes/base.py	2022-09-12 16:35:58.864144801 +0200
@@ -3718,7 +3718,11 @@
         self._check_indexing_method(method, limit, tolerance)

         if not self._index_as_unique:
+            if not self.is_unique:
+                import time
+                time.sleep(1)
+                print("now unique?", self.is_unique)
             raise InvalidIndexError(self._requires_unique_msg)
```
This outputs:

```
now unique? True
```
**Traceback**

```
Traceback (most recent call last):
  File "scripts/my_script.py", line 57, in <module>
    main()
  File "scripts/my_script.py", line 48, in main
    my_function(
  File "/home/lumbric/my_project/src/calculations.py", line 136, in my_function
    result = result.compute()
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/dataarray.py", line 947, in compute
    return new.load(**kwargs)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/dataarray.py", line 921, in load
    ds = self._to_temp_dataset().load(**kwargs)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/dataset.py", line 861, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/threaded.py", line 81, in get
    results = get_async(
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/local.py", line 508, in get_async
    raise_exception(exc, tb)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/local.py", line 316, in reraise
    raise exc
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/local.py", line 221, in execute_task
    result = _execute_task(task, data)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/parallel.py", line 285, in _wrapper
    result = func(*converted_args, **kwargs)
  File "/home/lumbric/some_project/src/calculations.py", line 100, in <lambda>
    lambda input_data: worker(input_data).compute().chunk({"time": None}),
  File "/home/lumbric/some_project/src/calculations.py", line 69, in worker
    raise e
  File "/home/lumbric/some_project/src/calculations.py", line 60, in worker
    out = some_data * some_other_data.sel(some_dimension=input_data.some_dimension)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/dataarray.py", line 1329, in sel
    ds = self._to_temp_dataset().sel(
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/dataset.py", line 2502, in sel
    pos_indexers, new_indexes = remap_label_indexers(
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/coordinates.py", line 421, in remap_label_indexers
    pos_indexers, new_indexes = indexing.remap_label_indexers(
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/indexing.py", line 121, in remap_label_indexers
    idxr, new_idx = index.query(labels, method=method, tolerance=tolerance)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/indexes.py", line 245, in query
    indexer = get_indexer_nd(self.index, label, method, tolerance)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/xarray/core/indexes.py", line 142, in get_indexer_nd
    flat_indexer = index.get_indexer(flat_labels, method=method, tolerance=tolerance)
  File "/home/lumbric/.conda/envs/my_project/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 3722, in get_indexer
    raise InvalidIndexError(self._requires_unique_msg)
pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects
```
**Workaround**

The issue does not occur if I use the synchronous dask scheduler, by adding this at the very beginning of my script:

```python
dask.config.set(scheduler='single-threaded')
```
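If single-threaded execution is too slow, another mitigation consistent with the diagnosis above (hypothetical, not a confirmed fix; the helper name is made up) is to evaluate the lazy uniqueness check once in the main thread before any parallel compute, so worker threads only ever read the already-cached result:

```python
import pandas as pd

def warm_index_caches(indexes):
    # Hypothetical mitigation: touch each pandas index's lazily cached
    # `is_unique` property once, single-threaded, so that later
    # concurrent .sel() calls only read the memoized value.
    return {name: idx.is_unique for name, idx in indexes.items()}

# with an xarray object this would be: warm_index_caches(ds.indexes)
print(warm_index_caches({"c": pd.Index([0, 1, 2])}))  # {'c': True}
```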
**Environment**

```
xarray: 2022.3.0
pandas: 1.4.2
numpy: 1.22.4
scipy: 1.8.1
netCDF4: 1.5.8
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: None
cftime: 1.6.0
nc_time_axis: None
PseudoNetCDF: None
rasterio: 1.2.10
cfgrib: None
iris: None
bottleneck: None
dask: 2022.05.2
distributed: 2022.5.2
matplotlib: 3.5.2
cartopy: None
seaborn: 0.11.2
numbagg: None
fsspec: 2022.5.0
cupy: None
pint: None
sparse: None
setuptools: 62.3.2
pip: 22.1.2
conda: 4.12.0
pytest: 7.1.2
IPython: 8.4.0
sphinx: None
```
I think these are the values of the index; they seem to be unique and monotonic.