PERF-#4268: Implement partition-parallel __getitem__ for bool Series masks
What do these changes do?
This PR implements partition-parallel execution of the `df[mask]` query when `mask` is a Series with bool dtype.
It also sneaks in a small optimization for running a binary operator when the right side (an axis-wide partition) consists of a single block: there is no point in concatenating it.
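For context, a minimal example (with made-up data) of the kind of query this PR parallelizes:

```python
import modin.pandas as pd

df = pd.DataFrame({"col1": range(10), "col2": range(10, 20)})
mask = df["col1"] < 5  # a Series with bool dtype, partitioned the same way as df
df2 = df[mask]         # with this PR, the mask is applied to each partition in parallel
```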
@modin-project/modin-core please note that I have changed the Binary signatures a bit.
- [x] commit message follows format outlined here
- [x] passes `flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py`
- [x] passes `black --check modin/ asv_bench/benchmarks scripts/doc_checker.py`
- [x] signed commit with `git commit -s`
- [x] Resolves #4268
- [x] tests added and passing
- [x] module layout described at `docs/development/architecture.rst` is up-to-date
- [x] added (Issue Number: PR title (PR Number)) and github username to release notes for next major release
Codecov Report
Merging #4753 (97f542b) into master (6e1849f) will increase coverage by 4.59%. The diff coverage is 91.66%.
```diff
@@            Coverage Diff             @@
##           master    #4753      +/-   ##
==========================================
+ Coverage   85.28%   89.88%   +4.59%
==========================================
  Files         259      260       +1
  Lines       19407    19697     +290
==========================================
+ Hits        16552    17705    +1153
+ Misses       2855     1992     -863
```
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...odin/core/storage_formats/pandas/query_compiler.py | 96.02% <80.00%> (-0.07%) | :arrow_down: |
| modin/core/dataframe/algebra/binary.py | 100.00% <100.00%> (ø) | |
| modin/core/dataframe/pandas/dataframe/dataframe.py | 94.84% <100.00%> (+0.50%) | :arrow_up: |
| ...dataframe/pandas/partitioning/partition_manager.py | 90.15% <100.00%> (+3.69%) | :arrow_up: |
| modin/logging/config.py | 94.59% <0.00%> (-1.30%) | :arrow_down: |
| modin/experimental/batch/test/test_pipeline.py | 100.00% <0.00%> (ø) | |
| modin/pandas/series.py | 94.23% <0.00%> (+0.24%) | :arrow_up: |
| modin/pandas/series_utils.py | 99.43% <0.00%> (+0.56%) | :arrow_up: |
| ... and 38 more | | |
@modin-project/modin-core technically this PR has two changesets: one changes the Binary and broadcast_apply signatures (to allow dropping labels instead of keeping or redefining them), and the other builds the mask on top of that. Should I split this PR in two?
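To illustrate what "dropping labels" means here, a toy sketch in plain pandas (NOT Modin's actual internals; `mask_partition` and the explicit list of blocks are invented for illustration): each partition is filtered independently, and since the number of surviving rows per block is unknown up front, the row labels are dropped and rebuilt afterwards rather than kept or redefined eagerly.

```python
import numpy as np
import pandas as pd

def mask_partition(block: pd.DataFrame, mask_block: pd.Series) -> pd.DataFrame:
    # Each worker filters only its own block; no cross-partition traffic.
    return block[mask_block.to_numpy()]

# Three row partitions of a frame, plus the matching pieces of a bool mask.
blocks = [pd.DataFrame({"col1": np.random.rand(5)}) for _ in range(3)]
mask_blocks = [b["col1"] < 0.3 for b in blocks]

# The per-block calls are what the engine would dispatch in parallel.
filtered = [mask_partition(b, m) for b, m in zip(blocks, mask_blocks)]

# Row labels are dropped and recomputed at the end instead of being
# tracked through the filter step.
result = pd.concat(filtered, ignore_index=True)
```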
@pyrito I sure do have the numbers. Here's the script I use:
```python
import sys
from time import time as _timer
from contextlib import contextmanager

@contextmanager
def timer(name):
    start = _timer()
    try:
        yield
    finally:
        print(f'{name} took {_timer() - start:.3f} sec')

nrows = 25_000_000
# nrows = 1_000_000
ncols = 10
threshold = 0.3

def _run_queries(mod):
    import numpy as np

    with timer('create target df'):
        data = {f"col{i}": np.random.rand(nrows) for i in range(ncols)}
        df = mod.DataFrame(data)
        repr(df)
    with timer('df[condition1]'):
        df2 = df[df.col1 < threshold]
        repr(df2)
    with timer('df[condition2]'):
        df3 = df[df.col2 < threshold]
        repr(df3)

def try_modin():
    import modin.config as cfg
    import modin.pandas as pd

    with timer('start workers'):
        # init the engine and start all the workers
        pd.DataFrame(range(cfg.CpuCount.get() * cfg.MinPartitionSize.get())).to_numpy()
    _run_queries(pd)

def try_pandas():
    import pandas as pd
    _run_queries(pd)

if __name__ == '__main__':
    try:
        func_name, = sys.argv[1:]
        func = globals()[func_name]
        assert callable(func)
    except (ValueError, KeyError, AssertionError) as err:
        sys.exit(f'Usage: {sys.argv[0]} func-to-run, error: {err}')
    print(f'Shape: rows={nrows / 1e6:.1f}M x {ncols=}')
    func()
```
The issue with getting exact numbers right now is that I don't have enough memory on my current laptop to measure >=50M rows without everything going to swap, so I cannot show the speedup 🙃 Lower row counts don't produce enough of a difference (the original figures in the issue I'm addressing erroneously counted some pieces of engine initialization and data distribution as parts of `__getitem__`, so the real difference is not actually that big).
Here's what I have for 25M rows (using 6 cores):
| script part | pandas, sec | modin master, sec | this PR, sec |
|---|---|---|---|
| create dataframe | 1.601 | 2.666 | 2.462 |
| df[condition1] | 0.450 | 1.297 | 1.097 |
| df[condition2] | 0.683 | 0.763 | 0.616 |
The numbers fluctuate a lot because my laptop is not set up for stable benchmarking, but the general picture is consistent: this PR is on par with or slightly faster than master below 50M rows, and the advantage grows beyond that.
> @pyrito I sure do have the numbers. Here's the script I use: ... The issue with getting the exact numbers right now is I don't have enough memory on my current laptop to measure >=50M rows without stuff going to swap all the time, so I cannot show the speedup 🙃
I've checked this script on my laptop with nrows=75M. The results are below:
This PR #4753:
```
(modin) C:\Users\79049\Documents\projects\modin>python reproducers\vas_test.py try_modin
Shape: rows=75.0M x ncols=10
UserWarning: Ray execution environment not yet initialized. Initializing...
To remove this warning, run the following python code before doing dataframe operations:
    import ray
    ray.init(runtime_env={'env_vars': {'__MODIN_AUTOIMPORT_PANDAS__': '1'}})
UserWarning: Distributing <class 'range'> object. This may take some time.
start workers took 9.099 sec
UserWarning: Distributing <class 'dict'> object. This may take some time.
create target df took 7.584 sec
df[condition1] took 1.618 sec
df[condition2] took 1.247 sec
```
Master:
```
start workers took 9.214 sec
UserWarning: Distributing <class 'dict'> object. This may take some time.
create target df took 7.796 sec
df[condition1] took 2.341 sec
df[condition2] took 1.833 sec
```
Pandas:
```
(modin) C:\Users\79049\Documents\projects\modin>python reproducers\vas_test.py try_pandas
Shape: rows=75.0M x ncols=10
create target df took 5.201 sec
df[condition1] took 1.021 sec
df[condition2] took 1.025 sec
```
My hardware: OS: Windows 11 21H2; CPU: AMD Ryzen 7 5800HS, 3.2 GHz, 8 cores (16 threads); RAM: 32 GB DDR4-3200.
Let me also test the performance for large dataframes before merging.
UPD: added the results:
Shape: rows=150.0M x ncols=10
| script part | pandas, sec | modin master (16 cores), sec | this PR (16 cores), sec |
|---|---|---|---|
| create dataframe | 17.520 | 27.409 | 27.694 |
| df[condition1] | 3.971 | 4.745 | 2.068 |
| df[condition2] | 3.550 | 4.562 | 1.846 |
Yay, on super-huge datasets it is even faster than pandas! 🥳 (Given my analysis showing that even with this approach we produce ~7x more memory traffic than pandas, this is awesome news!)