modin
BUG: `AttributeError: 'PandasOnRayDataframeColumnPartition' object has no attribute '_data'` when adding dataframes of certain size
System information
- OS Platform and Distribution (e.g., Linux Ubuntu 16.04): macOS Monterey 12.4 (Apple Silicon)
- Modin version (modin.__version__): latest master (cc3bdb)
- Python version: 3.9.13
- Code we can use to reproduce:
import ray
ray.init()
import modin.pandas as pd
s1 = 13
s2 = 13
df1 = pd.concat([pd.DataFrame([i]) for i in range(s1)])
df2 = pd.concat([pd.DataFrame([i]) for i in range(s2)])
print(df1 + df2)
If either s1 or s2 parameter is 12 or smaller, this error doesn't occur; the error also doesn't occur when s1=14 and s2=12 (I have not tried binary searching more thoroughly). The error also doesn't occur when the dataframes are constructed from a single list (such as df1 = pd.DataFrame([i for i in range(13)])).
Describe the problem
On Ray, for dataframes with a certain number of partitions, attempting to add them together seems to cause something in the Modin codebase to try to treat a logical (column) partition as a physical (block) partition by accessing its _data field. In the given code, df1._query_compiler._modin_frame._partitions.shape is (7, 1).
This may be related to an existing issue since it may be an issue with virtual partition construction, but I'm unsure what the precise root cause is.
Source code / logs
Stack trace
2022-07-20 14:53:39,547 INFO services.py:1456 -- View the Ray dashboard at http://127.0.0.1:8265
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
UserWarning: Distributing object. This may take some time.
Traceback (most recent call last):
  File "/Users/jhshi/code/modin/repros/new.py", line 8, in <module>
    print(df1 + df2)
  File "/Users/jhshi/code/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/jhshi/code/modin/modin/pandas/dataframe.py", line 536, in add
    return self._binary_op(
  File "/Users/jhshi/code/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/jhshi/code/modin/modin/pandas/base.py", line 397, in _binary_op
    new_query_compiler = getattr(self._query_compiler, op)(other, **kwargs)
  File "/Users/jhshi/code/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/jhshi/code/modin/modin/core/dataframe/algebra/binary.py", line 92, in caller
    query_compiler._modin_frame.binary_op(
  File "/Users/jhshi/code/modin/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/jhshi/code/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 115, in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
  File "/Users/jhshi/code/modin/modin/core/dataframe/pandas/dataframe/dataframe.py", line 2531, in binary_op
    else self._partition_mgr_cls.binary_operation(
  File "/Users/jhshi/code/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py", line 55, in magic
    result_parts = f(*args, **kwargs)
  File "/Users/jhshi/code/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py", line 413, in binary_operation
    return super(PandasOnRayDataframePartitionManager, cls).binary_operation(
  File "/Users/jhshi/code/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 1290, in binary_operation
    [
  File "/Users/jhshi/code/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 1291, in <listcomp>
    [
  File "/Users/jhshi/code/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 1294, in <listcomp>
    right[row_idx][col_idx]._data,
AttributeError: 'PandasOnRayDataframeColumnPartition' object has no attribute '_data'
Thanks @noloerino. The root cause is that there are a few places, including binary_operation, where we violate the partition API by accessing _data directly. Non-full-axis virtual partitions are supposed to implement the same API as block partitions, but they don't have a single _data field because they may consist of multiple pieces of data. We would have to squeeze the virtual partition into a single physical object with something like force_materialization, and/or work with the virtual partitions' list_of_blocks.
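To make the API violation concrete, here is a minimal toy model in plain Python. It is not Modin's code: ToyBlockPartition, ToyVirtualPartition, and the two add_partitions_* helpers are hypothetical stand-ins, and only the names _data, list_of_blocks, and force_materialization are borrowed from the comment above. The sketch only illustrates why reading _data directly breaks on a partition made of several blocks, while going through a shared accessor does not.

```python
class ToyBlockPartition:
    """Hypothetical stand-in for a block partition: wraps exactly one piece of data."""

    def __init__(self, data):
        self._data = data  # private field that callers are not supposed to touch

    def get(self):
        # Shared accessor: return the wrapped data.
        return self._data


class ToyVirtualPartition:
    """Hypothetical stand-in for a non-full-axis virtual partition made of several blocks."""

    def __init__(self, blocks):
        self.list_of_blocks = list(blocks)  # note: there is no single `_data` field

    def force_materialization(self):
        # Squeeze all blocks into one physical block partition.
        combined = [row for block in self.list_of_blocks for row in block.get()]
        return ToyBlockPartition(combined)

    def get(self):
        # Same accessor as the block partition, implemented via materialization.
        return self.force_materialization().get()


def add_partitions_unsafe(left, right):
    # Mirrors the failing pattern: reaches into the private field of both operands.
    return [a + b for a, b in zip(left._data, right._data)]


def add_partitions_safe(left, right):
    # Uses only the accessor that both partition kinds implement.
    return [a + b for a, b in zip(left.get(), right.get())]


left = ToyBlockPartition([1, 2, 3])
right = ToyVirtualPartition([ToyBlockPartition([10]), ToyBlockPartition([20, 30])])

try:
    add_partitions_unsafe(left, right)
    unsafe_error = None
except AttributeError as err:
    unsafe_error = str(err)

safe_result = add_partitions_safe(left, right)
print(unsafe_error)
print(safe_result)
```

In this toy model the unsafe variant fails with an AttributeError about _data, the same kind of failure as in the stack trace, while the safe variant returns the element-wise sums.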
@mvashishtha can you reproduce the issue? I tried different values of s1/s2 parameters, but it still works.
Also, I don't understand how PandasOnRayDataframeColumnPartition can show up in the partition manager code for binary_operation. According to our guides, the binary operator assumes it is working with a 2D array of block partitions: https://modin.readthedocs.io/en/stable/flow/modin/core/dataframe/algebra.html#binary-operator
Judging from a quick look at its docstring, the PandasDataframe._copartition function returns a 2D array of block partitions.
@prutskov
can you reproduce the issue? I tried different values of s1/s2 parameters, but it still works.
I can reproduce the error on my laptop at Modin version cc3bdb7e4bcd2f8305a5faa02d82036af22cc5df by running the original script with s1 = s2 = 40. Depending on your machine's specs, you may need more partitions to trigger the rebalance, which is what creates the virtual partitions.
At the latest Modin version, daa50b1216f8fdc0b7f3f7cd5802be02b92a288f, if I try the original script with s1 = s2 = 40, I get a new error ending in AttributeError: 'RangeIndex' object has no attribute 'iloc'.
RayTaskError
RayTaskError(AttributeError): ray::deploy_ray_func() (pid=56927, ip=127.0.0.1)
File "/Users/maheshvashishtha/software_sources/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py", line 548, in deploy_ray_func
result = func(*args, **kwargs)
File "/Users/maheshvashishtha/software_sources/modin/modin/core/dataframe/pandas/partitioning/axis_partition.py", line 155, in deploy_axis_func
return split_result_of_axis_func_pandas(axis, num_splits, result, lengths)
File "/Users/maheshvashishtha/software_sources/modin/modin/core/storage_formats/pandas/utils.py", line 94, in split_result_of_axis_func_pandas
return [result.iloc[sums[i] : sums[i + 1]] for i in range(len(sums) - 1)]
File "/Users/maheshvashishtha/software_sources/modin/modin/core/storage_formats/pandas/utils.py", line 94, in <listcomp>
return [result.iloc[sums[i] : sums[i + 1]] for i in range(len(sums) - 1)]
AttributeError: 'RangeIndex' object has no attribute 'iloc'
Main modin error
RayTaskError(AttributeError) Traceback (most recent call last)
Input In [1], in <cell line: 8>()
6 df1 = pd.concat([pd.DataFrame([i]) for i in range(s1)])
7 df2 = pd.concat([pd.DataFrame([i]) for i in range(s2)])
----> 8 print(df1 + df2)
File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
113 """
114 Compute function with logging if Modin logging is enabled.
115
(...)
125 Any
126 """
127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
130 logger = get_logger()
131 logger_level = getattr(logger, log_level)
File ~/software_sources/modin/modin/pandas/dataframe.py:541, in DataFrame.add(self, other, axis, level, fill_value)
535 def add(
536 self, other, axis="columns", level=None, fill_value=None
537 ): # noqa: PR01, RT01, D200
538 """
539 Get addition of ``DataFrame`` and `other`, element-wise (binary operator `add`).
540 """
--> 541 return self._binary_op(
542 "add",
543 other,
544 axis=axis,
545 level=level,
546 fill_value=fill_value,
547 broadcast=isinstance(other, Series),
548 )
File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
113 """
114 Compute function with logging if Modin logging is enabled.
115
(...)
125 Any
126 """
127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
130 logger = get_logger()
131 logger_level = getattr(logger, log_level)
File ~/software_sources/modin/modin/pandas/base.py:391, in BasePandasDataset._binary_op(self, op, other, **kwargs)
389 if op in exclude_list:
390 kwargs.pop("axis")
--> 391 new_query_compiler = getattr(self._query_compiler, op)(other, **kwargs)
392 return self._create_or_update_from_compiler(new_query_compiler)
File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
113 """
114 Compute function with logging if Modin logging is enabled.
115
(...)
125 Any
126 """
127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
130 logger = get_logger()
131 logger_level = getattr(logger, log_level)
File ~/software_sources/modin/modin/core/dataframe/algebra/binary.py:92, in Binary.call.<locals>.caller(query_compiler, other, broadcast, *args, **kwargs)
81 return query_compiler.__constructor__(
82 query_compiler._modin_frame.broadcast_apply(
83 axis,
(...)
88 )
89 )
90 else:
91 return query_compiler.__constructor__(
---> 92 query_compiler._modin_frame.binary_op(
93 lambda x, y: func(x, y, *args, **kwargs),
94 other._modin_frame,
95 join_type=join_type,
96 )
97 )
98 else:
99 if isinstance(other, (list, np.ndarray, pandas.Series)):
File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
113 """
114 Compute function with logging if Modin logging is enabled.
115
(...)
125 Any
126 """
127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
130 logger = get_logger()
131 logger_level = getattr(logger, log_level)
File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:115, in lazy_metadata_decorator.<locals>.decorator.<locals>.run_f_on_minimally_updated_metadata(self, *args, **kwargs)
113 elif apply_axis == "rows":
114 obj._propagate_index_objs(axis=0)
--> 115 result = f(self, *args, **kwargs)
116 if apply_axis is None and not transpose:
117 result._deferred_index = self._deferred_index
File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:2521, in PandasDataframe.binary_op(self, op, right_frame, join_type)
2502 @lazy_metadata_decorator(apply_axis="both")
2503 def binary_op(self, op, right_frame, join_type="outer"):
2504 """
2505 Perform an operation that requires joining with another Modin DataFrame.
2506
(...)
2519 New Modin DataFrame.
2520 """
-> 2521 left_parts, right_parts, joined_index, row_lengths = self._copartition(
2522 0, right_frame, join_type, sort=True
2523 )
2524 new_left_frame = self.__constructor__(
2525 left_parts, joined_index, self.columns, row_lengths, self._column_widths
2526 )
2527 new_right_frame = self.__constructor__(
2528 right_parts[0],
2529 joined_index,
(...)
2532 right_frame._column_widths,
2533 )
File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
113 """
114 Compute function with logging if Modin logging is enabled.
115
(...)
125 Any
126 """
127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
130 logger = get_logger()
131 logger_level = getattr(logger, log_level)
File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:2463, in PandasDataframe._copartition(self, axis, other, how, sort, force_repartition)
2461 else:
2462 reindexed_base = base_frame._partitions
-> 2463 base_lengths = self._column_widths if axis else self._row_lengths
2465 others_lengths = [o._axes_lengths[axis] for o in other_frames]
2467 # define conditions for reindexing and repartitioning `other` frames
File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:244, in PandasDataframe._row_lengths(self)
239 if self._row_lengths_cache is None:
240 if len(self._partitions) > 0:
241 (
242 index,
243 self._row_lengths_cache,
--> 244 ) = self._compute_axis_labels_and_lengths(0)
245 if self._index_cache is None:
246 self._index_cache = index
File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
113 """
114 Compute function with logging if Modin logging is enabled.
115
(...)
125 Any
126 """
127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
130 logger = get_logger()
131 logger_level = getattr(logger, log_level)
File ~/software_sources/modin/modin/core/dataframe/pandas/dataframe/dataframe.py:456, in PandasDataframe._compute_axis_labels_and_lengths(self, axis, partitions)
454 if partitions is None:
455 partitions = self._partitions
--> 456 new_index, internal_idx = self._partition_mgr_cls.get_indices(axis, partitions)
457 return new_index, list(map(len, internal_idx))
File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
113 """
114 Compute function with logging if Modin logging is enabled.
115
(...)
125 Any
126 """
127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
130 logger = get_logger()
131 logger_level = getattr(logger, log_level)
File ~/software_sources/modin/modin/core/dataframe/pandas/partitioning/partition_manager.py:864, in PandasDataframePartitionManager.get_indices(cls, axis, partitions, index_func)
862 target = partitions.T if axis == 0 else partitions
863 new_idx = [idx.apply(func) for idx in target[0]] if len(target) else []
--> 864 new_idx = cls.get_objects_from_partitions(new_idx)
865 # TODO FIX INFORMATION LEAK!!!!1!!1!!
866 total_idx = new_idx[0].append(new_idx[1:]) if new_idx else new_idx
File ~/software_sources/modin/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
113 """
114 Compute function with logging if Modin logging is enabled.
115
(...)
125 Any
126 """
127 if LogMode.get() == "disable":
--> 128 return obj(*args, **kwargs)
130 logger = get_logger()
131 logger_level = getattr(logger, log_level)
File ~/software_sources/modin/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py:110, in PandasOnRayDataframePartitionManager.get_objects_from_partitions(cls, partitions)
95 @classmethod
96 def get_objects_from_partitions(cls, partitions):
97 """
98 Get the objects wrapped by `partitions` in parallel.
99
(...)
108 The objects wrapped by `partitions`.
109 """
--> 110 return ray.get([partition._data for partition in partitions])
File ~/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
File ~/opt/anaconda3/envs/modin-dev/lib/python3.10/site-packages/ray/worker.py:1831, in get(object_refs, timeout)
1829 worker.core_worker.dump_object_store_memory_usage()
1830 if isinstance(value, RayTaskError):
-> 1831 raise value.as_instanceof_cause()
1832 else:
1833 raise value
System information
- OS Platform and Distribution (e.g., Linux Ubuntu 16.04): macOS Monterey
- Computer model: MacBook Pro (16-inch, 2019)
- Memory: 16 GB 2667 MHz DDR4
- Processor: 2.3 GHz 8-Core Intel Core i9
- Python version: 3.10.4
I now get the same RangeIndex error as @mvashishtha (also on version daa50b1); s1 = 1 and s2 = 13 are sufficient to cause the error. Here's my more detailed system info:
System information
- OS Platform and Distribution (e.g., Linux Ubuntu 16.04): macOS Monterey 12.4
- Computer model: MacBook Pro (14-inch, 2021)
- Memory: 16 GB LPDDR5
- Processor: Apple M1 Pro
- Python version: 3.9.13
Now the bug is that split_result_of_axis_func_pandas assumes that result is a pandas dataframe, but the index_func in get_indices returns an index:
https://github.com/modin-project/modin/blob/ef654c414364ca319812114324ad9d85e83ee68e/modin/core/dataframe/pandas/partitioning/partition_manager.py#L859
The apply method of non-full-axis virtual partitions calls split_result_of_axis_func_pandas under the hood via the superclass call:
https://github.com/modin-project/modin/blob/9bf8d57ca44e22fd69b0abc55793cf60c199ab4d/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py#L312-L318
Regular block partitions don't try to split the result of the function call.
... At the latest Modin version, daa50b1, if I try the original script with s1 = s2 = 40, I get a new error ending in AttributeError: 'RangeIndex' object has no attribute 'iloc'.
Thank you @mvashishtha, I reproduced the issue with s1 = s2 >= 25. I think the main problem is that the 2D list PandasDataframe._partitions can contain any partition type. I see two ways to resolve this:
- Restrict the type of PandasDataframe._partitions[0][0] to block partitions only.
- Allow PandasDataframe._partitions to be a 1-D NumPy array when its elements are axis partitions (I'm not sure this is architecturally correct).
@noloerino @prutskov it turns out only particular numbers of rows cause the error, depending on your NPartitions, because of the details of our rebalancing algorithm (see #4811 and #4810).
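As a rough illustration of why the row count matters, here is a toy model of the rebalance decision. It is a sketch under assumptions, not Modin's implementation: the threshold factor MAX_EXCESS = 1.5, the ceiling-division chunk size, the function name rebalanced_row_partitions, and the value n_partitions=8 are all assumptions that this thread does not state; the details should be checked against #4810 and #4811.

```python
import math

MAX_EXCESS = 1.5  # assumed threshold factor; not stated anywhere in this thread


def rebalanced_row_partitions(num_row_partitions, n_partitions):
    """Return how many blocks each row partition holds after a toy rebalance."""
    if num_row_partitions <= n_partitions * MAX_EXCESS:
        # Few enough partitions: every block partition is left as it is.
        return [1] * num_row_partitions
    # Too many partitions: group consecutive blocks into virtual partitions.
    chunk = math.ceil(num_row_partitions / n_partitions)
    return [
        min(chunk, num_row_partitions - start)
        for start in range(0, num_row_partitions, chunk)
    ]


for rows in (12, 13):
    groups = rebalanced_row_partitions(rows, n_partitions=8)
    print(rows, len(groups), groups)
```

Under these assumptions, 12 one-row frames stay as 12 plain block partitions, while 13 are grouped into 7 row partitions. That is consistent with the (7, 1) partition shape and the "12 or smaller works" observation in the original report, but it does not prove that the assumed constants are the real ones.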
I think it's better to fix #4811 by never splitting the result when num_splits = 1, so move this to the top of split_result_of_axis_func_pandas:
https://github.com/modin-project/modin/blob/8864bc197974da6d8cda2de2f35ca31d561be1cc/modin/core/storage_formats/pandas/utils.py#L98-L99
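The proposed guard can be sketched with stand-in objects. This is not the code of split_result_of_axis_func_pandas: ToyFrame, ToyIndex, split_without_guard, and split_with_guard are hypothetical names, and the toy classes only mimic the one property that matters here, namely that a frame has .iloc and an index does not. The slicing expression follows the line shown in the RayTaskError traceback.

```python
from itertools import accumulate


class ToyFrame:
    """Hypothetical stand-in for a DataFrame: supports positional slicing via `.iloc`."""

    def __init__(self, rows):
        self.rows = list(rows)

    @property
    def iloc(self):
        # Toy shortcut: slicing `frame.iloc[a:b]` is delegated to __getitem__.
        return self

    def __getitem__(self, item):
        return ToyFrame(self.rows[item])


class ToyIndex:
    """Hypothetical stand-in for an index object: deliberately has no `.iloc`."""

    def __init__(self, labels):
        self.labels = list(labels)


def split_without_guard(result, num_splits, lengths):
    # Always slices with `.iloc`, so any result that is not frame-like fails.
    sums = [0, *accumulate(lengths)]
    return [result.iloc[sums[i] : sums[i + 1]] for i in range(len(sums) - 1)]


def split_with_guard(result, num_splits, lengths):
    # Proposed ordering: check for the single-split case before touching `.iloc`.
    if num_splits == 1:
        return [result]
    return split_without_guard(result, num_splits, lengths)


index = ToyIndex(range(3))

try:
    split_without_guard(index, 1, [3])
    unguarded_error = None
except AttributeError as err:
    unguarded_error = str(err)

guarded = split_with_guard(index, 1, [3])
frame_pieces = split_with_guard(ToyFrame([1, 2, 3]), 2, [1, 2])
print(unguarded_error)
print(guarded[0] is index, [piece.rows for piece in frame_pieces])
```

With the guard first, a single-split result is passed through untouched regardless of its type, and frame-like results with more than one split are still sliced as before.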
After that, we'll have the original _data error, which I think we should fix as part of #4530.
Looks like @prutskov left a PR open before he left. We should try to get the fix merged in if possible.
I'm marking this as P0 because I think it's a significant bug, and it's a regression that virtual partitioning introduced near the beginning of 2022.