clumper
clumper copied to clipboard
Join(s) performance enhancement
Is your feature request related to a problem? Please describe.
As mentioned in the codebase itself, inner_join
and left_join
methods implementation is "naive" and speedup is possible.
I figured it out while working with clumpers with 10k+ dicts.
Describe the solution you'd like: Here is a possible speedup which avoids the inner for-loop, with a few performance comparisons as well:
from clumper import Clumper
def join(self, other, mapping, how="inner", lsuffix="", rsuffix="_joined"):
    """Possible new join implementation, remark that I am adding the `how` keyword argument"""
    left_keys = mapping.keys()
    right_keys = mapping.values()
    if how == "inner":
        # For an inner join it is sufficient to keep only the dicts
        # that carry every one of the join keys.
        left = self.keep(lambda row: all(k in row.keys() for k in left_keys))
    elif how == "left":
        left = self
    else:
        raise NotImplementedError()
    # Right-side dicts missing any join key can never match anything.
    right = other.keep(lambda row: all(k in row.keys() for k in right_keys))
    out = []
    for left_row in left:
        # As already implemented, extract the values to join on.
        join_values = [left_row.get(k) for k in left_keys]
        # Exploit the keep method to find all dicts in the other clumper that match.
        matches = right.keep(
            lambda row: all(row[k] == v for k, v in zip(right_keys, join_values))
        )
        if len(matches):
            for right_row in matches:
                out.append(Clumper._merge_dicts(left_row, right_row, mapping, lsuffix, rsuffix))
        elif how == "left":
            # For a left join we keep the unmatched left row in any case.
            out.append(Clumper._merge_dicts(left_row, {}, mapping, lsuffix, rsuffix))
    return self._create_new(out)
Now let's define helper functionalities for benchmarking
from functools import wraps
import numpy as np
import pandas as pd
from time import process_time
from memo import memlist, grid, Runner
def generate_random_clumper(size, keys=None):
    """
    Creates a Clumper with random integers of shape=(size, len(keys))
    starting from a pandas DataFrame.

    Parameters
    ----------
    size : int
        Number of dicts (rows) to generate.
    keys : list of str, optional
        Key/column names for each dict; defaults to ["a", "b", "c"].
    """
    # Avoid a mutable default argument: build the default list per call.
    if keys is None:
        keys = list("abc")
    df = pd.DataFrame(
        data=np.random.randint(0, 100, (size, len(keys))),
        columns=keys,
    )
    clump = Clumper(df.to_dict("records"))
    return clump
def drop_random_keys(clump, frac=0.1, keys=None):
    """
    Randomly drops a `frac` percentage of keys not in the provided `keys`.

    A `frac` fraction of the dicts is reduced to only the protected
    `keys`; the remaining `1 - frac` fraction keeps all of its keys.
    """
    # Avoid a mutable default argument: build the default list per call.
    if keys is None:
        keys = list("ab")
    # NOTE: the original snippet had `""""` (four quotes) after the
    # docstring, which is a SyntaxError — fixed here.
    c1 = clump.sample_frac(frac, replace=False).select(*keys)
    c2 = clump.sample_frac(1 - frac, replace=False)
    return c1.concat(c2)
def timer(func):
    """Decorator: call `func` and return `(result, elapsed CPU seconds)`."""
    @wraps(func)
    def timed(*args, **kwargs):
        start = process_time()
        value = func(*args, **kwargs)
        elapsed = process_time() - start
        return value, elapsed
    return timed
Time for testing
results = []

@memlist(data=results)
def join_experiment(left_size, right_size, left_drop, right_drop):
    """Time the old vs. the proposed join on random clumpers and record results."""
    mapping = {"b": "b", "c": "c"}
    c1 = generate_random_clumper(left_size).pipe(drop_random_keys, left_drop)
    c2 = generate_random_clumper(right_size).pipe(drop_random_keys, right_drop)
    # Old implementations (methods on the clumper itself).
    inner_old, t_inner_old = timer(c1.inner_join)(c2, mapping=mapping)
    left_old, t_left_old = timer(c1.left_join)(c2, mapping=mapping)
    # Proposed implementation (free function with a `how` argument).
    inner_new, t_inner_new = timer(join)(c1, c2, mapping=mapping, how="inner")
    left_new, t_left_new = timer(join)(c1, c2, mapping=mapping, how="left")
    return {
        "equals_inner": inner_old.equals(inner_new),
        "equals_left": left_old.equals(left_new),
        "time_inner_old": t_inner_old,
        "time_left_old": t_left_old,
        "time_inner_new": t_inner_new,
        "time_left_new": t_left_new,
        "best_inner": "new" if t_inner_new < t_inner_old else "old",
        "best_left": "new" if t_left_new < t_left_old else "old",
    }
# Benchmark grid: clumper sizes and the fraction of dicts stripped of join keys.
sizes = [100, 1_000, 10_000]
drop_rates = [0.01, 0.1, 0.25, 0.5, 0.9]
# Cartesian product of every size/drop-rate combination (memo.grid).
settings = grid(left_size=sizes, right_size=sizes, left_drop=drop_rates, right_drop=drop_rates)
runner = Runner(backend="threading", n_jobs=8)
runner.run(func=join_experiment, settings=settings, progbar=True)
# delta > 1 means the new implementation was faster than the old one.
df_res = (pd.DataFrame(results)
.assign(
delta_inner = lambda t: t["time_inner_old"]/t["time_inner_new"],
delta_left = lambda t: t["time_left_old"]/t["time_left_new"]
)
)
# As a first sanity check make sure every join is as expected
df_res["equals_inner"].all(), df_res["equals_left"].all()
# (True, True)
# Then let's see
df_res[["delta_inner", "delta_left"]].describe(percentiles=[.01, .05, .25, .5, .75, .9, .99]).T
count | mean | std | min | 1% | 5% | 25% | 50% | 75% | 90% | 99% | max | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
delta_inner | 144 | 8.7035 | 14.6614 | 0.649615 | 0.925807 | 1.03278 | 1.75209 | 3.09647 | 10.467 | 15.9198 | 75.1523 | 109.684 |
delta_left | 144 | 2.81045 | 2.81197 | 0.686376 | 0.743097 | 0.864589 | 1.04525 | 1.46856 | 2.77124 | 7.99939 | 9.13121 | 14.003 |
- Inner join(s) improved in 95% of the tests
- Left join(s) improved in slightly more than 75% of them — in particular when nearly all (actually, 99%) of the dicts in the right clumper have all the keys (i.e. in the tests where `right_drop = 0.01`).
Additional context
- I can imagine that further improvements can be done
- If we want to keep the `inner_join` and `left_join` methods standalone, we can write the `join` as semiprivate and call it from those methods.