
Custom aggregations on GroupBy dataframes

Open kkontoudi opened this issue 4 years ago • 3 comments

Hello,

I just started using your package and I find it really great!

However I cannot find a direct way of computing a custom aggregation on a GroupBy dataframe, something like:

sdf.groupby('name').aggregate(Mean())

in the spirit of https://streamz.readthedocs.io/en/latest/dataframes.html#dataframe-aggregations. Am I missing something?

Of course, I can do the groupby operation inside the aggregation and keep all the keys in the state myself, but it would be very convenient if the framework could handle this for me.

Thanks

kkontoudi avatar Nov 16 '20 00:11 kkontoudi

If you look at GroupbyMean, you will see that the aggregation instance really does keep all the known keys in its internal state, but this state is small. It is accessed via GroupBy.mean (which uses GroupBy._accumulate internally). You could follow the same pattern for whatever aggregation you wish to do.
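To make the pattern concrete, here is a minimal sketch of a stateful per-group mean written as a plain accumulator function over pandas DataFrames. The column names "name" and "value" are placeholders, and wiring it into streamz via `sdf.stream.accumulate(..., returns_state=True)` is an assumption about how you would hook it up; the official GroupbyMean implementation is the authoritative reference.

```python
import pandas as pd

def grouped_mean_acc(state, df, grouper="name", col="value"):
    """Accumulate a streaming per-group mean.

    state is a (sums, counts) pair of Series indexed by group key;
    returns the updated state and the current per-group means.
    """
    sums, counts = state
    g = df.groupby(grouper)[col]
    # Merge this batch's partial sums/counts into the running totals
    sums = sums.add(g.sum(), fill_value=0)
    counts = counts.add(g.count(), fill_value=0)
    return (sums, counts), sums / counts

# Fresh, empty state: no keys seen yet
initial_state = (pd.Series(dtype=float), pd.Series(dtype=float))
```

With a streamz DataFrame `sdf`, something like `sdf.stream.accumulate(grouped_mean_acc, start=initial_state, returns_state=True)` should then emit updated group means for each incoming batch.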

martindurant avatar Nov 16 '20 16:11 martindurant

Thanks a lot for the quick reply!

Could you give me some direction on how to implement groupby followed by a window for dataframes as well? Something like:

sdf.groupby('name').window(n=2).aggregate(Mean())

(I don't know if I should open a different issue though)

kkontoudi avatar Nov 16 '20 22:11 kkontoudi

I have rarely seen windowing applied immediately after a groupby in streaming pipelines where state is being maintained. I suspect that is why streaming systems like Spark Streaming and Flink (and streamz, of course) don't support windowing-after-groupby.

We use pipelines like the one below in production, and they've worked reliably so far. I would also recommend having a map function do some of the groupby work before the dataframes are passed into (or used to create) an SDF; I've seen that help in a lot of use cases as well, since it keeps the SDF (and the state it maintains) lightweight and ensures only the relevant columns are stored in the SDF.

# Aggregate each window's contents with an ordinary pandas groupby
def func(window_df):
    aggs = window_df.groupby(["Key"]).agg({"Col1": "min", "Col2": "mean"})
    aggs = aggs.reset_index()
    return aggs

# Re-emit the per-key aggregates every time the 8-hour window updates
downstream = sdf.window("8h").apply(func).stream

# some_func is whatever post-processing you need; print is just an
# example destination (sink requires a callable)
downstream.map(some_func).sink(print)
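To see what `func` produces for a single window, here is a small self-contained check with plain pandas (the sample data is made up for illustration):

```python
import pandas as pd

def func(window_df):
    # Per-key min of Col1 and mean of Col2 over the window's contents
    aggs = window_df.groupby(["Key"]).agg({"Col1": "min", "Col2": "mean"})
    return aggs.reset_index()

# A stand-in for the DataFrame a window would hand to func
window = pd.DataFrame({
    "Key":  ["x", "x", "y"],
    "Col1": [3, 1, 5],
    "Col2": [2.0, 4.0, 6.0],
})
out = func(window)
# One row per key: x -> (Col1=1, Col2=3.0), y -> (Col1=5, Col2=6.0)
```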

Hope this helps!

chinmaychandak avatar Nov 18 '20 18:11 chinmaychandak