
Axis apply that gives partitions mismatching metadata along complementary axis leaves dataframe in invalid state.

Open joelostlund opened this issue 3 years ago • 2 comments

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): MacOSX x86
  • Modin version (modin.__version__): 0.15.2
  • Python version: 3.8.13
  • Code we can use to reproduce: I am trying to return a dataframe after doing a pandas apply, but I am getting a long error. This works perfectly in regular pandas. This is what I am trying to do in a Jupyter notebook:
```python
def f(row):
    for daw in row[1]:
        col = f"descriptor_{daw['descriptor']}"
        row[col] = daw['weight']
    for metagenre in row[2]:
        col = f"metagenre_{metagenre}"
        row[col] = 1
    for genre in row[3]:
        col = f"genre_{genre}"
        row[col] = 1
    return row

df = training_set_data.apply(f, axis=1)
df
```

And I am getting the following error:


```
AssertionError                            Traceback (most recent call last)
~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/IPython/core/formatters.py in __call__(self, obj)
    700 type_pprinters=self.type_printers,
    701 deferred_pprinters=self.deferred_printers)
--> 702 printer.pretty(obj)
    703 printer.flush()
    704 return stream.getvalue()

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/IPython/lib/pretty.py in pretty(self, obj)
    392 if cls is not object \
    393 and callable(cls.__dict__.get('__repr__')):
--> 394 return _repr_pprint(obj, self, cycle)
    395
    396 return _default_pprint(obj, self, cycle)

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
    698 """A pprint that just redirects to the normal repr function."""
    699 # Find newlines and replace them with p.break_()
--> 700 output = repr(obj)
    701 lines = output.splitlines()
    702 with p.group():

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/logging/logger_metaclass.py in log_wrap(*args, **kwargs)
     66 logger.info(f"END::PANDAS-API::{class_name}.{method_name}")
     67 return result
---> 68 return method(*args, **kwargs)
     69
     70 return log_wrap

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/pandas/dataframe.py in __repr__(self)
    213
    214 num_cols += len(self.columns) - i
--> 215 result = repr(self._build_repr_df(num_rows, num_cols))
    216 if len(self.index) > num_rows or len(self.columns) > num_cols:
    217 # The split here is so that we don't repr pandas row lengths.

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/logging/logger_metaclass.py in log_wrap(*args, **kwargs)
     66 logger.info(f"END::PANDAS-API::{class_name}.{method_name}")
     67 return result
---> 68 return method(*args, **kwargs)
     69
     70 return log_wrap

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/pandas/base.py in _build_repr_df(self, num_rows, num_cols)
    201 else:
    202 indexer = row_indexer
--> 203 return self.iloc[indexer]._query_compiler.to_pandas()
    204
    205 def _update_inplace(self, new_query_compiler):

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/logging/logger_metaclass.py in log_wrap(*args, **kwargs)
     66 logger.info(f"END::PANDAS-API::{class_name}.{method_name}")
     67 return result
---> 68 return method(*args, **kwargs)
     69
     70 return log_wrap

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/core/storage_formats/pandas/query_compiler.py in to_pandas(self)
    257
    258 def to_pandas(self):
--> 259 return self._modin_frame.to_pandas()
    260
    261 @classmethod

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/logging/logger_metaclass.py in log_wrap(*args, **kwargs)
     66 logger.info(f"END::PANDAS-API::{class_name}.{method_name}")
     67 return result
---> 68 return method(*args, **kwargs)
     69
     70 return log_wrap

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py in run_f_on_minimally_updated_metadata(self, *args, **kwargs)
    113 elif apply_axis == "rows":
    114 obj._propagate_index_objs(axis=0)
--> 115 result = f(self, *args, **kwargs)
    116 if apply_axis is None and not transpose:
    117 result._deferred_index = self._deferred_index

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py in to_pandas(self)
   2833 pandas.DataFrame
   2834 """
-> 2835 df = self._partition_mgr_cls.to_pandas(self._partitions)
   2836 if df.empty:
   2837 df = pandas.DataFrame(columns=self.columns, index=self.index)

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py in to_pandas(cls, partitions)
    658 return pandas.DataFrame()
    659 else:
--> 660 return concatenate(df_rows)
    661
    662 @classmethod

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/core/dataframe/pandas/utils.py in concatenate(dfs)
     36 """
     37 for df in dfs:
---> 38 assert df.columns.equals(dfs[0].columns)
     39 for i in range(len(dfs[0].columns)):
     40 if dfs[0].dtypes.iloc[i].name != "category":

AssertionError:
```

```
AssertionError                            Traceback (most recent call last)
~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/IPython/core/formatters.py in __call__(self, obj)
    343 method = get_real_method(obj, self.print_method)
    344 if method is not None:
--> 345 return method()
    346 return None
    347 else:

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/logging/logger_metaclass.py in log_wrap(*args, **kwargs)
     66 logger.info(f"END::PANDAS-API::{class_name}.{method_name}")
     67 return result
---> 68 return method(*args, **kwargs)
     69
     70 return log_wrap

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/pandas/dataframe.py in _repr_html_(self)
    235 # We use pandas _repr_html_ to get a string of the HTML representation
    236 # of the dataframe.
--> 237 result = self._build_repr_df(num_rows, num_cols)._repr_html_()
    238 if len(self.index) > num_rows or len(self.columns) > num_cols:
    239 # We split so that we insert our correct dataframe dimensions.

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/logging/logger_metaclass.py in log_wrap(*args, **kwargs)
     66 logger.info(f"END::PANDAS-API::{class_name}.{method_name}")
     67 return result
---> 68 return method(*args, **kwargs)
     69
     70 return log_wrap

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/pandas/base.py in _build_repr_df(self, num_rows, num_cols)
    201 else:
    202 indexer = row_indexer
--> 203 return self.iloc[indexer]._query_compiler.to_pandas()
    204
    205 def _update_inplace(self, new_query_compiler):

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/logging/logger_metaclass.py in log_wrap(*args, **kwargs)
     66 logger.info(f"END::PANDAS-API::{class_name}.{method_name}")
     67 return result
---> 68 return method(*args, **kwargs)
     69
     70 return log_wrap

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/core/storage_formats/pandas/query_compiler.py in to_pandas(self)
    257
    258 def to_pandas(self):
--> 259 return self._modin_frame.to_pandas()
    260
    261 @classmethod

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/logging/logger_metaclass.py in log_wrap(*args, **kwargs)
     66 logger.info(f"END::PANDAS-API::{class_name}.{method_name}")
     67 return result
---> 68 return method(*args, **kwargs)
     69
     70 return log_wrap

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py in run_f_on_minimally_updated_metadata(self, *args, **kwargs)
    113 elif apply_axis == "rows":
    114 obj._propagate_index_objs(axis=0)
--> 115 result = f(self, *args, **kwargs)
    116 if apply_axis is None and not transpose:
    117 result._deferred_index = self._deferred_index

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py in to_pandas(self)
   2833 pandas.DataFrame
   2834 """
-> 2835 df = self._partition_mgr_cls.to_pandas(self._partitions)
   2836 if df.empty:
   2837 df = pandas.DataFrame(columns=self.columns, index=self.index)

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py in to_pandas(cls, partitions)
    658 return pandas.DataFrame()
    659 else:
--> 660 return concatenate(df_rows)
    661
    662 @classmethod

~/.pyenv/versions/3.8.12/envs/delivery-notebooks/lib/python3.8/site-packages/modin/core/dataframe/pandas/utils.py in concatenate(dfs)
     36 """
     37 for df in dfs:
---> 38 assert df.columns.equals(dfs[0].columns)
     39 for i in range(len(dfs[0].columns)):
     40 if dfs[0].dtypes.iloc[i].name != "category":

AssertionError:
```

How to solve this?
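The reporter's function f adds a different set of labels to each row. A minimal pandas-only sketch of that pattern follows; the one-column frame, the genres column, and the add_genre_flags name are hypothetical illustrations, not the reporter's data. It shows that pandas outer-joins the returned Series and fills the gaps with NaN. Spread across Modin row partitions, this kind of per-row column mismatch is what trips the columns check in concatenate() seen in the traceback above.

```python
import pandas

# Hypothetical data: each row lists a different number of genres.
data = pandas.DataFrame({"genres": [["x"], ["y", "z"]]})

def add_genre_flags(row):
    # Work on a copy rather than mutating the Series pandas passes in.
    out = row.copy()
    for genre in row["genres"]:
        out[f"genre_{genre}"] = 1  # setting a new label enlarges the Series
    return out

result = data.apply(add_genre_flags, axis=1)
# pandas takes the union of all returned labels; missing entries become NaN.
print(result)
```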

joelostlund, Jul 20 '22 21:07

@joelostlund thank you for reporting this bug. I'm very sorry: the error comes from an assertion in Modin that I wrote. I'll work on a fix.

I can reproduce the bug with:

```python
import modin.pandas as pd
from modin.config import MinPartitionSize
import pandas

df = pd.DataFrame(list(range(MinPartitionSize.get() + 1)))
# Every row returns a length-1 Series except the last, which returns a length-2 Series.
print(df.apply(lambda row: pandas.Series(1) if row[0] < MinPartitionSize.get() else pandas.Series([1, 2]), axis=1))
```

The problem is that different row partitions can have different columns, so the assertion I added is incorrect.
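To see why the check is too strict, here is a small pandas-only simulation. The two frames are hypothetical stand-ins for row partitions produced by the repro above: the first partition's rows returned a length-1 Series, the second's a length-2 Series. The column-equality check used by concatenate() fails, while pandas.concat simply outer-joins the columns.

```python
import pandas

# Hypothetical row partitions after apply(axis=1):
part_a = pandas.DataFrame([[1], [1]], index=[0, 1], columns=[0])  # length-1 results
part_b = pandas.DataFrame([[1, 2]], index=[2], columns=[0, 1])    # length-2 result

# The check from concatenate(): every row partition must have identical columns.
columns_match = all(p.columns.equals(part_a.columns) for p in (part_a, part_b))
print(columns_match)

# pandas.concat along axis 0 outer-joins the columns instead, filling gaps with NaN.
combined = pandas.concat([part_a, part_b], axis=0)
print(combined)
```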

mvashishtha, Jul 21 '22 19:07

Initially, I thought this was a bug in the concatenate call in to_pandas(), but on further reflection, I realized that it's actually correct to assume internally that all partitions along one axis of a Modin dataframe have the same metadata along the complementary axis. We should coerce (basically concat) partitions with mismatching indices so that all partitions in a row agree on their row indices, and vice versa for partitions in a column. The following example applies along columns instead, so it gets past concatenate but then hits a metadata mismatch failure:

```python
import modin.pandas as pd
import pandas

# Make dataframe with (1, 2) partitioning
df1 = pd.DataFrame([1])
df2 = pd.DataFrame([2])
df = pd.concat([df1, df2], axis=1)

print(df.apply(lambda col: pandas.Series(3) if col[0] == 1 else pandas.Series([4, 5])))
```

Describe the problem

The code above works in pandas and gives:

```
     0  0
0  3.0  4
1  NaN  5
```

But in Modin, I get an internal error ending with "Internal and external indices on axis 0 do not match." The code takes a dataframe with (1, 2) partitioning and applies a function per column: the first column yields a length-1 series [3], and the second column yields a length-2 series [4, 5]. pandas joins the two columns even though they have different indexes. Modin, however, incorrectly assumes in _compute_axis_labels that the index of the first partition's result will be the index of the result for the entire dataframe.
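The mismatch can be reproduced with plain pandas objects. Below, left and right are hypothetical stand-ins for the results of applying the function to each of the two column partitions. Reading the index from the first partition only (the shortcut described above for _compute_axis_labels) gives a length-1 index, while the real result needs the union of both partitions' indices; outer-joining the partitions reproduces the pandas output shown above.

```python
import pandas

# Hypothetical per-partition results of the column-wise apply above.
left = pandas.DataFrame({0: [3]})      # first column partition: index [0]
right = pandas.DataFrame({0: [4, 5]})  # second column partition: index [0, 1]

# Shortcut: trust the first partition's index for the whole frame.
assumed_index = left.index
# What the result actually needs: the union of all partitions' indices.
true_index = left.index.union(right.index)
print(len(assumed_index), len(true_index))

# Outer-joining along axis=1 matches what pandas itself returns.
aligned = pandas.concat([left, right], axis=1)
print(aligned)
```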

I think what we need to do instead is to _copartition the dataframes resulting from apply, as we do in concat.
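A rough sketch of what copartitioning apply results could look like at the partition level, written against plain pandas frames rather than Modin's partition objects. copartition is a hypothetical helper, not Modin's _copartition: it reindexes every partition in one row (or column) of the grid to the union of their labels on the complementary axis, so the grid invariant holds again.

```python
from functools import reduce

import pandas

def copartition(parts, axis):
    """Hypothetical helper: align partitions that will be joined along `axis`.

    Partitions placed side by side along axis=1 must agree on their index (axis 0);
    partitions stacked along axis=0 must agree on their columns (axis 1).
    """
    other = 1 - axis
    # Union of every partition's labels on the complementary axis.
    labels = reduce(lambda acc, idx: acc.union(idx), (p.axes[other] for p in parts))
    # Reindex each partition to the shared labels; missing cells become NaN.
    return [p.reindex(labels, axis=other) for p in parts]

# Usage with two mismatched column-partition results like the ones above.
left = pandas.DataFrame({0: [3]})
right = pandas.DataFrame({0: [4, 5]})
aligned = copartition([left, right], axis=1)
print([list(p.index) for p in aligned])
```

Reindexing to a union is the simplest alignment rule; it matches pandas' outer-join behavior, but it requires every partition's labels to be known before any partition can be fixed up.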

mvashishtha, Jul 25 '22 19:07

+1, I am getting this error as well. For more than a day I thought I was doing something wrong, until I rewrote the apply and still got the same error. Here are fixes I tried that didn't work:

  1. I am reading data from a CSV, so I thought that maybe pandas was guessing different column types for different partitions based on the data it sees in each partition. I tried to fix this by reading the CSV as strings: pd.read_csv(s3_url, dtype=defaultdict(lambda: 'string'), converters=defaultdict(lambda: str)). This didn't work.
  2. I then tried to set the column types after the apply, but that didn't work either: legacy_labeled_data_bool = legacy_labeled_data.astype(JsonCorpusColumnsExt().get_column_dtypes())

As soon as Modin needs to default to pandas, the assertion is raised. This happens with display, for example.

What is the timeline for a fix, or is there any known workaround?

timeleft--, Mar 22 '23 21:03

I found a workaround myself:

```python
import ray
# Round-trip the broken Modin dataframe through a Ray Dataset and back.
modin_df_fixed = ray.data.from_modin(modin_df_broken).to_modin()
display(modin_df_fixed)
```

The conversion to Arrow and back seems to have fixed the problematic partition. Besides fixing this bug, please add an assertion message stating the columns of the two dataframes and which partitions they represent. Thanks!
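As a sketch of the requested assertion message, assuming the check keeps the shape shown in the traceback (assert df.columns.equals(dfs[0].columns)), a helper could report which partition disagrees and with which columns. concatenate_checked is a hypothetical name, and the partition numbers here are simply positions in the list passed in.

```python
import pandas

def concatenate_checked(dfs):
    """Hypothetical variant of the column check in concatenate()
    that explains which row partition disagrees and how."""
    for i, df in enumerate(dfs):
        assert df.columns.equals(dfs[0].columns), (
            f"row partition {i} has columns {list(df.columns)}, "
            f"but row partition 0 has columns {list(dfs[0].columns)}"
        )
    return pandas.concat(dfs, axis=0)

message = None
try:
    concatenate_checked([pandas.DataFrame({0: [1]}), pandas.DataFrame({0: [1], 1: [2]})])
except AssertionError as err:
    message = str(err)
print(message)
```

Note that assert statements are skipped when Python runs with -O, so raising a ValueError with the same message would be more robust if the check must always run.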

timeleft--, Mar 22 '23 22:03

I've dug into the issue, and I believe it's more complex than originally thought.

  1. We cannot use ._copartition() as-is because it works at the dataframe level, while we need it at the partition level. But what's worse is that...
  2. ...we're breaking a cornerstone assumption: that all partitions in a row (or column) of the grid have the same axes.

Tracing down from df.apply(), we eventually end up here: https://github.com/modin-project/modin/blob/2aeb1e0ff85aa90d2d0e64967b5e9e1e40ac9d3a/modin/core/dataframe/pandas/dataframe/dataframe.py#L2828-L2838

There, when such an unusual function is applied, the partitions coming out of broadcast_axis_partitions() have different axes, i.e. they can no longer be seen as parts of a grid, yet the surrounding code is built on top of that assumption.

I don't see any easy fix that would actually unify the partitions: even checking that they're still in sync would require computing row and column indices and labels alongside each row (and column) of the bigger grid, whereas right now _compute_axis_labels_and_lengths() just takes the first column when building row indices and the first row when building column indices. At best, this introduces a synchronization point that hampers parallelism and performance.
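The difference between the current shortcut and a full consistency check can be sketched on a toy grid of pandas frames. The grid layout (grid[i][j] is the block in row partition i and column partition j) and both helper names are hypothetical; the point is that the shortcut reads one block per row, while a real check must touch, and therefore materialize, every block.

```python
import pandas

def row_labels_shortcut(grid):
    # Read each row partition's labels from its first block only.
    return [row[0].index for row in grid]

def rows_in_sync(grid):
    # Every block in a row partition must carry the same row labels.
    return all(block.index.equals(row[0].index) for row in grid for block in row)

# One row partition split into two column partitions whose apply results
# ended up with different lengths, as in the column-wise example earlier.
grid = [[pandas.DataFrame({0: [3]}), pandas.DataFrame({1: [4, 5]})]]
print([len(idx) for idx in row_labels_shortcut(grid)])
print(rows_in_sync(grid))
```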

Initially I thought of adding a flag, passed from the frontend (i.e. from BaseDataset.apply()), that would tell the PandasDataframe to synchronize its partitions, but I believe that would still hurt performance in a lot of cases (much like #3411).

So instead I propose a context manager that would toggle this cross-partition sync. This would deviate from the pandas API, but I don't see any other way of doing this optimally (the design is inspired by Python's decimal module).
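A minimal sketch of such a toggle, modeled loosely on decimal.localcontext(). Every name here (partition_sync, sync_enabled, _SYNC_PARTITIONS) is hypothetical and not part of Modin's API. A contextvars.ContextVar is used so the setting is scoped to the current context (separate threads and asyncio tasks do not see each other's changes) and is restored on exit even if the body raises.

```python
import contextlib
import contextvars

# Hypothetical switch; Modin does not expose this name.
_SYNC_PARTITIONS = contextvars.ContextVar("sync_partitions", default=False)

@contextlib.contextmanager
def partition_sync(enabled=True):
    """Enable (or disable) cross-partition label sync inside the with-block."""
    token = _SYNC_PARTITIONS.set(enabled)
    try:
        yield
    finally:
        # Restore whatever value was active before entering the block.
        _SYNC_PARTITIONS.reset(token)

def sync_enabled():
    """What an internal apply path would consult before copartitioning results."""
    return _SYNC_PARTITIONS.get()

print(sync_enabled())
with partition_sync():
    print(sync_enabled())
    with partition_sync(False):
        print(sync_enabled())
print(sync_enabled())
```

In this design the default stays on the fast path, and only code wrapped in "with partition_sync():" would pay for the extra synchronization point.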

CC @devin-petersohn @RehanSD @mvashishtha @YarShev for input.

vnlitvinov, Mar 29 '23 15:03