
Tumbling window 'expires' parameter less than 'size' doesn't work

Open • berimbolo13 opened this issue 4 years ago • 1 comment

Hello, and thanks for maintaining the project!

Steps to reproduce

I'm trying to aggregate some data in a tumbling window and then apply a processing function to the data in that window when it closes. I'm using the expires argument to handle late events (imagine an event that belongs to minute n arriving during the first 10 seconds of minute n+1).

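import logging
from datetime import datetime, timedelta

import faust
from faust.models.fields import DatetimeField

# Assumed setup (not shown in the original report); app id and broker are placeholders.
app = faust.App('tumbling-window-demo', broker='kafka://localhost:9092')
logger = logging.getLogger(__name__)


def parse_millis(ms):
    # Input timestamps are epoch milliseconds; convert to a (local-time) datetime.
    return datetime.fromtimestamp(int(ms) / 1000)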


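def process_window_function(window_info, values: list):
    # on_window_close passes the windowed key, (key, (window_start, window_end)),
    # followed by the closed window's value.
    logger.info(f"Processing window with "
                f"start = {datetime.fromtimestamp(window_info[1][0])}, "
                f"end = {datetime.fromtimestamp(window_info[1][1])}")
    logger.info(values)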


class InputClass(faust.Record, coerce=True):
    id: str
    timestamp: datetime = DatetimeField(date_parser=parse_millis)
    value: int


tumbling_window_table = (
    app.Table(
        'tumbling_window_table',
        default=list,
        on_window_close=process_window_function,
    )
        .tumbling(size=60, expires=timedelta(seconds=10))
        .relative_to_field(InputClass.timestamp)
)

input_topic = app.topic("input.topic", value_type=InputClass)


@app.agent(input_topic)
async def process_input(stream):
    event: InputClass
    async for event in stream:
        logger.info(f"Event with timestamp {event.timestamp} is stored in window state")
        list_of_values = tumbling_window_table[event.id].value()
        list_of_values.append(event.value)
        tumbling_window_table[event.id] = list_of_values

Expected behavior

I expect process_window_function to be called for window n only after the first 10 seconds of window n+1 have passed, so that late events can still be handled.
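The expected rule can be stated as a minimal pure-Python sketch (not Faust's API): an event belongs to the 60-second tumbling window containing its timestamp, and that window should keep accepting events until stream time (the latest event time seen) reaches the window end plus a 10-second grace period. The names `window_start`, `still_accepting`, `SIZE` and `GRACE` are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone

# Illustrative parameters (assumptions matching the report, not Faust settings).
SIZE = timedelta(seconds=60)   # tumbling window size
GRACE = timedelta(seconds=10)  # how long after a window ends late events are still accepted

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime, size: timedelta = SIZE) -> datetime:
    """Start of the tumbling window [start, start + size) that contains ts."""
    whole_windows = (ts - EPOCH) // size  # timedelta // timedelta -> int
    return EPOCH + whole_windows * size


def still_accepting(event_ts: datetime, stream_time: datetime,
                    size: timedelta = SIZE, grace: timedelta = GRACE) -> bool:
    """True while stream_time is before the end of event_ts's window plus grace."""
    return stream_time < window_start(event_ts, size) + size + grace
```

For example, an event stamped 21:57:57 belongs to the 21:57:00 window; under this rule it is still accepted at stream time 21:58:05 but not at 21:58:10.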

Actual behavior

The process_window_function for window n is called immediately after the first event from window n+1 arrives whenever the Table's expires argument is less than its size argument. It looks like Faust simply ignores expires. With this behavior, late events that arrive slightly after the window boundary are skipped.

Late events are handled properly if expires is equal to or greater than size, but I don't want a delay of more than 10 seconds.

Kafka input

{"id":"sensor-1","timestamp":1614808641000,"value":1}
{"id":"sensor-1","timestamp":1614808677000,"value":2}
{"id":"sensor-1","timestamp":1614808681000,"value":3}
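For comparison with the logs below, this sketch replays the three records above through a simple model of the behavior the reporter expects: each record goes into the window starting at floor(timestamp / size) * size, and a window closes only once the highest timestamp seen reaches window end + 10 s. It models the requested semantics, not Faust's actual expiry logic; `replay`, `hhmmss` and their parameters are hypothetical.

```python
from datetime import datetime, timezone

SIZE_MS = 60_000   # 60 s tumbling windows, as in the report
GRACE_MS = 10_000  # hypothetical 10 s allowance for late events

# The three records from the Kafka input above: (id, timestamp in ms, value).
RECORDS = [
    ("sensor-1", 1614808641000, 1),
    ("sensor-1", 1614808677000, 2),
    ("sensor-1", 1614808681000, 3),
]


def replay(records, size_ms=SIZE_MS, grace_ms=GRACE_MS):
    """Model of the *expected* semantics, not Faust's implementation.

    All records share one key here, so keys are ignored.
    Returns (open windows, closed windows in close order, dropped values).
    """
    open_windows = {}  # window start (ms) -> values collected so far
    closed = []        # (window start, values) in the order windows closed
    dropped = []       # values whose window had already closed
    max_ts = 0         # highest event timestamp seen ("stream time")
    for _key, ts, value in records:
        max_ts = max(max_ts, ts)
        start = ts - ts % size_ms  # floor to the window boundary
        if max_ts >= start + size_ms + grace_ms:
            dropped.append(value)  # too late even with the grace period
            continue
        open_windows.setdefault(start, []).append(value)
        # Close every window whose end + grace has been reached.
        for s in sorted(open_windows):
            if max_ts >= s + size_ms + grace_ms:
                closed.append((s, open_windows.pop(s)))
    return open_windows, closed, dropped


def hhmmss(ms):
    """Format an epoch-millisecond timestamp as UTC HH:MM:SS."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime("%H:%M:%S")


if __name__ == "__main__":
    open_windows, closed, dropped = replay(RECORDS)
    print({hhmmss(s): v for s, v in open_windows.items()}, closed, dropped)
```

Under this model no window closes after the third record (21:58:01 is before 21:58:10), whereas the logs show the 21:57:00 window being closed at that point. A hypothetical fourth record stamped 1614808690000 (21:58:10) would close it with [1, 2].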

Logs

[2021-03-03 21:58:07,510] [1] [INFO] [^Worker]: Ready 
[2021-03-03 21:58:41,955] [1] [INFO] Event with timestamp 2021-03-03 21:57:21 is stored in window state 
[2021-03-03 21:59:00,574] [1] [INFO] Event with timestamp 2021-03-03 21:57:57 is stored in window state 
[2021-03-03 21:59:16,963] [1] [INFO] Event with timestamp 2021-03-03 21:58:01 is stored in window state 
[2021-03-03 21:59:16,987] [1] [INFO] Processing window with start = 2021-03-03 21:57:00, end = 2021-03-03 21:57:59.900000 
[2021-03-03 21:59:16,988] [1] [INFO] [1, 2] 

Versions

  • Python version 3.7.9
  • Faust version faust-streaming==0.6.1
  • RocksDB version python-rocksdb

I was able to implement this behavior in Flink, but ran into this issue with Faust.

berimbolo13, Mar 03 '21 22:03

Could you try changing your table code to:

tumbling_window_table = (
    app.Table(
        'tumbling_window_table',
        default=list,
        on_window_close=process_window_function,
    )
        .tumbling(size=timedelta(seconds=60), expires=timedelta(seconds=60+10))
        .relative_to_field(InputClass.timestamp)
)

This makes expires equal to size + 10 seconds. According to the docs, expires is the duration for which the data (key-value pairs) allocated to each window is kept. I guess that means it needs to be greater than size.
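The arithmetic behind this suggestion, as a small sketch. It assumes, following the comment's reading of the docs rather than anything verified in Faust's source, that a window's data is retained until its start plus expires; under that assumption expires = size + 10 s keeps the first window from the report around until 10 seconds after it ends.

```python
from datetime import datetime, timedelta, timezone

size = timedelta(seconds=60)    # tumbling window size
grace = timedelta(seconds=10)   # extra time for late events
expires = size + grace          # same as expires=timedelta(seconds=60+10)

# Assumption: data for a window is kept until window_start + expires.
window_start = datetime(2021, 3, 3, 21, 57, tzinfo=timezone.utc)
window_end = window_start + size
retained_until = window_start + expires

print(f"expires={expires.total_seconds():.0f}s, window ends {window_end:%H:%M:%S}, "
      f"retained until {retained_until:%H:%M:%S}")
# prints "expires=70s, window ends 21:58:00, retained until 21:58:10"
```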

thomas-chauvet, Oct 10 '22 07:10