
Use Case: Windowing per key/partition

Open · nogreintel opened this issue 3 years ago · 1 comment

Question

I have a general question about Faust.

Take windowed aggregation as an example.

I understand that once a table has been defined, we can use windows to process the data.

My question is: if one defines different keys within a table, i.e. table['a'], table['b'], is it possible to have on_window_close() fire on a per-key basis for that table?

The use case is that we want a separate window per partition of the data.

Is this possible? Or would we need to create a separate table for each partition in order to get on_window_close() for each partition?

Also, does Faust support any output triggers besides windowing, such as data size or item count?

These features are available in other stream processors such as Apache Flink and Apache Beam.

nogreintel, Feb 24 '22 19:02

Late answer here.

My question is: if one defines different keys within a table, i.e. table['a'], table['b'], is it possible to have on_window_close() fire on a per-key basis for that table?

The answer is yes. Once you've defined your windowed table, you can index it by key inside the agent. For instance:

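from datetime import timedelta
import faust
app = faust.App('window-agg', broker='kafka://localhost:9092')  # placeholder app id/broker (assumption)
class Measurement(faust.Record):  # hypothetical schema with an `id` field
    id: str
source_topic = app.topic('measurements', value_type=Measurement)
# Tumbling windowed table: one list of measurements per (measurement ID, window)
table = app.Table('measurements-per-window', default=list).tumbling(
    timedelta(minutes=1), expires=timedelta(minutes=5))

@app.agent(source_topic)
async def window_agg(stream):
    async for measurement in stream:
        # get the current list for this measurement ID in the current window
        measurements = table[measurement.id].value()
        measurements.append(measurement)  # append the new value to the list
        # write it back to the table (also updating the changelog)
        table[measurement.id] = measurements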

This gives you a list of measurements per window and per measurement ID.
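Per-key closing happens inside a single table: in Faust's own windowed-aggregation example, the on_window_close callback receives a key of the form `(original_key, (window_start, window_end))` and reads the window start as `key[1][0]`, so it fires once per key and window. To make that concrete without a Kafka broker, here is a toy in-memory model. It is not Faust's implementation; the class `KeyedTumblingTable`, its methods, and the explicit `close_windows_before` watermark call are hypothetical.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple

WindowRange = Tuple[float, float]
WindowKey = Tuple[str, WindowRange]


class KeyedTumblingTable:
    """Toy model of a tumbling-window table keyed by (key, window).

    Hypothetical helper, not Faust's code: it only mimics the idea that
    every (key, window) pair holds its own value and gets its own close call.
    """

    def __init__(self, size: float,
                 on_window_close: Callable[[WindowKey, List[Any]], None]) -> None:
        self.size = size
        self.on_window_close = on_window_close
        self.data: Dict[WindowKey, List[Any]] = defaultdict(list)

    def window_for(self, timestamp: float) -> WindowRange:
        # Tumbling windows: fixed-size, non-overlapping ranges.
        start = timestamp - (timestamp % self.size)
        return (start, start + self.size)

    def append(self, key: str, timestamp: float, value: Any) -> None:
        self.data[(key, self.window_for(timestamp))].append(value)

    def close_windows_before(self, watermark: float) -> None:
        # Fire the callback once per (key, window) whose window has ended.
        # sorted() materialises the list first, so popping below is safe.
        for window_key in sorted(k for k in self.data if k[1][1] <= watermark):
            self.on_window_close(window_key, self.data.pop(window_key))


closed = []
table = KeyedTumblingTable(size=10.0,
                           on_window_close=lambda k, v: closed.append((k, v)))
table.append("a", 1.0, "a1")
table.append("b", 2.0, "b1")
table.append("a", 5.0, "a2")
table.append("a", 12.0, "a3")  # falls into the next window, (10.0, 20.0)
table.close_windows_before(10.0)

for window_key, values in closed:
    print(window_key, values)
# ('a', (0.0, 10.0)) ['a1', 'a2']
# ('b', (0.0, 10.0)) ['b1']
```

Keys "a" and "b" each get their own close call for the same window, while the later "a" window stays open; no separate table per key is needed.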

I recommend also using measurement.id as the message key when you send messages to your topic. That way, all measurements with the same ID end up in the same partition.
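Keying works because a partitioner maps equal keys to the same partition; in Faust the key can be set when producing, e.g. `await source_topic.send(key=measurement.id, value=measurement)`. The sketch below only demonstrates that property. `partition_for`, `NUM_PARTITIONS`, and the use of `zlib.crc32` are illustrative assumptions: the Java Kafka client's default partitioner hashes keys with murmur2, not crc32.

```python
import zlib
from typing import Dict, List, Tuple

NUM_PARTITIONS = 4  # hypothetical partition count


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition with a stable hash.

    crc32 is a stand-in for the real partitioner's hash; the property that
    matters is the same: equal keys always map to the same partition.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


messages = [("sensor-1", 20.5), ("sensor-2", 19.0),
            ("sensor-1", 21.0), ("sensor-3", 18.2)]

# Group messages by the partition their key maps to.
by_partition: Dict[int, List[Tuple[str, float]]] = {}
for key, value in messages:
    by_partition.setdefault(partition_for(key), []).append((key, value))

print(by_partition)
```

Both "sensor-1" readings always land in the same partition, which is what lets one worker hold the complete window for that ID.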

Important note:

I am still not sure whether you need to group_by the stream, as mentioned in the docs [here]. It seems to be required if you have multiple workers, but I am really not sure. Any help clarifying this point is welcome.
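For context on the group_by question: Faust's documentation on co-partitioning says a table is sharded the same way as the stream that updates it, and each worker holds only the table partitions matching its assigned topic partitions. If the topic is not keyed by the table key, the documented remedy is to repartition with `stream.group_by(...)`, e.g. `stream.group_by(Measurement.id)` with `Measurement` a `faust.Record`. If messages are already keyed by measurement.id, as recommended above, the stream should already be co-partitioned. The toy simulation below shows why this matters with two workers. The partition assignment, the round-robin producer, and `partition_for` are hypothetical, and plain re-keying only approximates what `group_by` does through an intermediate topic.

```python
import zlib
from collections import Counter, defaultdict
from typing import Dict, List, Tuple

NUM_PARTITIONS = 4
# Hypothetical assignment: worker 0 owns partitions 0 and 1, worker 1 owns 2 and 3.
OWNER = {0: 0, 1: 0, 2: 1, 3: 1}


def partition_for(key: str) -> int:
    # Stable stand-in for a keyed partitioner (not Kafka's actual hash).
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


def local_counts(partitioned: List[Tuple[int, str]]) -> Dict[int, Counter]:
    """Count measurement IDs in each worker's local table from (partition, id) pairs."""
    tables: Dict[int, Counter] = defaultdict(Counter)
    for partition, measurement_id in partitioned:
        tables[OWNER[partition]][measurement_id] += 1
    return tables


ids = ["a", "b", "a", "a", "b", "a"]

# Unkeyed producer: events are spread round-robin, so one ID can reach both workers.
round_robin = [(i % NUM_PARTITIONS, m) for i, m in enumerate(ids)]
# Re-keyed by measurement ID, roughly what repartitioning with group_by achieves.
by_id = [(partition_for(m), m) for m in ids]

before = local_counts(round_robin)
after = local_counts(by_id)
# Before: "a" is split 2 + 2 across the two workers, so neither sees all 4 events.
# After: each ID lives on exactly one worker, with its full count.
print(dict(before))
print(dict(after))
```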

thomas-chauvet, Nov 18 '22 16:11