streamz
Custom aggregations on GroupBy dataframes
Hello,
I just started using your package and I find it really great!
However I cannot find a direct way of computing a custom aggregation on a GroupBy dataframe, something like:
sdf.groupby('name').aggregate(Mean())
in the spirit of https://streamz.readthedocs.io/en/latest/dataframes.html#dataframe-aggregations. Am I missing something?
Of course I can do the groupby operation inside the aggregation and keep all the keys in the state myself, but it would be very convenient if the framework could handle this for me.
Thanks
If you look at GroupbyMean, you will see that the aggregation instance really does keep all the known keys in its internal state, but this state is small. It is accessed via GroupBy.mean (i.e., using GroupBy._accumulate). You could follow the same pattern for whatever aggregation you wish to implement.
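To sketch the pattern in plain Python (this is an illustration of the idea, not the streamz API; `PerKeyMean` and its method names are hypothetical, loosely modeled on the initial/on_new shape of streamz aggregations): the aggregation keeps only a small per-key tuple of running sums and counts, and recomputes the means from that state on each new batch.

```python
# Hypothetical sketch of a per-key streaming mean, mirroring the pattern
# GroupbyMean uses: small per-key state, updated incrementally per batch.
# PerKeyMean is NOT part of streamz; it is an illustrative stand-in.
class PerKeyMean:
    def initial(self):
        # state maps each key to (running_sum, running_count)
        return {}

    def on_new(self, state, batch):
        # batch is an iterable of (key, value) pairs
        for key, value in batch:
            total, count = state.get(key, (0.0, 0))
            state[key] = (total + value, count + 1)
        # emit the current mean for every key seen so far
        result = {k: s / c for k, (s, c) in state.items()}
        return state, result

agg = PerKeyMean()
state = agg.initial()
state, means = agg.on_new(state, [("a", 1.0), ("a", 3.0), ("b", 10.0)])
# means == {"a": 2.0, "b": 10.0}
```

The state stays small because it grows with the number of distinct keys, not with the number of rows seen.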
Thanks a lot for the quick reply!
Could you give me some direction on how to implement groupby followed by a window for dataframes as well? Something like:
sdf.groupby('name').window(n=2).aggregate(Mean())
(I don't know if I should open a different issue though)
I have rarely seen windowing applied immediately after a groupby in streaming systems where state is being maintained. I suspect that is why streaming systems like Spark Streaming and Flink (and streamz, of course) don't support windowing-after-groupby.
We use pipelines like the one below in production and they've worked reliably so far. I would also recommend having a map function do some groupby before passing dataframes into (or creating) an SDF; I've seen that help a lot of use cases as well, since it ensures that the SDF (and the state it maintains) stays light-weight and only relevant columns are stored in the SDF.
```python
def func(window_df):
    # aggregate each window's contents per key
    aggs = window_df.groupby(["Key"]).agg({"Col1": "min", "Col2": "mean"})
    aggs = aggs.reset_index()
    return aggs

downstream = sdf.window("8h").apply(func).stream
downstream.map(some_func).sink(print)  # sink needs a callback; print is a placeholder
```
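To make the window function concrete, here is the same per-key aggregation applied to a plain pandas DataFrame standing in for one window's contents (the sample data is invented purely for illustration):

```python
import pandas as pd

def func(window_df):
    # same aggregation as in the pipeline above: per-key min of Col1, mean of Col2
    aggs = window_df.groupby(["Key"]).agg({"Col1": "min", "Col2": "mean"})
    return aggs.reset_index()

# made-up stand-in for the rows that accumulated in one window
window_df = pd.DataFrame({
    "Key": ["a", "a", "b"],
    "Col1": [1, 2, 3],
    "Col2": [10.0, 20.0, 30.0],
})
out = func(window_df)
# out has one row per key: ("a", 1, 15.0) and ("b", 3, 30.0)
```

Each window thus emits one compact row per key, which is what keeps the downstream stream light-weight.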
Hope this helps!