modin icon indicating copy to clipboard operation
modin copied to clipboard

BUG: `AttributeError: 'PandasOnRayDataframeColumnPartition' object has no attribute '_data'` when adding dataframes of certain size

Open noloerino opened this issue 3 years ago • 6 comments

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): macOS Monterey 12.4 (Apple Silicon)
  • Modin version (modin.__version__): latest master (cc3bdb)
  • Python version: 3.9.13
  • Code we can use to reproduce:
import ray
ray.init()
import modin.pandas as pd
s1 = 13
s2 = 13
df1 = pd.concat([pd.DataFrame([i]) for i in range(s1)])
df2 = pd.concat([pd.DataFrame([i]) for i in range(s2)])
print(df1 + df2)

If either s1 or s2 parameter is 12 or smaller, this error doesn't occur; the error also doesn't occur when s1=14 and s2=12 (I have not tried binary searching more thoroughly). The error also doesn't occur when the dataframes are constructed from a single list (such as df1 = pd.DataFrame([i for i in range(13)])).

Describe the problem

On Ray, for dataframes with a certain number of partitions, attempting to add them together seems to cause something in the Modin codebase to try to treat a logical (column) partition as a physical (block) partition by accessing its _data field. In the given code, df1._query_compiler._modin_frame._partitions.shape is (7, 1).

This may be related to an existing issue since it may be an issue with virtual partition construction, but I'm unsure what the precise root cause is.

Source code / logs

Stack trace
2022-07-20 14:53:39,547	INFO services.py:1456 -- View the Ray dashboard at http://127.0.0.1:8265
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
UserWarning: Distributing  object. This may take some time.
Traceback (most recent call last):
  File "/Users/jhshi/code/modin/repros/new.py", line 8, in 
    print(df1 + df2)
  File "/Users/jhshi/code/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/jhshi/code/modin/modin/pandas/dataframe.py", line 536, in add
    return self._binary_op(
  File "/Users/jhshi/code/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/jhshi/code/modin/modin/pandas/base.py", line 397, in _binary_op
    new_query_compiler = getattr(self._query_compiler, op)(other, **kwargs)
  File "/Users/jhshi/code/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/jhshi/code/modin/modin/core/dataframe/algebra/binary.py", line 92, in caller
    query_compiler._modin_frame.binary_op(
  File "/Users/jhshi/code/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/jhshi/code/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 115, in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
  File "/Users/jhshi/code/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 2531, in binary_op
    else self._partition_mgr_cls.binary_operation(
  File "/Users/jhshi/code/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py", line 55, in magic
    result_parts = f(*args, **kwargs)
  File "/Users/jhshi/code/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py", line 413, in binary_operation
    return super(PandasOnRayDataframePartitionManager, cls).binary_operation(
  File "/Users/jhshi/code/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 1290, in binary_operation
    [
  File "/Users/jhshi/code/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 1291, in 
    [
  File "/Users/jhshi/code/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 1294, in 
    right[row_idx][col_idx]._data,
AttributeError: 'PandasOnRayDataframeColumnPartition' object has no attribute '_data'

noloerino avatar Jul 20 '22 21:07 noloerino

Thanks @noloerino . The root cause is that there are a few places, including binary_operation, where we violate the partition API by accessing _data. non-full-axis virtual partitions are supposed to implement the same API as block partitions, but they indeed don't have a single _data field because they may consist of multiple pieces of data. We would have to squeeze the virtual partition into a single physical object with something like force_materialization, and/or work with the virtual partitions' list_of_blocks.

mvashishtha avatar Jul 22 '22 06:07 mvashishtha

@mvashishtha can you reproduce the issue? I tried different values of s1/s2 parameters, but it still works.

Also, I don't get how PandasOnRayDataframeColumnPartition can occur in partition manager code for binary_operation. Binary operator assumes the working with 2d array of block partitions by our guides: https://modin.readthedocs.io/en/stable/flow/modin/core/dataframe/algebra.html#binary-operator PandasDataframe._copartition function returns 2-d array of block-partitions (if quickly see on docstring).

prutskov avatar Aug 08 '22 13:08 prutskov

@prutskov

can you reproduce the issue? I tried different values of s1/s2 parameters, but it still works.

I can reproduce the error with the original script except s1 = s2 = 40 on my laptop at Modin version cc3bdb7e4bcd2f8305a5faa02d82036af22cc5df. Depending on your machine's specs, you may need some more partitions to trigger the rebalance, which will create virtual partitions.

At the latest Modin version, daa50b1216f8fdc0b7f3f7cd5802be02b92a288f, if I try the original script with s1 = s2 = 40, I get a new error ending in AttributeError: 'RangeIndex' object has no attribute 'iloc'.

RayTaskError
RayTaskError(AttributeError): ray::deploy_ray_func() (pid=56927, ip=127.0.0.1)
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py", line 548, in deploy_ray_func
    result = func(*args, **kwargs)
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/dataframe/pandas/partitioning/axis_partition.py", line 155, in deploy_axis_func
    return split_result_of_axis_func_pandas(axis, num_splits, result, lengths)
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/storage_formats/pandas/utils.py", line 94, in split_result_of_axis_func_pandas
    return [result.iloc[sums[i] : sums[i + 1]] for i in range(len(sums) - 1)]
  File "/Users/maheshvashishtha/software_sources/modin/modin/core/storage_formats/pandas/utils.py", line 94, in <listcomp>
    return [result.iloc[sums[i] : sums[i + 1]] for i in range(len(sums) - 1)]
AttributeError: 'RangeIndex' object has no attribute 'iloc'
Main modin error
RayTaskError(AttributeError)              Traceback (most recent call last)
Input In [1], in <cell line: 8>()
      6 df1 = pd.concat([pd.DataFrame([i]) for i in range(s1)])
      7 df2 = pd.concat([pd.DataFrame([i]) for i in range(s2)])
----> 8 print(df1 + df2)

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/pandas/dataframe.py:541, in DataFrame.add(self, other, axis, level, fill_value)
    535 def add(
    536     self, other, axis="columns", level=None, fill_value=None
    537 ):  # noqa: PR01, RT01, D200
    538     """
    539     Get addition of ``DataFrame`` and `other`, element-wise (binary operator `add`).
    540     """
--> 541     return self._binary_op(
    542         "add",
    543         other,
    544         axis=axis,
    545         level=level,
    546         fill_value=fill_value,
    547         broadcast=isinstance(other, Series),
    548     )

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/pandas/base.py:391, in BasePandasDataset._binary_op(self, op, other, **kwargs)
    389 if op in exclude_list:
    390     kwargs.pop("axis")
--> 391 new_query_compiler = getattr(self._query_compiler, op)(other, **kwargs)
    392 return self._create_or_update_from_compiler(new_query_compiler)

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/dataframe/algebra/binary.py:92, in Binary.call.<locals>.caller(query_compiler, other, broadcast, *args, **kwargs)
     81         return query_compiler.__constructor__(
     82             query_compiler._modin_frame.broadcast_apply(
     83                 axis,
   (...)
     88             )
     89         )
     90     else:
     91         return query_compiler.__constructor__(
---> 92             query_compiler._modin_frame.binary_op(
     93                 lambda x, y: func(x, y, *args, **kwargs),
     94                 other._modin_frame,
     95                 join_type=join_type,
     96             )
     97         )
     98 else:
     99     if isinstance(other, (list, np.ndarray, pandas.Series)):

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:115, in lazy_metadata_decorator.<locals>.decorator.<locals>.run_f_on_minimally_updated_metadata(self, *args, **kwargs)
    113     elif apply_axis == "rows":
    114         obj._propagate_index_objs(axis=0)
--> 115 result = f(self, *args, **kwargs)
    116 if apply_axis is None and not transpose:
    117     result._deferred_index = self._deferred_index

File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:2521, in PandasDataframe.binary_op(self, op, right_frame, join_type)
   2502 @lazy_metadata_decorator(apply_axis="both")
   2503 def binary_op(self, op, right_frame, join_type="outer"):
   2504     """
   2505     Perform an operation that requires joining with another Modin DataFrame.
   2506
   (...)
   2519         New Modin DataFrame.
   2520     """
-> 2521     left_parts, right_parts, joined_index, row_lengths = self._copartition(
   2522         0, right_frame, join_type, sort=True
   2523     )
   2524     new_left_frame = self.__constructor__(
   2525         left_parts, joined_index, self.columns, row_lengths, self._column_widths
   2526     )
   2527     new_right_frame = self.__constructor__(
   2528         right_parts[0],
   2529         joined_index,
   (...)
   2532         right_frame._column_widths,
   2533     )

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:2463, in PandasDataframe._copartition(self, axis, other, how, sort, force_repartition)
   2461 else:
   2462     reindexed_base = base_frame._partitions
-> 2463     base_lengths = self._column_widths if axis else self._row_lengths
   2465 others_lengths = [o._axes_lengths[axis] for o in other_frames]
   2467 # define conditions for reindexing and repartitioning `other` frames

File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:244, in PandasDataframe._row_lengths(self)
    239 if self._row_lengths_cache is None:
    240     if len(self._partitions) > 0:
    241         (
    242             index,
    243             self._row_lengths_cache,
--> 244         ) = self._compute_axis_labels_and_lengths(0)
    245         if self._index_cache is None:
    246             self._index_cache = index

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:456, in PandasDataframe._compute_axis_labels_and_lengths(self, axis, partitions)
    454 if partitions is None:
    455     partitions = self._partitions
--> 456 new_index, internal_idx = self._partition_mgr_cls.get_indices(axis, partitions)
    457 return new_index, list(map(len, internal_idx))

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py:864, in PandasDataframePartitionManager.get_indices(cls, axis, partitions, index_func)
    862 target = partitions.T if axis == 0 else partitions
    863 new_idx = [idx.apply(func) for idx in target[0]] if len(target) else []
--> 864 new_idx = cls.get_objects_from_partitions(new_idx)
    865 # TODO FIX INFORMATION LEAK!!!!1!!1!!
    866 total_idx = new_idx[0].append(new_idx[1:]) if new_idx else new_idx

File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    113 """
    114 Compute function with logging if Modin logging is enabled.
    115
   (...)
    125 Any
    126 """
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)
    130 logger = get_logger()
    131 logger_level = getattr(logger, log_level)

File ~/software_sources/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py:110, in PandasOnRayDataframePartitionManager.get_objects_from_partitions(cls, partitions)
     95 @classmethod
     96 def get_objects_from_partitions(cls, partitions):
     97     """
     98     Get the objects wrapped by `partitions` in parallel.
     99
   (...)
    108         The objects wrapped by `partitions`.
    109     """
--> 110     return ray.get([partition._data for partition in partitions])

File ~/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    103     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    104         return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)

File ~/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/ray/worker.py:1831, in get(object_refs, timeout)
   1829     worker.core_worker.dump_object_store_memory_usage()
   1830 if isinstance(value, RayTaskError):
-> 1831     raise value.as_instanceof_cause()
   1832 else:
   1833     raise value

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): macOS Monterey
  • Computer model: MacBook Pro (16-inch, 2019)
  • Memory: 16 GB 2667 MHz DDR4
  • Processor: 2.3 GHz 8-Core Intel Core i9
  • Python version: 3.10.4

mvashishtha avatar Aug 08 '22 20:08 mvashishtha

I now get the same RangeIndex error as @mvashishtha (also on version daa50b1), with s1 = 1; s2 = 13 sufficient for to cause the error. Here's my more detailed system info:

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): macOS Monterey 12.4
  • Computer model: MacBook Pro (14-inch, 2021)
  • Memory: 16 GB LPDDR5
  • Processor: Apple M1 Pro
  • Python version: 3.9.13

noloerino avatar Aug 08 '22 21:08 noloerino

Now the bug is that split_result_of_axis_func_pandas assumes that result is a pandas dataframe, but the index_func in get_indices returns an index:

https://github.com/modin-project/modin/blob/ef654c414364ca319812114324ad9d85e83ee68e/modin/core/dataframe/pandas/partitioning/partition_manager.py#L859

non-full-axis virtual partitions' apply calls split_result_of_axis_func_pandas under the hood via the superclass call: https://github.com/modin-project/modin/blob/9bf8d57ca44e22fd69b0abc55793cf60c199ab4d/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py#L312-L318

Regular block partitions don't try to split the result of the function call.

mvashishtha avatar Aug 09 '22 00:08 mvashishtha

... At the latest Modin version, daa50b1, if I try the original script with s1 = s2 = 40, I get a new error ending in AttributeError: 'RangeIndex' object has no attribute 'iloc'.

Thank you @mvashishtha, I reproduced the issue with s1=s2>=25. I think, that the main problem is that PandasDataframe._partitions can contain any partition type in 2d-list. I see 2 ways to resolve this:

  1. Add restriction to possible type of PandasDataframe._partitions[0][0] on block partition only.
  2. Add possibility when PandasDataframe._partitions could be 1-d numpy array, in case element is axis-partition (I'm not sure that this is architecturally correctly).

prutskov avatar Aug 09 '22 14:08 prutskov

@noloerino @prutskov it turns out only particular numbers of rows cause the error, depending on your NPartitions, because of the details of our rebalancing algorithm (see #4811 and #4810).

I think it's better to fix #4811 by never splitting the result when num_splits = 1, so move this to the top of split_result_of_axis_func_pandas:

https://github.com/modin-project/modin/blob/8864bc197974da6d8cda2de2f35ca31d561be1cc/modin/core/storage_formats/pandas/utils.py#L98-L99

After that, we'll have the original _data error, which I think we should fix as part of #4530.

mvashishtha avatar Aug 11 '22 23:08 mvashishtha

Looks like @prutskov left a PR open before he left. We should try to get the fix merged in if possible.

pyrito avatar Aug 31 '22 21:08 pyrito

I'm marking this as P0 because I think it's a significant bug, and it's a regression that virtual partitioning introduced near the beginning of 2022.

mvashishtha avatar Sep 22 '22 19:09 mvashishtha