Unit test `on_window_close` for window aggregation
Checklist
- [ x] I have included information about relevant versions
I am using faust in the latest version 0.8.11.
- [ ] I have verified that the issue persists when using the
masterbranch of Faust.
Steps to reproduce
Based on the windowed aggregation example provided I am trying to unit test the function window_processor.
When I send event based on doc I am not able to see when window_processor is triggered.
Expected behavior
I expect to be able to check that window has been triggered.
Actual behavior
I can see that tumbling_table.table.last_closed_window is 0.0. I guess the window is never close.
Versions
- Python version: 3.9
- Faust version: 0.8.11
- Operating system: MacOS
Update: I've pushed a small repo with an example here.
Thanks for providing an example with a unit test included, this will make it much easier to troubleshoot. Looking through our current unit tests, there don't seem to be any tests that verify cases such as tumbling_table.table.last_closed_window != 0. I'll look into changing that and hopefully providing a fix for you.
I ran the example of windowed_aggregation.py and changed the configuration similar to what you have, except an extra print statement:
@app.agent(source)
async def print_windowed_events(stream):
async for event in stream:
value_list = tumbling_table['events'].value()
value_list.append(event)
tumbling_table['events'] = value_list
print("last closed table:", tumbling_table.table.last_closed_window)
I saw the last_closed_window value update just fine after 5 seconds (as provided in your configuration), so I'm not sure if your test has issues.
Yes, it works well in the "normal" case.
The case where I cannot see the correct last_closed_window and it stays at 0 is when we run in test_context in unit test.
It also blocks the test for function on_window_close.
See https://github.com/faust-streaming/faust/pull/391 for an internal test of last_closed_window. I'll need to think more about what's going on in your repo.