xarray icon indicating copy to clipboard operation
xarray copied to clipboard

pandas.errors.InvalidIndexError raised when running computation in parallel using dask

Open lumbric opened this issue 1 year ago • 8 comments

What happened?

I'm doing a computation using chunks and map_blocks() to run things in parallel. At some point a pandas.errors.InvalidIndexError is raised. When using dask's synchronous scheduler, everything works fine. I think pandas.core.indexes.base.Index is not thread-safe. At least this seems to be the place of the race condition. See further tests below.

(This issue was initially discussed in #6816, but the ticket was closed, because I couldn't reproduce the problem any longer. Now it seems to be reproducible in every run, so it is time for a proper bug report, which is this ticket here.)

What did you expect to happen?

Dask schedulers single-threaded and threads should have the same result.

Minimal Complete Verifiable Example 1

Edit: I've managed to reduce the verifiable example, see example 2 below.

# I wasn't able to reproduce the issue with a smaller code example, so I provide all my code and my test data. This should make it possible to reproduce the issue in less than a minute.

# Requirements:
#   - git
#   - mamba, see https://github.com/mamba-org/mamba

git clone https://github.com/lumbric/reproduce_invalidindexerror.git
cd reproduce_invalidindexerror

mamba env create -f env.yml

# alternatively run the following, will install latest versions from conda-forge:
# conda create -n reproduce_invalidindexerror
# conda activate reproduce_invalidindexerror
# mamba install -c conda-forge python=3.8 matplotlib pytest-cov dask openpyxl pytest pip xarray netcdf4 jupyter pandas scipy flake8 dvc pre-commit pyarrow statsmodels rasterio scikit-learn pytest-watch pdbpp black seaborn

conda activate reproduce_invalidindexerror

dvc repro checks_simulation

Minimal Complete Verifiable Example 2

import numpy as np
import pandas as pd
import xarray as xr

from multiprocessing import Lock
from dask.diagnostics import ProgressBar


# Workaround for xarray#6816: Parallel execution causes often an InvalidIndexError
# https://github.com/pydata/xarray/issues/6816#issuecomment-1243864752
# import dask
# dask.config.set(scheduler="single-threaded")


def generate_netcdf_files():
    fnames = [f"{i:02d}.nc" for i in range(21)]
    for i, fname in enumerate(fnames):
        xr.DataArray(
            np.ones((3879, 48)),
            dims=("locations", "time"),
            coords={
                "time": pd.date_range(f"{2000 + i}-01-01", periods=48, freq="D"),
                "locations": np.arange(3879),
            },
        ).to_netcdf(fname)
    return fnames


def compute(locations, data):
    def resample_annually(data):
        return data.sortby("time").resample(time="1A", label="left", loffset="1D").mean(dim="time")

    def worker(data):
        locations_chunk = locations.sel(locations=data.locations)
        out_raw = data * locations_chunk
        out = resample_annually(out_raw)
        return out

    template = resample_annually(data)

    out = xr.map_blocks(
        lambda data: worker(data).compute().chunk({"time": None}),
        data,
        template=template,
    )

    return out


def main():
    fnames = generate_netcdf_files()

    locations = xr.DataArray(
        np.ones(3879),
        dims="locations",
        coords={"locations": np.arange(3879)},
    )

    data = xr.open_mfdataset(
        fnames,
        combine="by_coords",
        chunks={"locations": 4000, "time": None},
        # suggested as solution in
        # lock=Lock(),
    ).__xarray_dataarray_variable__

    out = compute(locations, data)

    with ProgressBar():
        out = out.compute()


if __name__ == "__main__":
    main()

MVCE confirmation

  • [X] Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • [x] Complete example — the example is self-contained, including all data and the text of any traceback.
  • [x] Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • [X] New issue — a search of GitHub Issues suggests this is not a duplicate.

Relevant log output

This is the traceback of "Minimal Complete Verifiable Example 1".

Traceback (most recent call last):
  File "scripts/calc_p_out_model.py", line 61, in <module>
    main()
  File "scripts/calc_p_out_model.py", line 31, in main
    calc_power(name="p_out_model", compute_func=compute_func)
  File "/tmp/reproduce_invalidindexerror/src/wind_power.py", line 136, in calc_power
    power = power.compute()
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataarray.py", line 993, in compute
    return new.load(**kwargs)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataarray.py", line 967, in load
    ds = self._to_temp_dataset().load(**kwargs)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataset.py", line 733, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/local.py", line 319, in reraise
    raise exc
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/parallel.py", line 285, in _wrapper
    result = func(*converted_args, **kwargs)
  File "/tmp/reproduce_invalidindexerror/src/wind_power.py", line 100, in <lambda>
    lambda wind_speeds: worker(wind_speeds).compute().chunk({"time": None}),
  File "/tmp/reproduce_invalidindexerror/src/wind_power.py", line 50, in worker
    specific_power_chunk = specific_power.sel(turbines=wind_speeds.turbines)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataarray.py", line 1420, in sel
    ds = self._to_temp_dataset().sel(
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/dataset.py", line 2533, in sel
    query_results = map_index_queries(
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/indexing.py", line 183, in map_index_queries
    results.append(index.sel(labels, **options))  # type: ignore[call-arg]
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/indexes.py", line 418, in sel
    indexer = get_indexer_nd(self.index, label_array, method, tolerance)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/xarray/core/indexes.py", line 212, in get_indexer_nd
    flat_indexer = index.get_indexer(flat_labels, method=method, tolerance=tolerance)
  File "/opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 3729, in get_indexer
    raise InvalidIndexError(self._requires_unique_msg)
pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects

Anything else we need to know?

Workaround: Use synchronous dask scheduler

The issue does not occur if I use the synchronous dask scheduler by adding at the very beginning of my script:

dask.config.set(scheduler='single-threaded')

Additional debugging print

If I add the following debugging print to the pandas code:

--- /tmp/base.py        2022-09-12 16:35:53.739971953 +0200
+++ /opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/pandas/core/indexes/base.py      2022-09-12 16:35:58.864144801 +0200
@@ -3718,7 +3718,6 @@
         self._check_indexing_method(method, limit, tolerance)
 
         if not self._index_as_unique:
+            print("Original: ", len(self), ", length of set:", len(set(self)))
             raise InvalidIndexError(self._requires_unique_msg)
 
         if len(target) == 0

...I get the following output:

Original:  3879 , length of set: 3879

So the index seems to be unique, but self.is_unique is False for some reason (note that not self._index_as_unique and self.is_unique is the same in this case).

Proof of race condtion: addd sleep 1s

To confirm that the race condition is at this point we wait for 1s and then check again for uniqueness:

--- /tmp/base.py        2022-09-12 16:35:53.739971953 +0200
+++ /opt/miniconda3/envs/reproduce_invalidindexerror/lib/python3.8/site-packages/pandas/core/indexes/base.py      2022-09-12 16:35:58.864144801 +0200
@@ -3718,7 +3718,10 @@
         self._check_indexing_method(method, limit, tolerance)
 
         if not self._index_as_unique:
+            if not self.is_unique:
+                import time
+                time.sleep(1)
+                print("now unique?", self.is_unique)
             raise InvalidIndexError(self._requires_unique_msg)

This outputs:

now unique? True

Environment

INSTALLED VERSIONS ------------------ commit: None python: 3.8.10 (default, Jun 22 2022, 20:18:18) [GCC 9.4.0] python-bits: 64 OS: Linux OS-release: 5.4.0-125-generic machine: x86_64 processor: x86_64 byteorder: little LC_ALL: None LANG: en_US.UTF-8 LOCALE: en_US.UTF-8 libhdf5: 1.10.4 libnetcdf: 4.7.3

xarray: 0.15.0 pandas: 0.25.3 numpy: 1.17.4 scipy: 1.3.3 netCDF4: 1.5.3 pydap: None h5netcdf: 0.7.1 h5py: 2.10.0 Nio: None zarr: 2.4.0+ds cftime: 1.1.0 nc_time_axis: None PseudoNetCDF: None rasterio: 1.1.3 cfgrib: None iris: None bottleneck: 1.2.1 dask: 2.8.1+dfsg distributed: None matplotlib: 3.1.2 cartopy: None seaborn: 0.10.0 numbagg: None setuptools: 45.2.0 pip3: None conda: None pytest: 4.6.9 IPython: 7.13.0 sphinx: 1.8.5

lumbric avatar Sep 20 '22 12:09 lumbric

xref #6904 (not sure it is 100% relevant, but the problem also arises when .sel is called within some parallel workflow).

benbovy avatar Sep 20 '22 15:09 benbovy

@benbovy thanks for the hint! I tried passing an explicit lock to xr.open_mfdataset() as suggested, but didn't change anything, still the same exception. I will double check, if I did it the right way, I might be missing something.

lumbric avatar Sep 20 '22 15:09 lumbric

I have managed to reduce the reproducing example (see "Minimal Complete Verifiable Example 2" above) and then also find a proper solution to fix this issue. I am still not sure whether this is a bug or intended behavior, so I'll won't close the issue for now.

Basically the issue occurs when a chunked NetCDF file is loaded from disk, passed to xarray.map_blocks() and is then used in .sel() as parameter to get a subset of some other xarray object which is not passed to the worker func(). I think the proper solution is to use the args parameter of map_blocks() instead of .sel():

--- run-broken.py	2022-09-22 13:00:41.095555961 +0200
+++ run.py	2022-09-22 13:01:14.452696511 +0200
@@ -30,17 +30,17 @@
     def resample_annually(data):
         return data.sortby("time").resample(time="1A", label="left", loffset="1D").mean(dim="time")
 
-    def worker(data):
-        locations_chunk = locations.sel(locations=data.locations)
-        out_raw = data * locations_chunk
+    def worker(data, locations):
+        out_raw = data * locations
         out = resample_annually(out_raw)
         return out
 
     template = resample_annually(data)
 
     out = xr.map_blocks(
-        lambda data: worker(data).compute().chunk({"time": None}),
+        lambda data, locations: worker(data, locations).compute().chunk({"time": None}),
         data,
+        (locations,),
         template=template,
     )

This seems to fix this issue and seems to be the proper solution anyway.

I still don't see why I am not allowed to use .sel() on shadowed objects in the worker func()´. Is this on purpose? If yes, should we add something to the documentation? Is this a specific behavior of map_blocks()`? Is it related to #6904?

lumbric avatar Sep 22 '22 11:09 lumbric

I've had the same issues under the exact same conditions. However, it happens whether I use dask or not.

This solution fixes it, but I agree at least a doc update would be helpful!

jessjaco avatar Oct 04 '22 20:10 jessjaco

I agree with just passing all args explicitly. Does it work otherwise with "processes"?

As a side note, this particular example seems quite convoluted.

  1. Why are you chunking iniside the mapped function?
  2. If you conda install flox, the resample operation should be quite efficient, without the need to use map_blocks

dcherian avatar Oct 04 '22 20:10 dcherian

I agree with just passing all args explicitly. Does it work otherwise with "processes"?

What do you mean by that?

  1. Why are you chunking iniside the mapped function?

Uhm yes, you are right, this should be removed, not sure how this happened. Removing .chunk({"time": None}) in the lambda function does not change the behavior of the example regarding this issue.

  1. If you conda install flox, the resample operation should be quite efficient, without the need to use map_blocks

Oh wow, thanks! Haven't seen flox before.

lumbric avatar Oct 05 '22 07:10 lumbric

Thanks for raising. I too am intermittently hitting this pandas.errors.InvalidIndexError using xr.Dataset.map_blocks and compute with the default (threads) scheduler. The error does not occur with either "synchronous" or "processes" scheduling. It also does not occur when I throw a call to print in the function handed to map_blocks (so ... race condition?).

Using scheduler="processes" as a solution for now, and will try to come back with another MCVE when time allows.

itcarroll avatar Feb 11 '24 21:02 itcarroll

@itcarroll did you see my comment above how I fixed my issue? Are you using shadowed variables in your worker function?

lumbric avatar Mar 02 '24 16:03 lumbric