QUESTION: Any possibility of getting a sliding window example
I note that the docs state you support hopping, tumbling, and sliding windows. There's even a faust.SlidingWindow class, but I can't seem to figure out how to implement it.
The requirement is to have a live count per event for the preceding 10, 30, 60, and 300 seconds, updated every second. In the future, we also want to include a count and some time-based calculations for the last 24 hours, 1 week, 1 month, and 3 months.
I have a workaround at the moment: a 1s tumbling window with an expiration time of 300s, then I sum all the results from 300s ago to now using the delta() method on the windowed table and write that to a topic. That's ok for now, with a small number of messages from a small number of sources, but production is possibly going to be thousands of messages per second from tens of thousands of sources, so I'm not sure how scalable my solution is.
class AlarmCount(faust.Record, serializer='json'):
    event_id: int
    source_id: int
    counts_10: int
    counts_30: int
    counts_60: int
    counts_300: int

@app.agent(events_topic)
async def new_event(stream):
    async for value in stream:
        # calculate the count statistics from the 1s tumbling window table
        counts_10 = 0
        counts_30 = 0
        counts_60 = 0
        counts_300 = 0
        event_counts_table[value.source_id] += 1
        for i in range(300):
            delta = event_counts_table[value.source_id].delta(i)
            if i <= 10:
                counts_10 += delta
            if i <= 30:
                counts_30 += delta
            if i <= 60:
                counts_60 += delta
            counts_300 += delta
        await event_counts_topic.send(
            value=AlarmCount(
                event_id=value.event_id,
                source_id=value.source_id,
                counts_10=counts_10,
                counts_30=counts_30,
                counts_60=counts_60,
                counts_300=counts_300,
            )
        )
I also had a method that was faster for large windows: rather than a windowed table, I used a standard table where the value was a list of timestamps. Each time something new comes in, a list comprehension removes the entries past the expiry time, the new value is appended, and len() is called on the result. This is much quicker, but it still runs into delays - the consumer is about 2000 records behind the producer after 50k records, and I see a lot of `Timer Monitor.sampler woke up too late, with a drift of` warnings.
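For reference, here is a minimal sketch of that list-of-timestamps approach. The app, topic, table, and record names are illustrative (not my actual code), and it assumes each event carries a source_id field:

```python
import time
import faust

class Event(faust.Record, serializer='json'):
    source_id: int

app = faust.App('timestamp-list-demo', broker='kafka://localhost:9092')
events_topic = app.topic('events', value_type=Event)

# Plain (non-windowed) table: the value is the list of event timestamps
# seen for each source within the largest window (300s).
event_times = app.Table('event_times', default=list)

@app.agent(events_topic)
async def new_event(stream):
    async for value in stream:
        now = time.time()
        # Drop timestamps older than the largest window, then add the new one.
        times = [t for t in event_times[value.source_id] if now - t <= 300.0]
        times.append(now)
        event_times[value.source_id] = times  # reassign so the change is stored
        # A count over any smaller range is just a filter over the same list.
        counts_10 = sum(1 for t in times if now - t <= 10.0)
        counts_300 = len(times)
```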
Other than using Kafka directly (meaning Java... which I'd dearly love to avoid), there has to be a better way.
I also found this class that seems to show a sliding window, and this test example from the old robinhood version, but I have no idea how to implement it with respect to a topic stream or table. Assistance or guidance greatly appreciated.
One possible solution that should be fast to aggregate across a large window would be to always increment the value in the current window, but if the current window is 0, first grab the previous window's value and add one to that. Then the count over the last n seconds is simply the difference between the current value and window.delta(n).
But one issue: what if the previous window is also 0, along with a few hundred before that? What if there is no non-zero value anywhere in a history that might be tens of thousands of periods old?
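To make the idea concrete, here is a rough sketch of that running-total approach (ignoring, for now, the empty-history problem just described). The app, topic, table, and record names are illustrative, and it assumes each 1s window value holds the cumulative count so far:

```python
import faust

class Event(faust.Record, serializer='json'):
    source_id: int

app = faust.App('cumulative-count-demo', broker='kafka://localhost:9092')
events_topic = app.topic('events', value_type=Event)

# 1s tumbling windows, each holding the running (cumulative) count per source.
cumulative = app.Table('cumulative', default=int).tumbling(1.0, expires=300.0)

@app.agent(events_topic)
async def new_event(stream):
    async for value in stream:
        current = cumulative[value.source_id].value()
        if current == 0:
            # New window: seed it from the previous window's running total.
            current = cumulative[value.source_id].delta(1.0)
        cumulative[value.source_id] = current + 1
        # Count over the last n seconds = running total now minus the
        # running total n seconds ago.
        counts_60 = (cumulative[value.source_id].value()
                     - cumulative[value.source_id].delta(60.0))
```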
Still just looking for a good solution here, as every other method I've tried so far hits a limit where a lot of `Timer Monitor.sampler woke up too late, with a drift of` errors get printed.
While looking through the test for sliding windows, I also happened to find the table.py file.
Then, by looking at how hopping and tumbling windows were implemented and cross-referencing the respective test files, I added the following to the table.py file:
def sliding(
    self,
    before: Seconds,
    after: Seconds,
    expires: Seconds = None,
    key_index: bool = False,
) -> WindowWrapperT:
    """Wrap table in a sliding window."""
    return self.using_window(
        windows.SlidingWindow(before, after, expires),
        key_index=key_index,
    )
But that appears to only ever give me a count of 1 for any window range when performing something like the word count example.
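For context, this is roughly the word-count-style usage I would expect to work with such a sliding() method. The names here are just for illustration, and it assumes SlidingWindow(before, after) is meant to behave like the other window types:

```python
import faust

app = faust.App('sliding-wordcount-demo', broker='kafka://localhost:9092')
words_topic = app.topic('words', value_type=str)

# Count each word over a window reaching 10s back from the event time.
word_counts = app.Table('word_counts', default=int).sliding(
    before=10.0, after=0.0, expires=300.0,
)

@app.agent(words_topic)
async def count_words(stream):
    async for word in stream:
        word_counts[word] += 1
        # With the patched sliding(), this only ever prints a count of 1.
        print(word, word_counts[word].value())
```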
A new solution that works very well, but still needs more smarts to work perfectly:
- Events come into a stream processor (agent) and it increments a counter in the `counter` table, and also in the `windowed_counter` table
- The `windowed_counter` table uses an `on_window_close` method to write the values to a new topic called `remove_expired`
- A stream processor on the `remove_expired` topic then decrements the value in the `counter` table
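A minimal sketch of that layout, assuming JSON events carrying a source_id and using illustrative names for the app, topics, and callback (the key passed to on_window_close arrives, as far as I can tell, as a (key, window_range) tuple):

```python
import faust

class Event(faust.Record, serializer='json'):
    source_id: int

app = faust.App('window-decrement-demo', broker='kafka://localhost:9092')
events_topic = app.topic('events', value_type=Event)
remove_expired_topic = app.topic('remove_expired')

# Plain table holding the live running count per source.
counter = app.Table('counter', default=int)

async def on_window_close(key, value):
    # Called when a window expires; key is assumed to be (source_id, window_range).
    source_id, _window_range = key
    await remove_expired_topic.send(value={'source_id': source_id, 'count': value})

# Windowed table whose only job is to report expired counts.
windowed_counter = app.Table(
    'windowed_counter', default=int, on_window_close=on_window_close,
).tumbling(1.0, expires=300.0)

@app.agent(events_topic)
async def new_event(stream):
    async for value in stream:
        counter[value.source_id] += 1
        windowed_counter[value.source_id] += 1

@app.agent(remove_expired_topic)
async def remove_expired(stream):
    async for expired in stream:
        counter[expired['source_id']] -= expired['count']
```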
It's a really nice solution for aggregating over very large windows. But the problem is that it doesn't handle a rebalance. If a worker shuts down abruptly, or a change is made that requires a rebalance, then any windows that expire during that time are gone and never send the update to remove them.
Is there a way to ensure that expired windows are always accounted for - perhaps by Kafka itself?
Or is this only an issue because I'm running everything in memory and not using RocksDB yet?
I am facing the same issue; it seems the sliding() function doesn't actually work?
I am trying to make hopping windows work; check out https://github.com/robinhood/faust/issues/514 and my fork https://github.com/doncat99/faust for more detail.
Thanks @doncat99. I did try using the on_window_close from the example, but found that the closed/expired windows were 'lost' during a rebalance or after a worker crash, never to be recovered. When using this to count over a range, we ended up with a count that would never return to zero, even when no events occurred for longer than the window range. Again, unless using RocksDB fixes this?
Just in addition - @doncat99, correct me if I'm wrong, but a hopping window won't be serviceable if we're looking to get 3-month aggregates at 1-minute intervals. Firstly, there will be an enormous amount of duplicate data across the different windows, but also, until the data fills up the whole 3 months we can't get a result. And if we reset the system, then we need to wait 3 months to get a single value.
So after a little more looking, I managed to understand what you were asking me to do, @doncat99. I took a look at the base.py file and put a few print statements in there to see what happens when a window closes and how it decides which keys to remove. I found that when a worker restarts, last_closed_window is set to 0 and the timestamps are empty. When a repartition occurs, last_closed_window stays as it was, and the timestamps for the partitions the worker was previously responsible for stay in place, but the timestamps for the newly assigned partitions are empty, indicating this information is lost when transferred. I'm not really competent enough to follow the rabbit hole all the way to the bottom to find the root cause of this issue, so hopefully someone else can help here.