PERF: unnecessary (expensive) concat
I have a Ray worker that is calling `PandasDataframeAxisPartition.deploy_axis_func` and, inside that, doing a `pandas.concat` on 16 DataFrames with MultiIndex indexes, an expensive concat.
AFAIK there isn't a nice way to see what called deploy_axis_func, so this is a bit speculative.
I think the partitions being collected are exactly the partitions of an existing DataFrame, which I think means that frame's index is already materialized somewhere, so reconstructing it inside `concat` is unnecessary. That is, in `deploy_axis_func` we could do something like:
```diff
+orig_indexes = [x.index for x in partitions]
+N = 0
+for obj in partitions:
+    obj.index = range(N, N + len(obj))
+    N += len(obj)
 dataframe = pandas.concat(list(partitions), axis=axis, copy=False)
+dataframe.index = thing_we_already_know_so_dont_need_to_recompute
+
+for index, obj in zip(orig_indexes, partitions):
+    obj.index = index
```
If I'm right here, we could see significant savings. E.g. in the script I'm profiling, at the moment 5% is spent in `_get_concat_axis`, and I think a lot more indirectly inside worker processes.
Moreover, if this works, we could do the pinning/unpinning before pickling/unpickling and save on pickle costs.
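Here is a runnable standalone sketch of the idea above (not Modin's actual code; the frames and `full_index` are made-up stand-ins for the partitions and the already-materialized index):

```python
import pandas as pd

# Two toy "partitions" with MultiIndex indexes.
mi = pd.MultiIndex.from_product([["a", "b"], [1, 2]])
partitions = [
    pd.DataFrame({"x": range(4)}, index=mi),
    pd.DataFrame({"x": range(4, 8)}, index=mi),
]

# Assume the full result index is already known; here we build it explicitly.
full_index = partitions[0].index.append(partitions[1].index)

# Pin cheap RangeIndexes onto the partitions so concat has no expensive
# index work to do.
orig_indexes = [p.index for p in partitions]
n = 0
for p in partitions:
    p.index = pd.RangeIndex(n, n + len(p))
    n += len(p)

result = pd.concat(partitions, axis=0)
result.index = full_index  # reattach without recomputing inside concat

# Unpin: restore the original indexes on the partitions.
for idx, p in zip(orig_indexes, partitions):
    p.index = idx
```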
Could you share the script you are using to profile? @jbrockmendel
> Could you share the script you are using to profile?
I'm not sure that's allowed. If it helps, @yarshev and @anmyachev are looking at the same script.
@jbrockmendel Ah ok. Is it possible to at least get a minimum reproducible example?
> Is it possible to at least get a minimum reproducible example?
Well, no. I know `concat` is getting called because I put a `breakpoint()` inside `MultiIndex.append`. I use `ray debug` to step through the stack that gets to that call, and the last thing I see before Ray's `main_loop` is:
```
> /Users/brock/Desktop/modin/modin/modin/core/dataframe/pandas/partitioning/axis_partition.py(159)deploy_axis_func()
-> dataframe = pandas.concat(list(partitions), axis=axis, copy=False)
```
AFAICT the tooling doesn't provide a nice way to find out what call got me here.
@jbrockmendel I think `deploy_ray_func` is remotely calling `deploy_axis_func` because of a line in `PandasOnRayDataframeVirtualPartition.deploy_axis_func`, which as far as I know only gets called in `PandasDataframeAxisPartition.apply`.
In case you don't know, `PandasDataframeAxisPartition` is a partition containing a sequence of one or more physical pieces of data (its `list_of_blocks`) along an axis. When you apply a function to it, a remote task will concatenate the blocks and then apply the function to the result.
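Conceptually, the remote task does something like the following rough sketch (not Modin's actual implementation; `apply_over_blocks` is a made-up name):

```python
import pandas as pd

def apply_over_blocks(func, blocks, axis=0):
    # Concatenate the physical blocks into one frame -- this is the
    # step where the expensive index reconstruction happens -- then
    # apply the user's function to the combined frame.
    combined = pd.concat(blocks, axis=axis)
    return func(combined)

blocks = [pd.DataFrame({"x": [1, 2]}), pd.DataFrame({"x": [3, 4]})]
out = apply_over_blocks(lambda df: df.sum(), blocks)
```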
I didn't know that concatenating the indices would itself be so expensive. In that case, maybe what you suggested in your first post would help. The full index may live in the PandasDataframe (though after #4726, not necessarily). We could plumb this index down to the remote task. But we'd have to pay the cost of putting the index in the object store and then getting it out.
@jbrockmendel I tried this approach; our script became slower with it (because of Ray's work with the object store, as @mvashishtha mentioned). In addition, for some operations, for example `groupby.agg`, we do not have a pre-computed index that we can pass.
Is there an easy way to speed up concatenating the MultiIndex itself on the pandas side?
> Is there an easy way to speed up concatenating the MultiIndex itself on the pandas side?
There's a patch that speeds up this particular case, but may slow down other cases (so I haven't decided yet whether to upstream it to pandas):
```python
import numpy as np
import pandas

orig_mi_append = pandas.MultiIndex.append

def new_append(self, other):
    if not isinstance(other, list):
        other = [other]
    if all(isinstance(obj, pandas.MultiIndex) for obj in other):
        if all(obj.nlevels == self.nlevels for obj in other):
            if all(all(pandas.core.dtypes.missing.array_equivalent(slev, olev) for slev, olev in zip(self._levels, obj._levels)) for obj in other):
                # Fast path: all levels match, so only the integer codes
                # need to be concatenated; the levels can be reused as-is.
                objs = [self] + other
                new_codes = []
                for i in range(self.nlevels):
                    lev_codes = np.concatenate([obj.codes[i] for obj in objs])
                    new_codes.append(lev_codes)
                mi = pandas.MultiIndex(codes=new_codes, levels=self.levels, names=self.names)
                return mi
    return orig_mi_append(self, other)

pandas.MultiIndex.append = new_append
```
This should help with the `to_pandas` function, because we most likely already have the index in the main process.
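A quick sanity check of the fast path's core idea (a minimal sketch with made-up level values): when two MultiIndexes share identical levels, appending them is equivalent to concatenating their integer codes level-by-level and reusing the shared levels.

```python
import numpy as np
import pandas as pd

a = pd.MultiIndex.from_product([["x", "y"], [1, 2]])
b = pd.MultiIndex.from_product([["x", "y"], [1, 2]])

# Fast path: concatenate codes per level, reuse the levels and names.
fast = pd.MultiIndex(
    codes=[np.concatenate([a.codes[i], b.codes[i]]) for i in range(a.nlevels)],
    levels=a.levels,
    names=a.names,
)
# Regular path: MultiIndex.append rebuilds the index.
slow = a.append(b)
```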