Tumbling window 'expires' parameter less than 'size' doesn't work
Hello and thanks for project maintenance!
Steps to reproduce
I'm trying to aggregate data in a tumbling window and then apply a processing function to the data in that window. I'm using the expires argument to handle late events (imagine that an event belonging to minute n can still arrive during the first 10 seconds of minute n+1).
import logging
from datetime import datetime, timedelta

import faust
from faust.models.fields import DatetimeField

# App and logger setup (broker URL is illustrative)
logger = logging.getLogger(__name__)
app = faust.App('window-demo', broker='kafka://localhost:9092')


def parse_millis(ms):
    # Event timestamps arrive as epoch milliseconds
    return datetime.fromtimestamp(int(ms) / 1000)


def process_window_function(window_info, values: list):
    logger.info(f"Processing window with "
                f"start = {datetime.fromtimestamp(window_info[1][0])}, "
                f"end = {datetime.fromtimestamp(window_info[1][1])}")
    logger.info(values)


class InputClass(faust.Record, coerce=True):
    id: str
    timestamp: datetime = DatetimeField(date_parser=parse_millis)
    value: int


tumbling_window_table = (
    app.Table(
        'tumbling_window_table',
        default=list,
        on_window_close=process_window_function,
    )
    .tumbling(size=60, expires=timedelta(seconds=10))
    .relative_to_field(InputClass.timestamp)
)

input_topic = app.topic("input.topic", value_type=InputClass)


@app.agent(input_topic)
async def process_input(stream):
    event: InputClass
    async for event in stream:
        logger.info(f"Event with timestamp {event.timestamp} is stored in window state")
        list_of_values = tumbling_window_table[event.id].value()
        list_of_values.append(event.value)
        tumbling_window_table[event.id] = list_of_values
Expected behavior
I expect process_window_function to be called for window n only after the first 10 seconds of window n+1 have passed, so that late events are still included.
Actual behavior
process_window_function for window n is called immediately after the first event from window n+1 arrives whenever the expires argument of the Table is less than the size argument. It looks like Faust simply ignores expires in that case, so late events that arrive slightly after the window boundary are skipped.
Late events are handled properly if the expires argument is equal to or greater than the size, but I don't want a delay of more than 10 seconds.
Kafka input
{"id":"sensor-1","timestamp":1614808641000,"value":1}
{"id":"sensor-1","timestamp":1614808677000,"value":2}
{"id":"sensor-1","timestamp":1614808681000,"value":3}
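To see why the third record triggers the window close, here is a quick sanity check (plain Python, not the Faust API) that maps each event timestamp to the start of its 60-second tumbling window, assuming the epoch-aligned bucketing that tumbling windows use:

```python
WINDOW_SIZE = 60  # seconds, matching size=60 in the table definition

def window_start(ms: int) -> int:
    """Return the epoch second at which the event's tumbling window begins."""
    sec = ms // 1000
    return sec - (sec % WINDOW_SIZE)

events = [1614808641000, 1614808677000, 1614808681000]
starts = [window_start(ms) for ms in events]
# The first two events (21:57:21 and 21:57:57) share one window start;
# the third (21:58:01) falls into the next window, which is the moment
# on_window_close fires for the previous one.
```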
Logs
[2021-03-03 21:58:07,510] [1] [INFO] [^Worker]: Ready
[2021-03-03 21:58:41,955] [1] [INFO] Event with timestamp 2021-03-03 21:57:21 is stored in window state
[2021-03-03 21:59:00,574] [1] [INFO] Event with timestamp 2021-03-03 21:57:57 is stored in window state
[2021-03-03 21:59:16,963] [1] [INFO] Event with timestamp 2021-03-03 21:58:01 is stored in window state
[2021-03-03 21:59:16,987] [1] [INFO] Processing window with start = 2021-03-03 21:57:00, end = 2021-03-03 21:57:59.900000
[2021-03-03 21:59:16,988] [1] [INFO] [1, 2]
Versions
- Python version: 3.7.9
- Faust version: faust-streaming==0.6.1
- RocksDB version: python-rocksdb
I was able to implement this behavior in Flink, but ran into this issue with Faust.
Can you try to change your table code with:
tumbling_window_table = (
    app.Table(
        'tumbling_window_table',
        default=list,
        on_window_close=process_window_function,
    )
    .tumbling(size=timedelta(seconds=60), expires=timedelta(seconds=60 + 10))
    .relative_to_field(InputClass.timestamp)
)
In other words, expires is size + 10. As per the docs, expires represents the duration for which the data (key-value pairs) allocated to each window is stored. I take that to mean it needs to be greater than or equal to size.
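A minimal sketch of the arithmetic behind that suggestion, using the 10-second grace period from the original report:

```python
from datetime import timedelta

size = timedelta(seconds=60)    # tumbling window length
grace = timedelta(seconds=10)   # how long to keep waiting for late events
expires = size + grace          # pass as .tumbling(size=size, expires=expires)

# With these values, data for window n is retained until roughly
# 10 seconds into window n+1, so late events can still be appended
# before on_window_close fires.
```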