
Modin iloc performance is not good enough

Open · amyskov opened this issue 4 years ago · 5 comments

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04): Linux Ubuntu 19.04
  • Modin version (modin.__version__): 0.7.3+290.gaa78a18
  • Python version: 3.8.3
  • Code we can use to reproduce:
Reproducer

import modin.pandas as pd
import pandas
from timeit import default_timer as timer
import csv
import random
import os
import numpy as np

rows = [10, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7]
value_low = 0
value_high = 10
runs_number = 5
values = 5
csv_name_series = "../local_benches_data/local_bench_series.csv"
sort = True

# Force execution of any deferred (lazy) operations on each partition.
def drain_call_queue_of_partitions(partitions):
    for i in range(len(partitions)):
        for j in range(len(partitions[i])):
            partitions[i][j].drain_call_queue()

times_modin = {}
times_pandas = {}
times_drain_call_queue = {}
for rows_number in rows:
    try:
        # creation of files with test data
        with open(csv_name_series, "w", newline='') as f1:
            w1=csv.writer(f1, delimiter=',')
            w1.writerow(['data_column1'])
            for i in range(int(rows_number)):
                w1.writerow([round(random.uniform(value_low, value_high), 1)])
    except Exception as exc:
        print(exc)
        os.remove(csv_name_series)

    t_pandas = {}
    t_modin = {}
    t_drain_call_queue = {}
    for run in range(runs_number):
        ser_modin = pd.read_csv(csv_name_series).squeeze().reset_index(drop=True)
        sorter_modin = np.argsort(list(ser_modin))
        ser_pandas = pandas.read_csv(csv_name_series).squeeze().reset_index(drop=True)
        sorter_pandas = np.argsort(list(ser_pandas))
        # time draining the call queue (forces deferred operations on each partition)
        t0 = timer()
        drain_call_queue_of_partitions(ser_modin._query_compiler._modin_frame._partitions)
        t_drain_call_queue[run] = timer() - t0
        t0 = timer()
        ans = ser_modin.iloc[sorter_modin]
        repr(ans)  # force materialization of the lazy result
        t_modin[run] = timer() - t0
        t0 = timer()
        ans = ser_pandas.iloc[sorter_pandas]
        t_pandas[run] = timer() - t0

    times_modin[rows_number] = min(t_modin.values())
    times_pandas[rows_number] = min(t_pandas.values())
    times_drain_call_queue[rows_number] = min(t_drain_call_queue.values())

Describe the problem

Modin's iloc implementation is slower than the pandas implementation; see the graph below for details.

[benchmark graph]

Source code / logs

amyskov · Aug 10 '20

@dchigarev, can you revisit this?

YarShev · Jan 19 '24

This message was based on incorrect measurements; remeasured here.

.iloc on Modin is indeed slower than on pandas, but as long as we're talking about execution times under 0.1 seconds, there is nothing we can do about it (because of Ray overheads)

[benchmark graph]
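To put the Ray-overhead point in perspective, here is a tiny sketch (not from the original thread) that measures the fixed round-trip cost of an empty Ray task; for operations finishing well under 0.1 s, this per-task overhead dominates the end-to-end time:

# Minimal sketch of Ray's fixed per-task overhead (assumes a local, single-node Ray).
import ray
from timeit import default_timer as timer

ray.init(ignore_reinit_error=True)

@ray.remote
def noop():
    return None

ray.get(noop.remote())  # warm up a worker so startup cost isn't measured

t0 = timer()
ray.get(noop.remote())  # submit + schedule + execute + fetch an empty task
print("empty-task round trip:", timer() - t0, "s")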

dchigarev · Jan 22 '24

.iloc on Modin is indeed slower than on pandas, but as long as we're talking about execution times under 0.1 seconds, there is nothing we can do about it (because of Ray overheads)

@dchigarev the times turned out to be very short. Could it be that iloc works lazily and execute does not trigger the lazy calculations?

Where does Ray's main overhead come from? Passing parameters (with serialization) or something else?
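For reference, a minimal sketch (not from the original thread) of making sure the lazy result is forced before the timer stops, using modin.utils.execute, the same helper the corrected benchmark below relies on; the CSV name is just a placeholder:

import numpy as np
import modin.pandas as pd
from modin.utils import execute
from timeit import default_timer as timer

ser = pd.read_csv("local_bench_series.csv", index_col=[0]).squeeze().reset_index(drop=True)
execute(ser)  # materialize the read_csv result so it isn't included in the timing

sorter = np.argsort(ser.to_numpy())
t0 = timer()
ans = ser.iloc[sorter]  # lazy until materialized
execute(ans)            # force the deferred computation so the timing is meaningful
print("iloc + materialization:", timer() - t0, "s")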

anmyachev · Jan 22 '24

@anmyachev thank you for your message! That made me double-check the results, and I found an error in how the CSVs are read in this benchmark (a 2-column dataframe was read instead of a series), which made the results incorrect.

Here are the proper measurements:

[benchmark graph]

Code:
import modin.pandas as pd
import pandas
from timeit import default_timer as timer
import numpy as np

rows = [
    10, 100, 1000, 10_000, 100_000, 1_000_000, 10_000_000
]
value_low = 0
value_high = 10
runs_number = 5
values = 5
csv_name_series = "local_bench_series.csv"

from modin.utils import execute

times_modin = {}
times_pandas = {}
times_drain_call_queue = {}
for rows_number in rows:
    pandas.DataFrame({"data_column1": np.round(np.random.uniform(value_low, value_high, rows_number))}).to_csv(csv_name_series)

    t_pandas = {}
    t_modin = {}
    t_drain_call_queue = {}
    for run in range(runs_number):
        ser_modin = pd.read_csv(csv_name_series, index_col=[0]).squeeze().reset_index(drop=True)
        ser_pandas = pandas.read_csv(csv_name_series, index_col=[0]).squeeze().reset_index(drop=True)
        sorter = np.argsort(list(ser_pandas))
        execute(ser_modin)  # materialize the read_csv result so it isn't included in the timing

        t0 = timer()
        ans = ser_modin.iloc[sorter]
        execute(ans)  # force the deferred .iloc computation
        t_modin[run] = timer() - t0
        t0 = timer()
        ans = ser_pandas.iloc[sorter]
        t_pandas[run] = timer() - t0

    times_modin[rows_number] = np.median(list(t_modin.values()))
    times_pandas[rows_number] = np.median(list(t_pandas.values()))

import ray
ray.timeline("test.json")  # dump the Ray task timeline (Chrome trace format)

print("modin")
for val in times_modin.values():
    print(val)

print("pandas")
for val in times_pandas.values():
    print(val)

As you can see, the execution time is still less than 0.1 seconds in most cases; however, for the 10_000_000-row case it's 0.45 seconds, which is worth investigating.

In the benchmark we exercise a worst-case scenario, where the length of .iloc's argument is equal to the size of the dataframe. Such an indexer implies reshuffling the partitions, meaning that .iloc is performed in two steps (see the sketch after this list):

  1. Mask each partition individually (perform part.mask(slice(None)) in our case)
  2. Combine all partitions and reshuffle (full-column ._reorder_labels)
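A minimal sketch of this worst case (row count reduced for illustration); the full-length permutation guarantees that rows from every input partition end up in every output partition:

import numpy as np
import modin.pandas as pd

ser = pd.Series(np.random.uniform(0, 10, 1_000_000))
sorter = np.argsort(ser.to_numpy())  # len(sorter) == len(ser): full reshuffle

# Step 1 (per-partition masking) and step 2 (combining + ._reorder_labels)
# both happen inside this call on the underlying Modin frame.
ans = ser.iloc[sorter]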

Here's a Ray timeline result for the case with 10_000_000 rows: [Ray timeline screenshot]

In theory, we could have skipped the first masking stage here, as it's completely unnecessary

dchigarev · Jan 22 '24

Here we are already trying to skip the masking stage: https://github.com/modin-project/modin/blob/097ea527c8e3f099e1f252b067a1d5eb055ad0b5/modin/core/dataframe/pandas/dataframe/dataframe.py#L1181

Apparently some index cache is missing.
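For context, a rough way to check whether those caches are populated before the .iloc call (a sketch that pokes at Modin internals; attribute names such as has_materialized_index and _row_lengths_cache are assumptions and may differ between Modin versions):

import numpy as np
import modin.pandas as pd

ser = pd.read_csv("local_bench_series.csv", index_col=[0]).squeeze().reset_index(drop=True)
frame = ser._query_compiler._modin_frame  # internal frame, as in the reproducer above

# getattr() with a default keeps this working even if the attribute names differ.
print("index cache materialized:", getattr(frame, "has_materialized_index", "n/a"))
print("row lengths cache:", getattr(frame, "_row_lengths_cache", "n/a"))

sorter = np.argsort(ser.to_numpy())
ans = ser.iloc[sorter]  # whether the masking stage can be skipped depends on these caches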

anmyachev · Jan 22 '24

After remeasuring this on the current master, it appears that the operation is actually submitted as a single full-axis kernel (without the masking stage). Here's a timing breakdown for each stage of the .iloc case that takes 400ms with Modin:

modin_df.iloc[...] time: 400ms:

  • Ray kernel time: 352ms:
    • deserialize arguments: 12ms
    • execute: 287ms:
      • concat row partitions: 42ms
      • execute func(): 231ms
      • split the result: 5ms
      • ???: 11ms
    • store outputs: 58ms
  • ???: 48ms

Not sure if we can optimize anything here.
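For reference, a per-stage breakdown like the one above can be assembled from the Chrome-trace JSON that ray.timeline() writes (a minimal sketch; the file name and the top-10 cut-off are arbitrary):

import json
import ray

ray.timeline("iloc_trace.json")  # dump the timeline collected so far

with open("iloc_trace.json") as f:
    events = json.load(f)

# Complete events carry a name and a duration "dur" in microseconds; the same file
# can also be opened in chrome://tracing or Perfetto for a visual breakdown.
for ev in sorted(events, key=lambda e: e.get("dur", 0), reverse=True)[:10]:
    print(ev.get("name"), round(ev.get("dur", 0) / 1000, 1), "ms")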

dchigarev · Mar 08 '24