PERF-#4268: Implement partition-parallel __getitem__ for bool Series masks

Open vnlitvinov opened this issue 3 years ago • 6 comments

What do these changes do?

When running a df[mask] query where mask is a Series with bool dtype, this PR executes the query in a partition-parallel way.
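As a rough illustration of the partition-parallel idea (not Modin's actual implementation), here is a library-free sketch: the rows and the boolean mask are split into matching row partitions, each partition is filtered independently, and the surviving rows are stitched back together in order. The helper names `split_rows`, `filter_partition` and `parallel_getitem` are hypothetical, and a thread pool merely stands in for the real execution engine.

```python
from concurrent.futures import ThreadPoolExecutor


def split_rows(values, n_parts):
    """Split a list into n_parts contiguous chunks (sizes differ by at most one)."""
    size, extra = divmod(len(values), n_parts)
    parts, start = [], 0
    for i in range(n_parts):
        stop = start + size + (1 if i < extra else 0)
        parts.append(values[start:stop])
        start = stop
    return parts


def filter_partition(rows, mask):
    """Keep only the rows of one partition whose mask entry is True."""
    return [row for row, keep in zip(rows, mask) if keep]


def parallel_getitem(rows, mask, n_parts=4):
    """Hypothetical stand-in for df[mask]: filter every row partition independently."""
    if len(rows) != len(mask):
        raise ValueError("mask must have the same length as the data")
    # Rows and mask are split identically, so partition i of the mask
    # lines up with partition i of the data.
    row_parts = split_rows(rows, n_parts)
    mask_parts = split_rows(mask, n_parts)
    with ThreadPoolExecutor(max_workers=n_parts) as pool:
        filtered = list(pool.map(filter_partition, row_parts, mask_parts))
    # Concatenate the per-partition results, preserving the original row order.
    return [row for part in filtered for row in part]


rows = list(range(10))
mask = [value % 3 == 0 for value in rows]
print(parallel_getitem(rows, mask, n_parts=3))
```

The key point is that no partition needs to see the whole mask, only the slice aligned with its own rows.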

Also sneaked in a small optimization for binary operators: when the right side (which is an axis-wide partition) consists of only one block, there is no point in concatenating it.

@modin-project/modin-core please note that I have changed the Binary signatures a bit.

  • [x] commit message follows format outlined here
  • [x] passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • [x] passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • [x] signed commit with git commit -s
  • [x] Resolves #4268
  • [x] tests added and passing
  • [x] module layout described at docs/development/architecture.rst is up-to-date
  • [x] added (Issue Number: PR title (PR Number)) and github username to release notes for next major release

vnlitvinov avatar Aug 02 '22 10:08 vnlitvinov

Codecov Report

Merging #4753 (97f542b) into master (6e1849f) will increase coverage by 4.59%. The diff coverage is 91.66%.

@@            Coverage Diff             @@
##           master    #4753      +/-   ##
==========================================
+ Coverage   85.28%   89.88%   +4.59%     
==========================================
  Files         259      260       +1     
  Lines       19407    19697     +290     
==========================================
+ Hits        16552    17705    +1153     
+ Misses       2855     1992     -863     
| Impacted Files | Coverage Δ |
| --- | --- |
| ...odin/core/storage_formats/pandas/query_compiler.py | 96.02% <80.00%> (-0.07%) :arrow_down: |
| modin/core/dataframe/algebra/binary.py | 100.00% <100.00%> (ø) |
| modin/core/dataframe/pandas/dataframe/dataframe.py | 94.84% <100.00%> (+0.50%) :arrow_up: |
| ...dataframe/pandas/partitioning/partition_manager.py | 90.15% <100.00%> (+3.69%) :arrow_up: |
| modin/logging/config.py | 94.59% <0.00%> (-1.30%) :arrow_down: |
| modin/experimental/batch/test/test_pipeline.py | 100.00% <0.00%> (ø) |
| modin/pandas/series.py | 94.23% <0.00%> (+0.24%) :arrow_up: |
| modin/pandas/series_utils.py | 99.43% <0.00%> (+0.56%) :arrow_up: |
| ... and 38 more | |

codecov[bot] avatar Aug 02 '22 11:08 codecov[bot]

@modin-project/modin-core technically, this PR contains two changesets: one changes the Binary and broadcast_apply signatures (to allow dropping labels rather than keeping or redefining them), and the other builds the mask on top of that. Should I split this PR in two?

vnlitvinov avatar Aug 02 '22 12:08 vnlitvinov

@pyrito I sure do have the numbers. Here's the script I use:

import sys
from time import time as _timer
from contextlib import contextmanager

@contextmanager
def timer(name):
    start = _timer()
    try:
        yield
    finally:
        print(f'{name} took {_timer() - start:.3f} sec')

nrows = 25_000_000
#nrows = 1_000_000
ncols = 10
threshold = 0.3

def _run_queries(mod):
    import numpy as np

    with timer('create target df'):
        data = {f"col{i}": np.random.rand(nrows) for i in range(ncols)}
        df = mod.DataFrame(data)
        repr(df)
    with timer('df[condition1]'):
        df2 = df[df.col1 < threshold]
        repr(df2)
    with timer('df[condition2]'):
        df3 = df[df.col2 < threshold]
        repr(df3)

def try_modin():
    import modin.config as cfg
    import modin.pandas as pd

    with timer('start workers'):
        # Initialize the engine and start all the workers before timing anything.
        pd.DataFrame(range(cfg.CpuCount.get() * cfg.MinPartitionSize.get())).to_numpy()

    _run_queries(pd)

def try_pandas():
    import pandas as pd
    _run_queries(pd)

if __name__ == '__main__':
    try:
        func_name, = sys.argv[1:]
        func = globals()[func_name]
        assert callable(func)
    except (ValueError, KeyError, AssertionError) as err:
        sys.exit(f'Usage: {sys.argv[0]} func-to-run, error: {err}')
    print(f'Shape: rows={nrows / 1e6:.1f}M x {ncols=}')
    func()

The issue with getting exact numbers right now is that my current laptop doesn't have enough memory to measure >=50M rows without constantly going to swap, so I cannot show the speedup 🙃 Smaller row counts don't show enough of a difference (the original figures in the issue I'm addressing erroneously counted parts of engine initialization and data distribution as part of __getitem__, so the difference is not actually that big).

Here's what I have for 25M rows (using 6 cores):

| script part | pandas (sec) | modin master (sec) | this PR (sec) |
| --- | --- | --- | --- |
| create dataframe | 1.601 | 2.666 | 2.462 |
| df[condition1] | 0.450 | 1.297 | 1.097 |
| df[condition2] | 0.683 | 0.763 | 0.616 |

The numbers fluctuate a lot because my laptop is not set up for stable benchmarking, but the overall picture is consistent: this PR is slightly faster than master (or on par with it) below 50M rows and does a bit better above that.
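One common way to dampen such fluctuations (not what the script above does) is to repeat each measurement and keep the fastest run, since noise from other processes only ever makes a run slower. A minimal sketch, where the helper name `best_of` and the default of five repeats are assumptions for illustration:

```python
from time import perf_counter


def best_of(func, repeats=5):
    """Call func `repeats` times and return the fastest wall-clock time in seconds."""
    timings = []
    for _ in range(repeats):
        start = perf_counter()
        func()
        timings.append(perf_counter() - start)
    return min(timings)


# Example workload; in the benchmark this would wrap something like
# `lambda: repr(df[df.col1 < threshold])`.
elapsed = best_of(lambda: sum(range(100_000)))
print(f"best of 5 took {elapsed:.6f} sec")
```

Note that repeating a query also warms caches, so the best-of figure describes steady-state behaviour rather than a cold first call.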

vnlitvinov avatar Aug 02 '22 17:08 vnlitvinov

> @pyrito I sure do have the numbers. Here's the script I use: ... The issue with getting the exact numbers right now is I don't have enough memory on my current laptop to measure >=50M rows without stuff going to swap all the time, so I cannot show the speedup 🙃

I've run this script on my laptop with nrows=75M. The results are below:

This PR #4753:

(modin) C:\Users\79049\Documents\projects\modin>python reproducers\vas_test.py try_modin
Shape: rows=75.0M x ncols=10
UserWarning: Ray execution environment not yet initialized. Initializing...
To remove this warning, run the following python code before doing dataframe operations:

    import ray
    ray.init(runtime_env={'env_vars': {'__MODIN_AUTOIMPORT_PANDAS__': '1'}})

UserWarning: Distributing <class 'range'> object. This may take some time.
start workers took 9.099 sec
UserWarning: Distributing <class 'dict'> object. This may take some time.
create target df took 7.584 sec
df[condition1] took 1.618 sec
df[condition2] took 1.247 sec

Master:

start workers took 9.214 sec
UserWarning: Distributing <class 'dict'> object. This may take some time.
create target df took 7.796 sec
df[condition1] took 2.341 sec
df[condition2] took 1.833 sec

Pandas:

(modin) C:\Users\79049\Documents\projects\modin>python reproducers\vas_test.py try_pandas
Shape: rows=75.0M x ncols=10
create target df took 5.201 sec
df[condition1] took 1.021 sec
df[condition2] took 1.025 sec

My hardware: OS: Windows11 21H2, CPU: AMD Ryzen 7 5800HS 3.2 GHz 8 cores (16 threads) RAM: 32 GB DDR4 3200 MHz

prutskov avatar Aug 02 '22 17:08 prutskov

Let me also test the performance for large dataframes before merging.

UPD added results:

Shape: rows=150.0M x ncols=10

| script part | pandas (sec) | modin master, 16 cores (sec) | this PR, 16 cores (sec) |
| --- | --- | --- | --- |
| create dataframe | 17.520 | 27.409 | 27.694 |
| df[condition1] | 3.971 | 4.745 | 2.068 |
| df[condition2] | 3.550 | 4.562 | 1.846 |
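To put these timings in relative terms, the short snippet below divides the baseline time by the time measured for this PR. The figures are copied from the 150M-row results above; the `speedup` helper is just an illustrative name.

```python
# Timings in seconds, copied from the 150M-row results above.
timings = {
    "df[condition1]": {"pandas": 3.971, "modin master": 4.745, "this PR": 2.068},
    "df[condition2]": {"pandas": 3.550, "modin master": 4.562, "this PR": 1.846},
}


def speedup(baseline, candidate):
    """Return how many times faster `candidate` is than `baseline`."""
    return baseline / candidate


for query, t in timings.items():
    vs_master = speedup(t["modin master"], t["this PR"])
    vs_pandas = speedup(t["pandas"], t["this PR"])
    print(f"{query}: {vs_master:.2f}x vs master, {vs_pandas:.2f}x vs pandas")

# df[condition1]: 2.29x vs master, 1.92x vs pandas
# df[condition2]: 2.47x vs master, 1.92x vs pandas
```

Dataframe creation is unaffected (27.409 vs 27.694 sec), which is expected since the change only touches the masking path.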

anmyachev avatar Aug 03 '22 10:08 anmyachev

Yay, on super-huge datasets it is even faster than pandas! 🥳 (Given my analysis that we produce ~7x more memory traffic than pandas even with my approach, this is awesome news!)

vnlitvinov avatar Aug 03 '22 12:08 vnlitvinov