xarray
pandas.errors.InvalidIndexError raised when running computation in parallel using dask
What happened?
I'm doing a computation using chunks and map_blocks() to run things in parallel. At some point a pandas.errors.InvalidIndexError is raised. When using dask's synchronous scheduler, everything works fine. I think pandas.core.indexes.base.Index is not thread-safe; at least this seems to be the place of the race condition. See further tests below.
(This issue was initially discussed in #6816, but the ticket was closed, because I couldn't reproduce the problem any longer. Now it seems to be reproducible in every run, so it is time for a proper bug report, which is this ticket here.)
What did you expect to happen?
Dask's single-threaded and threads schedulers should produce the same result.
Minimal Complete Verifiable Example 1
Edit: I've managed to reduce the verifiable example, see example 2 below.
# I wasn't able to reproduce the issue with a smaller code example, so I provide all my code and my test data. This should make it possible to reproduce the issue in less than a minute.
# Requirements:
# - git
# - mamba, see https://github.com/mamba-org/mamba
git clone https://github.com/lumbric/reproduce_invalidindexerror.git
cd reproduce_invalidindexerror
mamba env create -f env.yml
# alternatively run the following, will install latest versions from conda-forge:
# conda create -n reproduce_invalidindexerror
# conda activate reproduce_invalidindexerror
# mamba install -c conda-forge python=3.8 matplotlib pytest-cov dask openpyxl pytest pip xarray netcdf4 jupyter pandas scipy flake8 dvc pre-commit pyarrow statsmodels rasterio scikit-learn pytest-watch pdbpp black seaborn
conda activate reproduce_invalidindexerror
dvc repro checks_simulation
Minimal Complete Verifiable Example 2
import numpy as np
import pandas as pd
import xarray as xr
from multiprocessing import Lock
from dask.diagnostics import ProgressBar
# Workaround for xarray#6816: Parallel execution causes often an InvalidIndexError
# https://github.com/pydata/xarray/issues/6816#issuecomment-1243864752
# import dask
# dask.config.set(scheduler="single-threaded")
def generate_netcdf_files():
    fnames = [f"{i:02d}.nc" for i in range(21)]
    for i, fname in enumerate(fnames):
        xr.DataArray(
            np.ones((3879, 48)),
            dims=("locations", "time"),
            coords={
                "time": pd.date_range(f"{2000 + i}-01-01", periods=48, freq="D"),
                "locations": np.arange(3879),
            },
        ).to_netcdf(fname)
    return fnames


def compute(locations, data):
    def resample_annually(data):
        return data.sortby("time").resample(time="1A", label="left", loffset="1D").mean(dim="time")

    def worker(data):
        locations_chunk = locations.sel(locations=data.locations)
        out_raw = data * locations_chunk
        out = resample_annually(out_raw)
        return out

    template = resample_annually(data)

    out = xr.map_blocks(
        lambda data: worker(data).compute().chunk({"time": None}),
        data,
        template=template,
    )

    return out


def main():
    fnames = generate_netcdf_files()

    locations = xr.DataArray(
        np.ones(3879),
        dims="locations",
        coords={"locations": np.arange(3879)},
    )

    data = xr.open_mfdataset(
        fnames,
        combine="by_coords",
        chunks={"locations": 4000, "time": None},
        # suggested as solution in
        # lock=Lock(),
    ).__xarray_dataarray_variable__

    out = compute(locations, data)

    with ProgressBar():
        out = out.compute()


if __name__ == "__main__":
    main()
MVCE confirmation
- [X] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
- [x] Complete example — the example is self-contained, including all data and the text of any traceback.
- [x] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
- [X] New issue — a search of GitHub Issues suggests this is not a duplicate.
Relevant log output
This is the traceback of "Minimal Complete Verifiable Example 1".
Traceback (most recent call last):
File "scripts/calc_p_out_model.py", line 61, in <module>
main()
File "scripts/calc_p_out_model.py", line 31, in main
calc_power(name="p_out_model", compute_func=compute_func)
File "/tmp/reproduce_invalidindexerror/src/wind_power.py", line 136, in calc_power
power = power.compute()
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataarray.py", line 993, in compute
return new.load(**kwargs)
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataarray.py", line 967, in load
ds = self._to_temp_dataset().load(**kwargs)
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataset.py", line 733, in load
evaluated_data = da.compute(*lazy_data.values(), **kwargs)
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/base.py", line 600, in compute
results = schedule(dsk, keys, **kwargs)
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/threaded.py", line 89, in get
results = get_async(
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/local.py", line 511, in get_async
raise_exception(exc, tb)
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/local.py", line 319, in reraise
raise exc
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/local.py", line 224, in execute_task
result = _execute_task(task, data)
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/core.py", line 119, in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/parallel.py", line 285, in _wrapper
result = func(*converted_args, **kwargs)
File "/tmp/reproduce_invalidindexerror/src/wind_power.py", line 100, in <lambda>
lambda wind_speeds: worker(wind_speeds).compute().chunk({"time": None}),
File "/tmp/reproduce_invalidindexerror/src/wind_power.py", line 50, in worker
specific_power_chunk = specific_power.sel(turbines=wind_speeds.turbines)
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataarray.py", line 1420, in sel
ds = self._to_temp_dataset().sel(
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataset.py", line 2533, in sel
query_results = map_index_queries(
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/indexing.py", line 183, in map_index_queries
results.append(index.sel(labels, **options)) # type: ignore[call-arg]
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/indexes.py", line 418, in sel
indexer = get_indexer_nd(self.index, label_array, method, tolerance)
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/indexes.py", line 212, in get_indexer_nd
flat_indexer = index.get_indexer(flat_labels, method=method, tolerance=tolerance)
File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 3729, in get_indexer
raise InvalidIndexError(self._requires_unique_msg)
pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects
Anything else we need to know?
Workaround: Use synchronous dask scheduler
The issue does not occur if I use the synchronous dask scheduler by adding at the very beginning of my script:
dask.config.set(scheduler='single-threaded')
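For reference, a minimal sketch (using a toy dask.array computation, not the reproducer's code) of how the scheduler can be forced globally or chosen per compute() call; the per-call form avoids serializing unrelated computations:

```python
import dask
import dask.array as da

# Global workaround: force the synchronous scheduler for everything.
dask.config.set(scheduler="single-threaded")

# The scheduler can also be selected per call:
x = da.ones((4, 4), chunks=2).sum()
print(x.compute(scheduler="single-threaded"))  # 16.0
print(x.compute(scheduler="threads"))          # 16.0
```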
Additional debugging print
If I add the following debugging print to the pandas code:
--- /tmp/base.py	2022-09-12 16:35:53.739971953 +0200
+++ /opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/pandas/core/indexes/base.py	2022-09-12 16:35:58.864144801 +0200
@@ -3718,7 +3718,8 @@
         self._check_indexing_method(method, limit, tolerance)

         if not self._index_as_unique:
+            print("Original: ", len(self), ", length of set:", len(set(self)))
             raise InvalidIndexError(self._requires_unique_msg)

         if len(target) == 0:
...I get the following output:
Original: 3879 , length of set: 3879
So the index seems to be unique, but self.is_unique is False for some reason (note that self._index_as_unique and self.is_unique are equivalent in this case).
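The uniqueness check can be reproduced in isolation. In pandas, is_unique for a regular Index is derived from a lazily built index engine, which is the suspected site of the race; a hypothetical illustration (not the fix):

```python
import pandas as pd

idx = pd.Index(list(range(3879)))

# For a regular Index, is_unique is computed via a lazily built hashtable
# engine on first access and cached afterwards; two threads triggering that
# first access concurrently is the suspected race.
print(idx.is_unique)               # True
print(len(set(idx)) == len(idx))   # the same check done "by hand": True
```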
Proof of race condition: add sleep 1s
To confirm that the race condition is at this point, we wait for 1 s and then check uniqueness again:
--- /tmp/base.py	2022-09-12 16:35:53.739971953 +0200
+++ /opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/pandas/core/indexes/base.py	2022-09-12 16:35:58.864144801 +0200
@@ -3718,7 +3718,11 @@
         self._check_indexing_method(method, limit, tolerance)

         if not self._index_as_unique:
+            if not self.is_unique:
+                import time
+                time.sleep(1)
+                print("now unique?", self.is_unique)
             raise InvalidIndexError(self._requires_unique_msg)
This outputs:
now unique? True
Environment
xarray: 0.15.0
pandas: 0.25.3
numpy: 1.17.4
scipy: 1.3.3
netCDF4: 1.5.3
pydap: None
h5netcdf: 0.7.1
h5py: 2.10.0
Nio: None
zarr: 2.4.0+ds
cftime: 1.1.0
nc_time_axis: None
PseudoNetCDF: None
rasterio: 1.1.3
cfgrib: None
iris: None
bottleneck: 1.2.1
dask: 2.8.1+dfsg
distributed: None
matplotlib: 3.1.2
cartopy: None
seaborn: 0.10.0
numbagg: None
setuptools: 45.2.0
pip3: None
conda: None
pytest: 4.6.9
IPython: 7.13.0
sphinx: 1.8.5
xref #6904 (not sure it is 100% relevant, but the problem also arises when .sel is called within some parallel workflow).
@benbovy thanks for the hint! I tried passing an explicit lock to xr.open_mfdataset() as suggested, but it didn't change anything; I still get the same exception. I will double-check that I did it the right way, since I might be missing something.
I have managed to reduce the reproducing example (see "Minimal Complete Verifiable Example 2" above) and also found a proper solution to fix this issue. I am still not sure whether this is a bug or intended behavior, so I won't close the issue for now.
Basically the issue occurs when a chunked NetCDF file is loaded from disk, passed to xarray.map_blocks(), and then used in .sel() as a parameter to get a subset of some other xarray object that is not passed to the worker func(). I think the proper solution is to use the args parameter of map_blocks() instead of .sel():
--- run-broken.py	2022-09-22 13:00:41.095555961 +0200
+++ run.py	2022-09-22 13:01:14.452696511 +0200
@@ -30,17 +30,17 @@
     def resample_annually(data):
         return data.sortby("time").resample(time="1A", label="left", loffset="1D").mean(dim="time")

-    def worker(data):
-        locations_chunk = locations.sel(locations=data.locations)
-        out_raw = data * locations_chunk
+    def worker(data, locations):
+        out_raw = data * locations
         out = resample_annually(out_raw)
         return out

     template = resample_annually(data)

     out = xr.map_blocks(
-        lambda data: worker(data).compute().chunk({"time": None}),
+        lambda data, locations: worker(data, locations).compute().chunk({"time": None}),
         data,
+        (locations,),
         template=template,
     )
This seems to fix this issue and seems to be the proper solution anyway.
I still don't see why I am not allowed to use .sel() on shadowed objects in the worker func(). Is this on purpose? If yes, should we add something to the documentation? Is this specific behavior of map_blocks()? Is it related to #6904?
I've had the same issues under the exact same conditions. However, it happens whether I use dask or not.
This solution fixes it, but I agree at least a doc update would be helpful!
I agree with just passing all args explicitly. Does it work otherwise with "processes"?
As a side note, this particular example seems quite convoluted.
- Why are you chunking inside the mapped function?
- If you conda install flox, the resample operation should be quite efficient, without the need to use map_blocks.
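For context, a toy sketch of the annual aggregation without map_blocks (using groupby("time.year"), which is equivalent to an annual resample for this data; flox, if installed, accelerates this path transparently, with no code change):

```python
import numpy as np
import pandas as pd
import xarray as xr

da = xr.DataArray(
    np.ones(48),
    dims="time",
    coords={"time": pd.date_range("2000-01-01", periods=48, freq="D")},
)

# Annual mean without map_blocks; all 48 days fall in the year 2000,
# so the result is a single group with mean 1.0.
annual = da.groupby("time.year").mean()
print(annual.values)
```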
I agree with just passing all args explicitly. Does it work otherwise with "processes"?

What do you mean by that?
- Why are you chunking inside the mapped function?

Uhm yes, you are right, this should be removed; not sure how this happened. Removing .chunk({"time": None}) in the lambda function does not change the behavior of the example regarding this issue.
- If you conda install flox, the resample operation should be quite efficient, without the need to use map_blocks.

Oh wow, thanks! Haven't seen flox before.
Thanks for raising. I too am intermittently hitting this pandas.errors.InvalidIndexError using xr.Dataset.map_blocks and compute with the default (threads) scheduler. The error does not occur with either "synchronous" or "processes" scheduling. It also does not occur when I throw a call to print in the function handed to map_blocks (so ... race condition?).
Using scheduler="processes" as a solution for now, and will try to come back with another MCVE when time allows.
@itcarroll did you see my comment above how I fixed my issue? Are you using shadowed variables in your worker function?