clumper icon indicating copy to clipboard operation
clumper copied to clipboard

Join(s) performance enhancement

Open FBruzzesi opened this issue 2 years ago • 0 comments

Is your feature request related to a problem? Please describe. As mentioned in the codebase itself, inner_join and left_join methods implementation is "naive" and speedup is possible. I figured it out while working with clumpers with 10k+ dicts.

Describe the solution you'd like Here a possible speedup which avoided the inner for-loop, with few performance comparisons aswell:

from clumper import Clumper

def join(self, other, mapping, how="inner", lsuffix="", rsuffix="_joined"):
    """Possible new join implementation, remark that I am adding the `how` keyword argument"""
    
    result = []
    self_keys, other_keys = mapping.keys(), mapping.values()
    
    if how == "inner":
        # If it's an inner join, it's sufficient to keep only the dicts that have all the matching keys
        _self = self.keep(lambda d: all((k in d.keys() for k in self_keys)))
    elif how == "left":
        _self = self
    else:
        raise NotImplementedError()
    
    other_filtered = other.keep(lambda d: all((k in d.keys() for k in other_keys)))
    
    for d_i in _self:
        
        # as already implemented, extract values to join on
        values_i = [d_i.get(k) for k in self_keys]
        
        # expoit keep method to find all the dicts in other clumper that match
        matched = other_filtered.keep(lambda d: all(d[k]==v for k, v in zip(other_keys, values_i)))

        if len(matched):
            for d_j in matched:
                result.append(Clumper._merge_dicts(d_i, d_j, mapping, lsuffix, rsuffix))
        else:
            # for left join, we want to keep d_i in any case
            if how == "left":
                 result.append(Clumper._merge_dicts(d_i, {}, mapping, lsuffix, rsuffix))
            
    return self._create_new(result)

Now let's define helper functionalities for benchmarking


from functools import wraps
import numpy as np
import pandas as pd
from time import process_time
from memo import memlist, grid, Runner

def generate_random_clumper(size, keys=list("abc")):
    """
    Creates a Clumper with random integers of shape=(size, len(keys)) 
    starting from a pandas DataFrame
    """
    
    df = pd.DataFrame(
        data=np.random.randint(0, 100, (size, len(keys))),
        columns=keys
    )
    
    clump = Clumper(df.to_dict("records"))
    return clump

def drop_random_keys(clump, frac = 0.1, keys = list("ab")):
    """
    Randomly drops frac percentage of keys not in the provided keys
    """"
    c1 = clump.sample_frac(frac, replace=False).select(*keys)
    c2 = clump.sample_frac(1-frac, replace=False)
    
    return c1.concat(c2)

def timer(func):
    """timer decorator"""
    @wraps(func)
    def wrapper(*args, **kwargs):

        tic = process_time()
        res = func(*args, **kwargs)
        toc = process_time()

        time_elapsed = toc-tic
        return res, time_elapsed

    return wrapper

Time for testing

results = []

@memlist(data=results)
def join_experiment(left_size, right_size, left_drop, right_drop):
    
    c1 = generate_random_clumper(left_size).pipe(drop_random_keys, left_drop)
    c2 = generate_random_clumper(right_size).pipe(drop_random_keys, right_drop)
    
    inner_old, time_inner_old = timer(c1.inner_join)(c2, mapping={"b": "b", "c": "c"})
    left_old, time_left_old = timer(c1.left_join)(c2, mapping={"b": "b", "c": "c"})
    inner_new, time_inner_new = timer(join)(c1, c2, mapping={"b": "b", "c": "c"}, how="inner")
    left_new, time_left_new = timer(join)(c1, c2, mapping={"b": "b", "c": "c"}, how="left")
    
    res = {
        "equals_inner": inner_old.equals(inner_new),
        "equals_left": left_old.equals(left_new),
        "time_inner_old":time_inner_old,
        "time_left_old": time_left_old,
        "time_inner_new": time_inner_new,
        "time_left_new": time_left_new,
        "best_inner": "new" if time_inner_new < time_inner_old else "old",
        "best_left": "new" if time_left_new < time_left_old else "old"
    }
    return res

sizes = [100, 1_000, 10_000]
drop_rates = [0.01, 0.1, 0.25, 0.5, 0.9]

settings = grid(left_size=sizes, right_size=sizes, left_drop=drop_rates, right_drop=drop_rates)
runner = Runner(backend="threading", n_jobs=8)
runner.run(func=join_experiment, settings=settings, progbar=True)

df_res = (pd.DataFrame(results)
    .assign(
        delta_inner = lambda t: t["time_inner_old"]/t["time_inner_new"],
        delta_left = lambda t: t["time_left_old"]/t["time_left_new"]
    )
)

# As a first sanity check make sure every join is as expected
df_res["equals_inner"].all(), df_res["equals_left"].all()
# (True, True)

# Then let's see 
df_res[["delta_inner", "delta_left"]].describe(percentiles=[.01, .05, .25, .5, .75, .9, .99]).T
count mean std min 1% 5% 25% 50% 75% 90% 99% max
delta_inner 144 8.7035 14.6614 0.649615 0.925807 1.03278 1.75209 3.09647 10.467 15.9198 75.1523 109.684
delta_left 144 2.81045 2.81197 0.686376 0.743097 0.864589 1.04525 1.46856 2.77124 7.99939 9.13121 14.003
  • Inner join(s) improved in 95% of the tests
  • Left join(s) improved in slightly more than 75% of them - in particular when all (actually, 99%) of the dicts in the right clumper have all the keys (i.e. in the test when right_drop = 0.01

Additional context

  • I can imagine that further improvements can be done
  • If we want to keep inner_join and left_join methods standalone, we can write the join as semiprivate and call it in the methods.

FBruzzesi avatar Aug 02 '22 16:08 FBruzzesi