groupby-apply with arbitrary functions
I would like to perform a groupby on a DataFrame and then apply an arbitrary function f (more sophisticated than computing basic statistics, may involve multiple rows and columns) to each of the groups.
This is achieved in Pandas with df.groupby(...).apply(func=f), where f takes in a DataFrame. From the Vaex documentation/API, it seems like the analogous thing would be df.groupby(...).agg(actions), but only a handful of built-in aggregator functions are supported. I also see a DataFrame.apply(f) method to "Apply a function on a per row basis", but not one that allows applying arbitrary functions to the DataFrames in a groupby.
Note: This may be related to #745, which requests functionality to iterate through groups in a groupby, although as @maartenbreddels suggests there, a built-in method would likely be more efficient (not to mention cleaner).
Hi,
I would like to avoid that pattern as much as possible because it is not that efficient by design. It's either going to lead to random access to the dataframe or a lot of memory copying. Could you give me a more concrete example that you think cannot be done right now (real code that I can run), so we can collect use cases and see how we attack the problem more generally?
Regards,
Maarten
Hi Maarten,
Thanks for the quick response.
Here are some examples (assuming df is a Pandas DataFrame, except for the scaling example, which assumes a Vaex DataFrame):
- Scaling using the vaex.ml scalers:
scale = lambda df: vaex.ml.StandardScaler(features=["x", "y"]).fit_transform(df)
df.groupby("category").apply(scale)
- Normalizing (here using the L1-norm):
normalize = lambda df: pd.concat([df["time"], df[["x", "y"]] / df[["x", "y"]].sum()], axis=1)
df.groupby("category").apply(normalize)
- Finding the max absolute difference in values between consecutive datapoints in chronological order:
max_diff = lambda df: df.sort_values("time")[["x", "y"]].diff().iloc[1:].abs().max()
df.groupby("category").apply(max_diff)
- Finding the max Euclidean distance between consecutive datapoints in chronological order:
distance = lambda diffs: np.sqrt(diffs.x**2 + diffs.y**2)
max_dist = lambda df: df.sort_values("time")[["x", "y"]].diff().iloc[1:].pipe(distance).max()
df.groupby("category").apply(max_dist)
Here's the code I used to generate a sample dataset for testing (except for the scaling example --- I haven't yet gotten vaex-ml working on my machine):
import numpy as np
import pandas as pd
df = pd.DataFrame(np.random.random_sample(size=(6, 3)),
                  index=pd.Index(["a"]*3 + ["b"]*3, name="category"),
                  columns=["time", "x", "y"])
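For completeness, here is a runnable sketch combining the sample data above with two of the aggregations. The column names and lambdas are exactly the ones defined earlier; the random seed is added here only for reproducibility:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)  # seeded for reproducibility
df = pd.DataFrame(rng.random(size=(6, 3)),
                  index=pd.Index(["a"] * 3 + ["b"] * 3, name="category"),
                  columns=["time", "x", "y"])

# Max absolute per-column change between consecutive points in time order
max_diff = lambda d: d.sort_values("time")[["x", "y"]].diff().iloc[1:].abs().max()

# Max Euclidean distance between consecutive points in time order
distance = lambda diffs: np.sqrt(diffs.x ** 2 + diffs.y ** 2)
max_dist = lambda d: d.sort_values("time")[["x", "y"]].diff().iloc[1:].pipe(distance).max()

per_group_diff = df.groupby("category").apply(max_diff)  # DataFrame: one row per group
per_group_dist = df.groupby("category").apply(max_dist)  # Series: one scalar per group
```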
It seems like all of the above lambda functions are composed of operations that can be performed efficiently and/or parallelized, so it should be possible to support them in Vaex (and most of them are already supported for operating on a DataFrame not in a groupby). Actually, wouldn't it make sense to allow any arbitrary function that Vaex can apply to a DataFrame to be applied to DataFrames in a groupby as well?
Best, Brian
The df.groupby(...).apply(func=f) functionality (which is also implemented in Dask) is much needed.
If not, we have to switch back to pandas to apply a custom aggregation function on the groups.
It might be the case that Vaex has another approach for such functionalities which I might not be aware of. If that is the case, I could not track it down in the documentation.
groupby with apply is urgently needed. Any progress?
It would help us a lot if also you could specify the specific use case. We have some ideas on how to implement it efficiently, but the more examples/use cases the better.
Use cases such as taking every group's first n rows, or computing every group's size or nlargest, etc. In pandas all of these can be expressed as df.groupby('_id').apply(lambda x: x.head(N)) or df.groupby('_id').apply(lambda x: x.shape[0]).
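Worth noting: in pandas at least, those particular cases already have dedicated fast paths that avoid apply entirely. A sketch on toy data (column names made up for illustration):

```python
import pandas as pd

df = pd.DataFrame({"_id": [1, 1, 1, 2, 2], "v": [5, 3, 9, 2, 7]})

sizes = df.groupby("_id").size()   # group sizes, no apply needed
head2 = df.groupby("_id").head(2)  # first 2 rows of every group
# per-group top-1 on a column, via sort + head instead of nlargest in apply
top1 = df.sort_values("v", ascending=False).groupby("_id").head(1)
```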
This is probably (certainly) a very bad idea.. but for those with great need of using (very) custom aggregators, you can try this hack.
The memory / speed will depend on the custom function, and how you use vaex within that. In this case, I am doing an in-memory operation, using a scikit-learn estimator. But the example data is small so I can afford it.
import vaex
import vaex.ml
from sklearn.linear_model import LinearRegression
# Example data
df = vaex.example()
# Custom aggregation function to operate on df
def my_func(df):
    X = df[['x', 'y', 'z']].values
    y = df['FeH'].values
    lr = LinearRegression().fit(X, y)
    return lr.coef_, lr.intercept_
# Place to store the results
res = {'coef': [], 'intercept': [], 'id': []}
groups = df.groupby('id')
# Iterate over each group, use the function, and store the result
for _, g in groups:
    coef, intercept = my_func(g)
    res['coef'].append(coef)
    res['intercept'].append(intercept)
    res['id'].append(g.id[:1].values[0])
# Convert back to a dataframe
df_group = vaex.from_dict(res)
print(df_group)
Again, bad idea, not recommended, and there should be a better/faster way to do this. But until that moment comes, this can at least serve as inspiration.
Better ideas / PRs etc.. are welcome!
Another use case is to collect/gather all values of a given column (for every group) into a list. In pandas I'd do
collect = lambda x: list(x)
df.groupby('col1').agg({'col2': collect})
And thank you for the great library! 🙏🏼
collect to list is all I really need. But for truly custom aggregation function you could do an accumulator pattern similar to spark.
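To illustrate the accumulator pattern mentioned above: per-group state is updated chunk by chunk, so only one chunk needs to be in memory at a time. The chunking and function names here are illustrative, not a vaex API; the example accumulates (sum, count) per group and finalizes to a mean:

```python
import pandas as pd

def aggregate_in_chunks(chunks, key, value):
    """Accumulate per-group (sum, count) over an iterable of DataFrame chunks,
    then finalize to a per-group mean. Only one chunk is in memory at a time."""
    state = {}  # group -> [running sum, running count]
    for chunk in chunks:
        for g, s in chunk.groupby(key)[value]:
            acc = state.setdefault(g, [0.0, 0])
            acc[0] += s.sum()
            acc[1] += len(s)
    # finalize: mean per group
    return {g: total / n for g, (total, n) in state.items()}

# Toy "chunked" input standing in for a larger-than-memory dataset
chunks = [pd.DataFrame({"id": ["a", "b"], "x": [1.0, 2.0]}),
          pd.DataFrame({"id": ["a", "a"], "x": [3.0, 5.0]})]
means = aggregate_in_chunks(chunks, "id", "x")
```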
Any updates on this ?
I too am interested in this functionality for vaex to truly replace pandas in our workflow
This is related to #376
How can this pandas code be translated into Vaex?
skill_df = df[['skill_id', 'problem_id']].groupby(['skill_id']).apply(lambda r: np.array(list(set(r['problem_id'].values))))
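For reference, this is what that pandas snippet computes on toy data: a Series mapping each skill_id to the array of unique problem_ids in its group (the toy values below are made up):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"skill_id": [1, 1, 2], "problem_id": [10, 10, 20]})
# Same expression as in the question, applied to the toy frame
skill_df = (df[["skill_id", "problem_id"]]
            .groupby(["skill_id"])
            .apply(lambda r: np.array(list(set(r["problem_id"].values)))))
```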
I have been following this thread for over a year now. It's incredible what vaex has done so far. I think the biggest value vaex can add right now is this specific use case. Groupby-apply allows so much flexibility, and is so commonly used in the DS industry. If vaex can implement this then we'll have more opportunities to explore big data. I haven't come across many libraries that can do this well. If vaex can, that would really separate them from the rest of the pack. Consider even the following simple examples:
df.groupby('col1')['col2'].transform(lambda x: x.shift())
df.groupby(['col1', 'col2']).apply(lambda x: my_custom_function(x, **kwargs))
Because of this, pandas can't be replaced yet, at least in my case.
Hasn't been implemented yet? 😢
I don't think this can be implemented in vaex in a performant way. Therefore, I think this can just as well be done with Pandas itself. Unless we find a way to do it faster, but I haven't come up with a good solution yet.
The problem is that a huge Vaex dataframe cannot be converted to a Pandas dataframe in order to use the apply() method.