Modin iloc performance is not good enough
System information
- OS Platform and Distribution (e.g., Linux Ubuntu 16.04): Linux Ubuntu 19.04
- Modin version (`modin.__version__`): 0.7.3+290.gaa78a18
- Python version: 3.8.3
- Code we can use to reproduce:
Reproducer
```python
import modin.pandas as pd
import pandas
from timeit import default_timer as timer
import csv
import random
import os
import numpy as np

rows = [10, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7]
value_low = 0
value_high = 10
runs_number = 5
values = 5
csv_name_series = "../local_benches_data/local_bench_series.csv"
sort = True

def drain_call_queue_of_partitions(partitions):
    for i in range(len(partitions)):
        for j in range(len(partitions[i])):
            partitions[i][j].drain_call_queue()

times_modin = {}
times_pandas = {}
times_drain_call_queue = {}

for rows_number in rows:
    try:
        # creation of the file with test data
        with open(csv_name_series, "w", newline="") as f1:
            w1 = csv.writer(f1, delimiter=",")
            w1.writerow(["data_column1"])
            for i in range(int(rows_number)):
                w1.writerow([round(random.uniform(value_low, value_high), 1)])
    except Exception as exc:
        print(exc)

    t_pandas = {}
    t_modin = {}
    t_drain_call_queue = {}
    for run in range(runs_number):
        ser_modin = pd.read_csv(csv_name_series).squeeze().reset_index(drop=True)
        sorter_modin = np.argsort(list(ser_modin))
        ser_pandas = pandas.read_csv(csv_name_series).squeeze().reset_index(drop=True)
        sorter_pandas = np.argsort(list(ser_pandas))

        t0 = timer()
        drain_call_queue_of_partitions(ser_modin._query_compiler._modin_frame._partitions)
        t_drain_call_queue[run] = timer() - t0

        t0 = timer()
        ans = ser_modin.iloc[sorter_modin]
        repr(ans)  # force materialization so the lazy .iloc is actually measured
        t_modin[run] = timer() - t0

        t0 = timer()
        ans = ser_pandas.iloc[sorter_pandas]
        t_pandas[run] = timer() - t0

    # remove the data file only after all runs have read it
    os.remove(csv_name_series)
    times_modin[rows_number] = min(t_modin.values())
    times_pandas[rows_number] = min(t_pandas.values())
    times_drain_call_queue[rows_number] = min(t_drain_call_queue.values())
```
Describe the problem
Modin's `.iloc` implementation is slower than the pandas implementation; see the graph below for details.
Source code / logs
@dchigarev, can you revisit this?
This message was based on incorrect measurements; remeasured here.
`.iloc` on Modin is indeed slower than pandas, but as long as we're speaking about execution times of less than 0.1 seconds, we can do nothing about it (because of Ray overheads).
@dchigarev the times turned out to be very short. `.iloc` works lazily; does `execute` not trigger the lazy calculations? And what is Ray's main overhead: passing parameters (with serialization) or something else?
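For context on the serialization part of that question, here is a generic, framework-agnostic sketch (using stdlib `pickle`, not Ray's actual serializer) of how one might isolate the cost of a parameter round-trip:

```python
import pickle
import time

# A payload roughly like a positional indexer passed to a remote kernel.
payload = list(range(100_000))

t0 = time.perf_counter()
blob = pickle.dumps(payload)   # serialize, as if shipping args to a worker
obj = pickle.loads(blob)       # deserialize on the "worker" side
dt = time.perf_counter() - t0

print(f"round-trip serialization of {len(payload)} ints: {dt * 1000:.2f}ms")
```

Ray's own overhead also includes task scheduling and object-store transfers, so this only bounds one component from below.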
@anmyachev thank you for your message! It made me double-check the results, and I found an error in how the CSV is read in this benchmark (a two-column dataframe was read instead of a series), which made the results incorrect.
Here are the proper measurements:
code
```python
import modin.pandas as pd
import pandas
from timeit import default_timer as timer
import numpy as np

from modin.utils import execute

rows = [10, 100, 1000, 10_000, 100_000, 1_000_000, 10_000_000]
value_low = 0
value_high = 10
runs_number = 5
values = 5
csv_name_series = "local_bench_series.csv"

times_modin = {}
times_pandas = {}
times_drain_call_queue = {}

for rows_number in rows:
    pandas.DataFrame(
        {"data_column1": np.round(np.random.uniform(value_low, value_high, rows_number))}
    ).to_csv(csv_name_series)

    t_pandas = {}
    t_modin = {}
    t_drain_call_queue = {}
    for run in range(runs_number):
        ser_modin = pd.read_csv(csv_name_series, index_col=[0]).squeeze().reset_index(drop=True)
        ser_pandas = pandas.read_csv(csv_name_series, index_col=[0]).squeeze().reset_index(drop=True)
        sorter = np.argsort(list(ser_pandas))
        execute(ser_modin)  # materialize before timing so read_csv is excluded

        t0 = timer()
        ans = ser_modin.iloc[sorter]
        execute(ans)
        t_modin[run] = timer() - t0

        t0 = timer()
        ans = ser_pandas.iloc[sorter]
        t_pandas[run] = timer() - t0

    times_modin[rows_number] = np.median(list(t_modin.values()))
    times_pandas[rows_number] = np.median(list(t_pandas.values()))

import ray

ray.timeline("test.json")

print("modin")
for val in times_modin.values():
    print(val)
print("pandas")
for val in times_pandas.values():
    print(val)
```
As you can see, the execution time is still less than 0.1 seconds in most cases; however, for the 10_000_000-row case it's 0.45 seconds, which makes it worth investigating.
In the benchmark we exercise a worst-case scenario, where the length of `.iloc`'s argument is equal to the size of the dataframe. The indexer implies partition reshuffling, meaning that `.iloc` will be performed in two steps:
- Mask each partition individually (perform `part.mask(slice(None))` in our case)
- Combine all partitions and reshuffle (full-column `._reorder_labels`)
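To make those two steps concrete, here is a minimal numpy-only sketch (not Modin's actual code; `iloc_two_step` is a hypothetical name) of what mask-then-reorder amounts to for a partitioned 1-D array:

```python
import numpy as np

def iloc_two_step(partitions, indexer):
    # Step 1: mask each partition individually; with a full indexer this is
    # effectively `part[slice(None)]`, i.e. a no-op copy of every partition.
    masked = [part[slice(None)] for part in partitions]
    # Step 2: combine all partitions and reshuffle rows according to the
    # positional indexer (the analogue of a full-column `._reorder_labels`).
    combined = np.concatenate(masked)
    return combined[indexer]

parts = [np.array([3.0, 1.0]), np.array([2.0, 0.0])]
# Worst case: the indexer is as long as the data itself.
order = np.argsort(np.concatenate(parts))
print(iloc_two_step(parts, order))  # -> [0. 1. 2. 3.]
```

Since step 1 copies every partition without changing it, the sketch also shows why the masking stage is pure overhead for a full-length indexer.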
Here's a ray-timeline result for the case with 10_000_000 rows:
In theory, we could have skipped the first masking stage here, as it's completely unnecessary.
Here we are already trying to skip the masking stage: https://github.com/modin-project/modin/blob/097ea527c8e3f099e1f252b067a1d5eb055ad0b5/modin/core/dataframe/pandas/dataframe/dataframe.py#L1181
Apparently some index cache is missing.
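For illustration, the kind of index cache that fast path depends on can be sketched as a lazily computed, memoized property (the class and method names here are hypothetical, not Modin's internals):

```python
class LazyFrame:
    """Toy stand-in for a lazy frame whose index is expensive to compute."""

    def __init__(self, nrows):
        self._nrows = nrows
        self._index_cache = None  # None means "not materialized yet"

    @property
    def index(self):
        # Materialize the index once and reuse it afterwards; if this cache
        # is missing, the fast path cannot check it and falls back to the
        # slower masking route.
        if self._index_cache is None:
            self._index_cache = list(range(self._nrows))
        return self._index_cache

    def has_materialized_index(self):
        return self._index_cache is not None

f = LazyFrame(5)
print(f.has_materialized_index())  # -> False
print(len(f.index))                # -> 5
print(f.has_materialized_index())  # -> True
```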
After remeasuring this on the current master, it appears that the operation is actually being submitted as a single full-axis kernel (without the masking stage). Here's a timing breakdown for each stage of the `.iloc` case that takes 400ms with Modin:

`modin_df.iloc[...]` time: 400ms:
- ray kernel time: 352ms:
  - deserialize arguments: 12ms
  - execute: 287ms:
    - concat row partitions: 42ms
    - execute func(): 231ms
    - split the result: 5ms
    - ???: 11ms
  - store outputs: 58ms
- ???: 48ms

Not sure if we can optimize anything here.
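As an aside, a per-stage breakdown like the one above can be collected with a simple context manager around each kernel stage; this is a generic sketch (the `stage` helper and stage bodies are illustrative, not Modin's instrumentation):

```python
import time
from contextlib import contextmanager

timings = {}

@contextmanager
def stage(name):
    # Accumulate wall-clock time per named stage into `timings`.
    t0 = time.perf_counter()
    try:
        yield
    finally:
        timings[name] = timings.get(name, 0.0) + (time.perf_counter() - t0)

# Hypothetical kernel mirroring the stages above: concat -> func() -> split.
data = [list(range(1000)) for _ in range(4)]
with stage("concat row partitions"):
    combined = [x for part in data for x in part]
with stage("execute func()"):
    result = sorted(combined)
with stage("split the result"):
    chunks = [result[i:i + 1000] for i in range(0, len(result), 1000)]

for name, t in timings.items():
    print(f"{name}: {t * 1000:.1f}ms")
```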