
FEAT-#3535: Implement partition shuffling mechanism and algebra sort_by

RehanSD opened this pull request 3 years ago • 20 comments

What do these changes do?

  • [x] commit message follows format outlined here
  • [ ] passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • [ ] passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • [x] signed commit with git commit -s
  • [x] Resolves #3535?
  • [ ] tests added and passing
  • [ ] module layout described at docs/development/architecture.rst is up-to-date
  • [ ] added (Issue Number: PR title (PR Number)) and github username to release notes for next major release

RehanSD avatar Jun 24 '22 03:06 RehanSD

This pull request introduces 2 alerts when merging d56da701e8a8f436a3c05f945408a7b0ce847aeb into 2de5c67d7c4a95ab2aac3530484367b1ed1eca94 - view on LGTM.com

new alerts:

  • 2 for Variable defined multiple times

lgtm-com[bot] avatar Jun 27 '22 19:06 lgtm-com[bot]

Codecov Report

Merging #4601 (40dec5a) into master (5b5d7f4) will decrease coverage by 17.42%. The diff coverage is 60.73%.

@@             Coverage Diff             @@
##           master    #4601       +/-   ##
===========================================
- Coverage   84.42%   67.00%   -17.43%     
===========================================
  Files         253      255        +2     
  Lines       19113    19553      +440     
===========================================
- Hits        16137    13101     -3036     
- Misses       2976     6452     +3476     
| Impacted Files | Coverage Δ |
|---|---|
| ...in/core/dataframe/pandas/partitioning/partition.py | 100.00% <ø> (ø) |
| ...entations/pandas_on_dask/partitioning/partition.py | 73.73% <30.76%> (-12.31%) :arrow_down: |
| modin/core/dataframe/pandas/dataframe/utils.py | 48.14% <48.14%> (ø) |
| modin/core/dataframe/pandas/dataframe/dataframe.py | 92.43% <53.19%> (-2.81%) :arrow_down: |
| ...odin/core/storage_formats/pandas/query_compiler.py | 96.26% <83.33%> (+0.42%) :arrow_up: |
| ...mentations/pandas_on_ray/partitioning/partition.py | 90.00% <86.36%> (-0.76%) :arrow_down: |
| ...dataframe/pandas/partitioning/partition_manager.py | 89.50% <100.00%> (+2.71%) :arrow_up: |
| modin/pandas/series.py | 94.29% <100.00%> (+0.26%) :arrow_up: |
| modin/experimental/sklearn/__init__.py | 0.00% <0.00%> (-100.00%) :arrow_down: |
| modin/experimental/xgboost/test/test_dmatrix.py | 0.00% <0.00%> (-100.00%) :arrow_down: |
| ... and 100 more | |


codecov[bot] avatar Jun 27 '22 19:06 codecov[bot]

This pull request introduces 2 alerts when merging 2316cd1ea91a5044c44d640b9053b18ab1853a94 into 2de5c67d7c4a95ab2aac3530484367b1ed1eca94 - view on LGTM.com

new alerts:

  • 2 for Variable defined multiple times

lgtm-com[bot] avatar Jun 27 '22 22:06 lgtm-com[bot]

@RehanSD You need to add a release note to this as well.

naren-ponder avatar Jun 28 '22 17:06 naren-ponder

This pull request introduces 2 alerts when merging 8ad85f30b002da5e44e0b6a7cbbd660a413f0b9a into 3982306a540bd09b43a21dcdca6b16b4741ed151 - view on LGTM.com

new alerts:

  • 2 for Variable defined multiple times

lgtm-com[bot] avatar Jun 29 '22 21:06 lgtm-com[bot]

This pull request introduces 2 alerts when merging 9639f4e957cbd46454f5f6ea853837aef8f51b01 into 3982306a540bd09b43a21dcdca6b16b4741ed151 - view on LGTM.com

new alerts:

  • 2 for Variable defined multiple times

lgtm-com[bot] avatar Jun 29 '22 23:06 lgtm-com[bot]

This pull request introduces 2 alerts when merging ef9538cc20fe3984f00d54036fc9f82b41b64075 into 3982306a540bd09b43a21dcdca6b16b4741ed151 - view on LGTM.com

new alerts:

  • 2 for Variable defined multiple times

lgtm-com[bot] avatar Jun 30 '22 20:06 lgtm-com[bot]

This pull request introduces 2 alerts when merging 4dd4393639d10a50c6a95a27595883d446a5e124 into 3982306a540bd09b43a21dcdca6b16b4741ed151 - view on LGTM.com

new alerts:

  • 2 for Variable defined multiple times

lgtm-com[bot] avatar Jun 30 '22 20:06 lgtm-com[bot]

This pull request introduces 2 alerts when merging 00cdfb4106e4ecf701f734a296662388b22a97d7 into 3982306a540bd09b43a21dcdca6b16b4741ed151 - view on LGTM.com

new alerts:

  • 2 for Variable defined multiple times

lgtm-com[bot] avatar Jun 30 '22 21:06 lgtm-com[bot]

This pull request introduces 2 alerts when merging e5d1003f2792216b365c30ace1e0cba6a99a5507 into 35782883efd7e22f8e29a7c9887a4453adabf783 - view on LGTM.com

new alerts:

  • 2 for Variable defined multiple times

lgtm-com[bot] avatar Jul 08 '22 22:07 lgtm-com[bot]

This pull request introduces 1 alert when merging f3b36b657c70a3c0b5c4acbfe33092d31eeeaddf into a7354c9ca76525a265da98f2afe882c53f378840 - view on LGTM.com

new alerts:

  • 1 for Variable defined multiple times

lgtm-com[bot] avatar Jul 12 '22 18:07 lgtm-com[bot]

This pull request introduces 1 alert when merging a497f7c481f2d052eb36b5a73136f5411b6f68ed into a7354c9ca76525a265da98f2afe882c53f378840 - view on LGTM.com

new alerts:

  • 1 for Variable defined multiple times

lgtm-com[bot] avatar Jul 12 '22 20:07 lgtm-com[bot]

@RehanSD, what is the status here? Are there any perf numbers?

YarShev avatar Jul 27 '22 11:07 YarShev

@RehanSD, what is the status here? Are there any perf numbers?

@mvashishtha, if you are going to take over this PR, could you respond to my question?

YarShev avatar Jul 28 '22 12:07 YarShev

@YarShev, @RehanSD is going to fix some correctness bugs on the branch, then do performance testing. AFAIK he doesn't have any perf numbers yet.

mvashishtha avatar Jul 28 '22 14:07 mvashishtha

@YarShev, @RehanSD is going to fix some correctness bugs on the branch, then do performance testing. AFAIK he doesn't have any perf numbers yet.

I see, thanks! Looking forward to seeing the perf numbers.

YarShev avatar Aug 01 '22 08:08 YarShev

Found a bug with value_counts. It occurs when I call df['country_region'].value_counts(). It turns out it happens when the series name is the same as the index name.
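The pandas-level ambiguity at the root of this can be reproduced with a toy frame (hypothetical data; the only thing that matters is that a column label collides with the index name):

```python
import pandas as pd

# A column label that matches the index name makes `by` ambiguous
# inside DataFrame.sort_values.
df = pd.DataFrame(
    {"country_region": [2, 1, 3]},
    index=pd.Index(["b", "a", "c"], name="country_region"),
)
try:
    df.sort_values(by="country_region")
except ValueError as e:
    print(e)  # 'country_region' is both an index level and a column label...
```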

We stepped through it together and got the following:

In [19]: %time mdf['country_region'].value_counts()

RayTaskError(ValueError)                  Traceback (most recent call last)
File <timed eval>:1

File ~/sort_optimizer/lib/python3.9/site-packages/modin/logging/logger_decorator.py:128, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    127 if LogMode.get() == "disable":
--> 128     return obj(*args, **kwargs)

(this run_and_log frame is interleaved between every pair of frames below; the repeats are omitted)

File ~/sort_optimizer/lib/python3.9/site-packages/modin/_compat/pandas_api/latest/series.py:161, in LatestCompatibleSeries.value_counts(self, normalize, sort, ascending, bins, dropna)
--> 161 return self._value_counts(
    162     normalize=normalize,
    163     sort=sort,
    164     ascending=ascending,
    165     bins=bins,
    166     dropna=dropna,
    167 )

File ~/sort_optimizer/lib/python3.9/site-packages/modin/pandas/series.py:1980, in Series._value_counts(self, normalize, sort, ascending, bins, dropna)
--> 1980 counted_values = super(Series, self)._value_counts(
    1981     subset=self,
    1982     normalize=normalize,
    1983     sort=sort,
    1984     ascending=ascending,
    1985     dropna=dropna,
    1986 )

File ~/sort_optimizer/lib/python3.9/site-packages/modin/pandas/base.py:2998, in BasePandasDataset._value_counts(self, subset, normalize, sort, ascending, dropna)
   2996 counted_values = self.groupby(by=subset, dropna=dropna, observed=True).size()
   2997 if sort:
--> 2998     counted_values.sort_values(ascending=ascending, inplace=True)

File ~/sort_optimizer/lib/python3.9/site-packages/modin/pandas/series.py:1714, in Series.sort_values(self, axis, ascending, inplace, kind, na_position, ignore_index, key)
   1712 by = self.name if self.name is not None else 0
   1713 result = (
--> 1714     DataFrame(self.copy())
   1715     .sort_values(
   1716         by=by,
   1717         ascending=ascending,
   1718         inplace=False,
   1719         kind=kind,
   1720         na_position=na_position,
   1721         ignore_index=ignore_index,
   1722         key=key,
   1723     )
   1724     .squeeze(axis=1)
   1725 )

File ~/sort_optimizer/lib/python3.9/site-packages/modin/pandas/base.py:2660, in BasePandasDataset.sort_values(self, by, axis, ascending, inplace, kind, na_position, ignore_index, key)
   2659 if axis == 0:
--> 2660     result = self._query_compiler.sort_rows_by_column_values(
   2661         by,
   2662         ascending=ascending,
   2663         kind=kind,
   2664         na_position=na_position,
   2665         ignore_index=ignore_index,
   2666         key=key,
   2667     )

File ~/sort_optimizer/lib/python3.9/site-packages/modin/core/storage_formats/pandas/query_compiler.py:3181, in PandasQueryCompiler.sort_rows_by_column_values(self, columns, ascending, **kwargs)
   3179 if Engine.get() in ["Ray", "Dask"]:
   3180     kwargs.pop("ascending", False)
--> 3181     new_modin_frame = self._modin_frame.sort_by(
   3182         0, columns, ascending=ascending, **kwargs
   3183     )

File ~/sort_optimizer/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py:121, in lazy_metadata_decorator.<locals>.decorator.<locals>.run_f_on_minimally_updated_metadata(self, *args, **kwargs)
    119 elif apply_axis == "rows":
    120     obj._propagate_index_objs(axis=0)
--> 121 result = f(self, *args, **kwargs)

File ~/sort_optimizer/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py:1833, in PandasDataframe.sort_by(self, axis, columns, ascending, **kwargs)
   1832 else:
--> 1833     new_axes[axis.value] = self._compute_axis_labels_and_lengths(
   1834         axis.value, new_partitions
   1835     )

File ~/sort_optimizer/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py:462, in PandasDataframe._compute_axis_labels_and_lengths(self, axis, partitions)
    460 if partitions is None:
    461     partitions = self._partitions
--> 462 new_index, internal_idx = self._partition_mgr_cls.get_indices(axis, partitions)

File ~/sort_optimizer/lib/python3.9/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py:864, in PandasDataframePartitionManager.get_indices(cls, axis, partitions, index_func)
    862 target = partitions.T if axis == 0 else partitions
    863 new_idx = [idx.apply(func) for idx in target[0]] if len(target) else []
--> 864 new_idx = cls.get_objects_from_partitions(new_idx)

File ~/sort_optimizer/lib/python3.9/site-packages/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py:110, in PandasOnRayDataframePartitionManager.get_objects_from_partitions(cls, partitions)
--> 110 return ray.get([partition._data for partition in partitions])

File ~/sort_optimizer/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
--> 105 return func(*args, **kwargs)

File ~/sort_optimizer/lib/python3.9/site-packages/ray/worker.py:1831, in get(object_refs, timeout)
   1830 if isinstance(value, RayTaskError):
--> 1831     raise value.as_instanceof_cause()

RayTaskError(ValueError): ray::_apply_list_of_funcs() (pid=24810, ip=172.31.25.244)
  File "/home/ubuntu/sort_optimizer/lib/python3.9/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 1827, in <lambda>
    lambda df: df.sort_values(by=columns, ascending=ascending, **kwargs),
  File "/home/ubuntu/sort_optimizer/lib/python3.9/site-packages/pandas/util/_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)
  File "/home/ubuntu/sort_optimizer/lib/python3.9/site-packages/pandas/core/frame.py", line 6319, in sort_values
    k = self._get_label_or_level_values(by, axis=axis)
  File "/home/ubuntu/sort_optimizer/lib/python3.9/site-packages/pandas/core/generic.py", line 1835, in _get_label_or_level_values
    self._check_label_or_level_ambiguity(key, axis=axis)
  File "/home/ubuntu/sort_optimizer/lib/python3.9/site-packages/pandas/core/generic.py", line 1794, in _check_label_or_level_ambiguity
    raise ValueError(msg)
ValueError: 'country_region' is both an index level and a column label, which is ambiguous.

During handling of the above exception, another exception occurred:

ray::_apply_list_of_funcs() (pid=24810, ip=172.31.25.244)
  File "/home/ubuntu/sort_optimizer/lib/python3.9/site-packages/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py", line 485, in _apply_list_of_funcs
    partition = func(partition.copy(), *args, **kwargs)
  (... the same sort_values frames and ValueError as above ...)

In [20]: 2022-08-15 23:09:36,898 ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::_apply_list_of_funcs() (pid=24812, ip=172.31.25.244)
  (the same ValueError is then reported twice more as unhandled errors from worker pid=24812; repeats omitted)



alejandro-ponder avatar Aug 15 '22 23:08 alejandro-ponder

Is there a chance to put this in 0.16?

YarShev avatar Aug 26 '22 08:08 YarShev

Is there a chance to put this in 0.16?

I don't think so. The latest target date is September 9.

mvashishtha avatar Aug 30 '22 17:08 mvashishtha

Any new ETA?

Garra1980 avatar Sep 16 '22 21:09 Garra1980

Hi folks! The sort_by is nearly complete - we're just tracking down the last few CI bugs, and hope to get this in by end of week!

One open question is how to handle compat mode. When sorting by a column that contains objects, we need to specify a method argument to numpy to ensure that numpy picks a whole object, rather than trying to find the mean of two objects (which may not support addition or division). The problem is that numpy only supports this argument in versions 1.23 and up, so our current sort can't work on object columns in compat mode. Should we write a compat mode for the parts that need it and pass compatibility info down to the Modin Frame layer, or should we write the compat mode and then pick between the compat and non-compat paths under the hood by checking the numpy version? We also have some arguments that we pass through to the underlying sort_values that we won't be able to pass through in compat mode, since they don't exist there.
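The "pick a whole object instead of interpolating" idea is independent of the numpy version; here is a minimal sketch with a hypothetical helper (not Modin's actual code) of what a discrete quantile method does:

```python
import math

def quantile_pick_lower(values, q):
    # Return an actual element at quantile q instead of interpolating,
    # so it also works for objects that don't support + or / (e.g. strings).
    ordered = sorted(values)
    return ordered[math.floor(q * (len(ordered) - 1))]

quantile_pick_lower(["pear", "apple", "kiwi", "banana"], 0.5)  # "banana"
```

The method argument mentioned above selects among strategies like this in newer numpy, which is why it sidesteps the addition/division problem for object columns.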

RehanSD avatar Sep 27 '22 01:09 RehanSD

I did some rough benchmarking and noticed that the new sort is sensitive to skew in the data (each of the tests below, except the one for the sort algo, was run for only one trial - I'll do more robust testing after the algo is improved!)

  • pandas with a uniform distribution (5,000,000 x 100, uniform dist [0, 5_000_000)): 15.901382000000012 seconds
  • Modin main branch with the same uniform distribution: 40.345454515 seconds
  • New sort branch with the same uniform distribution: 13.770275069000036, 13.965218987000071 seconds
  • New sort branch with a Cauchy distribution (same size of data): 53.401249114999985 seconds

I took a look at the sizes of the partitions (that we do the sort on) for each of the cases. For the uniform distribution, we have a fairly even spread, although one partition is too big:

(_concat_splits pid=3537) 0
(_concat_splits pid=3536) 550956
(_concat_splits pid=3538) 547561
(_concat_splits pid=3539) 547355
(_concat_splits pid=3540) 543955
(_concat_splits pid=3534) 552042
(_concat_splits pid=3535) 541358
(_concat_splits pid=3541) 1716773

While for the Cauchy distribution, we have a similar spread, but two partitions are too big:

(_concat_splits pid=4788) 2891
(_concat_splits pid=4789) 0
(_concat_splits pid=4790) 573087
(_concat_splits pid=4783) 581212
(_concat_splits pid=4785) 568910
(_concat_splits pid=4787) 1016174
(_concat_splits pid=4786) 570652
(_concat_splits pid=4784) 1687074

I timed other aspects of the algorithm - the sampling on each partition and the time to compute an overall sample - both of which are nearly identical across both DataFrames. Do folks have any suggestions on how I can skew-proof the sampling stage, or adjust for skew in the splitting stage? I'm thinking sampling from minor columns as well could help, although it might be tricky to know when or how to use it - i.e., we really want a sample of the minor key for a specific range of the major key, so we would have to break that range up across multiple partitions, which is hard to get. Additionally, this still leaves us open to poor performance when sorting by only one column that has a ton of skew. Would appreciate folks' thoughts!
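To make the skew question concrete, here is a minimal sketch (not the PR's implementation) of sample-based range partitioning: sample the sort key, use sample quantiles as partition boundaries, and count how many rows land in each range.

```python
import numpy as np

rng = np.random.default_rng(0)

def partition_sizes(column, n_parts, sample_size=10_000):
    # Sample the sort key and use evenly spaced sample quantiles as the
    # n_parts - 1 partition boundaries.
    sample = rng.choice(column, size=min(sample_size, len(column)), replace=False)
    boundaries = np.quantile(sample, np.linspace(0, 1, n_parts + 1)[1:-1])
    # searchsorted assigns each row to the range it falls in.
    return np.bincount(np.searchsorted(boundaries, column), minlength=n_parts)

uniform_sizes = partition_sizes(rng.uniform(size=1_000_000), 8)
cauchy_sizes = partition_sizes(rng.standard_cauchy(1_000_000), 8)
```

Because the boundaries are quantiles of a sample rather than fixed value ranges, they adapt to the distribution; residual skew then comes mainly from sampling error and from how the outermost boundaries are handled.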

RehanSD avatar Sep 27 '22 09:09 RehanSD

These experiments were run on an m5.2xlarge with 512 GiB of memory. The following script was used:

from time import perf_counter
import ray
import numpy as np

from modin.config import IsRayCluster, BenchmarkMode
IsRayCluster.put(True)
BenchmarkMode.put(True)
ray.init(object_store_memory=1.39586e10)

import modin.pandas as pd
# import pandas as pd

# df = pd.DataFrame(np.random.standard_cauchy(size=(5_000_000, 100)), columns=[f"col {i}" for i in range(100)])
df = pd.DataFrame(np.random.uniform(size=(5_000_000, 100), high=5_000_000), columns=[f"col {i}" for i in range(100)])

start = perf_counter()
df.sort_values(by=['col 0', 'col 1', 'col 2'])
end = perf_counter()
print(f"{end - start}")

RehanSD avatar Sep 27 '22 18:09 RehanSD

@RehanSD thank you! Some more questions:

  • How do pandas and the existing modin implementation do on the Cauchy data?
  • How is the performance for all the scenarios you've given if we sort by just one column? (I'm guessing this is probably a fairly common use case)

mvashishtha avatar Sep 27 '22 19:09 mvashishtha

One quick update: I noticed that the partition skews were actually due to two things - how we sample, and the fact that I was setting the uppermost and lowermost quantiles to inf and -inf respectively - which meant that in practice, the topmost and bottommost partitions were each two partitions combined. I've fixed that bug, and the time for standard_cauchy with our new sort is 11.446099726000057 seconds, which is on par with performance on the uniform data. Partition skew is still present in this regime; however, the partitions for the standard Cauchy with this new code look like this:

(_concat_splits pid=6728) 1207
(_concat_splits pid=6727) 573908
(_concat_splits pid=6726) 571120
(_concat_splits pid=6733) 578198
(_concat_splits pid=6731) 566099
(_concat_splits pid=6732) 581331
(_concat_splits pid=6730) 1018288
(_concat_splits pid=6729) 1109845

I will add that there seems to be some stochasticity to the performance when measuring this branch - sometimes the timing will be 11 seconds, and sometimes it can jump up to 50 - but I'm fairly certain this is due to Ray not tearing down properly, since it always goes back to 11 if I just run ray stop -f a couple of times and wait a few minutes before rerunning.

Still working on the sample/split code - will post the benchmarks @mvashishtha asked for once I finish!
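The ±inf boundary bug described above can be illustrated with searchsorted on toy data (not the PR's code): clamping the lowest boundary to -inf starves the first bin, so the first two bins effectively merge.

```python
import numpy as np

rng = np.random.default_rng(1)
col = rng.uniform(size=800_000)
# 7 interior quantile boundaries -> 8 roughly even bins.
inner = np.quantile(col, np.linspace(0, 1, 9)[1:-1])
good_sizes = np.bincount(np.searchsorted(inner, col), minlength=8)

# Buggy variant: nothing can fall below -inf, so bin 0 is always empty
# and bin 1 absorbs what should have been two separate partitions.
bad = inner.copy()
bad[0] = -np.inf
bad_sizes = np.bincount(np.searchsorted(bad, col), minlength=8)
```

This reproduces the pattern in the partition listings above: one near-empty partition next to one roughly double-sized partition at each clamped end.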

RehanSD avatar Sep 29 '22 02:09 RehanSD

@RehanSD it's great to hear that Modin is doing about equally well on the Cauchy data. Since the new Modin performance on Cauchy data is not very different from the performance on uniform data, it seems very unlikely that Modin is going to do much worse than pandas on the Cauchy data.

Since, at least for your benchmark, the new implementation's performance on uniform data is about on par with pandas and ~3x better than what we have now, I think we should prioritize making the current implementation readable and merging it. We can fine-tune the performance later.

mvashishtha avatar Oct 03 '22 16:10 mvashishtha

On a 5,000,000 row x 100 col df, we get the following times:

| pandas | Modin Master | Modin This Branch |
|---|---|---|
| 16.07 ± 0.0747 s | 40.36 ± 0.1191 s | 9.71 ± 0.0799 s |

This is on an M5.2XLarge, with a 32GB /dev/shm, and 30.02GB for Ray's object store.

Modin Bench script:

from time import perf_counter
import ray
import numpy as np

from modin.config import IsRayCluster, BenchmarkMode
IsRayCluster.put(True)
BenchmarkMode.put(True)
ray.init(object_store_memory=2.79586e10)

import modin.pandas as pd
# import pandas as pd

df = pd.DataFrame(np.random.standard_cauchy(size=(5_000_000, 100)), columns=[f"col {i}" for i in range(100)])
# df = pd.DataFrame(np.random.uniform(size=(5_000_000, 100), high=5_000_000), columns=[f"col {i}" for i in range(100)])

start = perf_counter()
df.sort_values(by=['col 0', 'col 1', 'col 2'])
end = perf_counter()
print(f"{end - start}")

pandas Bench script:

from time import perf_counter
import numpy as np

import pandas as pd

df = pd.DataFrame(np.random.standard_cauchy(size=(5_000_000, 100)), columns=[f"col {i}" for i in range(100)])

start = perf_counter()
df.sort_values(by=['col 0', 'col 1', 'col 2'])
end = perf_counter()
print(f"{end - start}")

RehanSD avatar Nov 30 '22 01:11 RehanSD