Windows lost during/after a worker dies or is re-balanced
Steps to reproduce
Run the windowed_aggregation example on 2 workers. Kill one worker and observe the output.
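For context, the relevant part of the setup looks roughly like the sketch below (names, window sizes and the broker URL are placeholders of mine, not the exact example code). The point is that an on_window_close callback is attached to the table before it is windowed:

```python
from datetime import timedelta

import faust

app = faust.App('windowed-agg-repro', broker='kafka://localhost:9092')


class Alarm(faust.Record):
    alarm_id: str


alarms_topic = app.topic('alarms', value_type=Alarm)


def on_window_close(key, value):
    # Expected to fire once per expired window, on whichever worker
    # owns the partition at the time the window closes.
    print(f'window closed for {key}: {value}')


alarm_counts = app.Table(
    'alarm_counts',
    default=int,
    on_window_close=on_window_close,
).tumbling(timedelta(minutes=1), expires=timedelta(minutes=1))


@app.agent(alarms_topic)
async def count_alarms(alarms):
    # Repartition by alarm_id so the table key matches the stream key.
    async for alarm in alarms.group_by(Alarm.alarm_id):
        alarm_counts[alarm.alarm_id] += 1
```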
Expected behavior
After a rebalance (or similar disruption), all old windows on a table with an on_window_close callback should still be available for processing via the supplied on_window_close function until that function has seen them.
Actual behavior
It appears that windows created before the rebalance, or windows that expire during it, are lost forever with no way to track them via on_window_close. Once a window has expired, its data is gone and can't be recovered. This results in aggregates that never return to zero even after the source topic stops producing new messages to aggregate over.
Needs
I'm trying to use this to perform long-term windowed aggregations, as the sliding_window option doesn't appear to be functioning. It would be good if someone could explain how windows are lost when a worker dies or a rebalance occurs. If this isn't a viable approach and it simply won't work, that's OK - I just need to know so I can let it go and focus on an alternative method.
Are you using the memory or rocksdb storage backend?
Thanks for the response @Roman1us. I'm only using memory until I can figure out how to install RocksDB - I'm very surprised at just how painful a task that is and how poor the instructions are. I suppose switching will solve almost all of my problems, as the data held in memory presumably dies with the worker? Have I naively assumed that, while memory is not a permanent store, Kafka/ZooKeeper would keep things going until they were stopped, and only then would the memory/RocksDB distinction become a problem?
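For reference, as far as I can tell the only thing I'd be changing between the two backends is the app's store URL, roughly like this (broker address is a placeholder):

```python
import faust

# In-memory store (what I'm running now): table state lives only in the worker
# process and is rebuilt from the changelog topic after a restart/rebalance.
app = faust.App('windowed-agg-repro', broker='kafka://localhost:9092',
                store='memory://')

# RocksDB store: the changelog topic is still the source of truth, but state is
# also persisted locally so recovery is faster.
# app = faust.App('windowed-agg-repro', broker='kafka://localhost:9092',
#                 store='rocksdb://')
```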
RocksDB is used only for fast recovery from the changelog topic. We use the memory storage too; our changelog topic holds about 30k records and it recovers in seconds. Please try my pull #199 - maybe it fixes your problem.
@Roman1us, I applied your pull but it didn't seem to solve the issue. I also managed to get RocksDB installed and used that instead, and found the same problem - it almost always loses all old window info after a rebalance occurs. Very rarely a window here or there is captured and handled correctly, but mostly not.
There simply must be something I'm missing here, as I expect this to be easily solvable - it's not a complicated issue. I also tried an alternative approach where I use a timer task to trigger the removal of windows via the delta() method, and had each worker store the timestamp of the last window checked in a table, so that if it restarted or rebalanced it knew where to pick up from. But it often resulted in the worker starting up with an old timestamp, so it removed windows it had already dealt with, producing negative counts. I'd rather miss one or two counts than double-remove them, but this is all very annoying. I'm probably not fully appreciating the partitioning and the massive role it plays in this. A rough sketch of that workaround is below.
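The sketch reuses the table from the reproduction sketch above; everything here is illustrative, not a fix:

```python
from datetime import timedelta


# Rough shape of the timer-based workaround (illustrative only).
@app.timer(interval=60.0)
async def expire_old_windows():
    for key in alarm_counts.keys():
        current = alarm_counts[key].now()                         # current window
        previous = alarm_counts[key].delta(timedelta(minutes=1))  # one window ago
        # Whatever dropped out of the window would be handled here, e.g. by
        # decrementing a long-running aggregate. The hard part is remembering,
        # per partition, the last timestamp this check covered so a restarted or
        # rebalanced worker neither skips windows nor processes them twice;
        # that is exactly where this approach fell over for me.
        ...
```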
Here's example output from 2 workers. The first table is the last output of a worker that was closed using ctrl+c, while the second table is the output of the other worker well after the time at which all values should have returned to zero (0). The keys that did return to zero were originally hosted on that worker, so it appears to have correctly noticed those windows closing, while the keys ported over from the other worker have lost that information. Surely the window information is still there and available? Unless I'm doing something wrong, but it doesn't matter whether we use memory or RocksDB - the result is the same.
┌Alarm Counts──────┐
│ Key │ Value │
├──────────┼───────┤
│ alarm_2 │ 2 │
│ alarm_25 │ 4 │
│ alarm_42 │ 3 │
│ alarm_14 │ 5 │
│ alarm_19 │ 8 │
│ alarm_30 │ 6 │
│ alarm_36 │ 3 │
│ alarm_40 │ 4 │
│ alarm_41 │ 3 │
│ alarm_11 │ 2 │
│ alarm_24 │ 5 │
│ alarm_26 │ 3 │
│ alarm_3 │ 5 │
│ alarm_33 │ 5 │
│ alarm_12 │ 4 │
│ alarm_15 │ 4 │
│ alarm_27 │ 3 │
│ alarm_48 │ 6 │
│ alarm_7 │ 5 │
└──────────┴───────┘
┌Alarm Counts──────┐
│ Key │ Value │
├──────────┼───────┤
│ alarm_0 │ 0 │
│ alarm_16 │ 0 │
│ alarm_18 │ 0 │
│ alarm_20 │ 0 │
│ alarm_22 │ 0 │
│ alarm_23 │ 0 │
│ alarm_29 │ 0 │
│ alarm_31 │ 0 │
│ alarm_34 │ 0 │
│ alarm_49 │ 0 │
│ alarm_5 │ 0 │
│ alarm_21 │ 0 │
│ alarm_28 │ 0 │
│ alarm_32 │ 0 │
│ alarm_38 │ 0 │
│ alarm_4 │ 0 │
│ alarm_10 │ 0 │
│ alarm_37 │ 0 │
│ alarm_8 │ 0 │
│ alarm_1 │ 0 │
│ alarm_13 │ 0 │
│ alarm_35 │ 0 │
│ alarm_39 │ 0 │
│ alarm_43 │ 0 │
│ alarm_44 │ 0 │
│ alarm_45 │ 0 │
│ alarm_46 │ 0 │
│ alarm_47 │ 0 │
│ alarm_6 │ 0 │
│ alarm_9 │ 0 │
│ alarm_2 │ 2 │
│ alarm_25 │ 4 │
│ alarm_42 │ 3 │
│ alarm_14 │ 5 │
│ alarm_19 │ 8 │
│ alarm_30 │ 6 │
│ alarm_36 │ 3 │
│ alarm_40 │ 4 │
│ alarm_41 │ 3 │
│ alarm_11 │ 2 │
│ alarm_24 │ 5 │
│ alarm_26 │ 3 │
│ alarm_3 │ 5 │
│ alarm_33 │ 5 │
│ alarm_12 │ 4 │
│ alarm_15 │ 4 │
│ alarm_27 │ 3 │
│ alarm_48 │ 6 │
│ alarm_7 │ 5 │
└──────────┴───────┘
Just as an addition, there really isn't any other method that can be used to perform this calculation accurately:
- Using a timer decorator to trigger the removal of old windows won't cater for a rebalance, as the calculation may cross that period and be missed.
- Recording the times at which different workers last performed the check won't help either, as keys end up being repartitioned, so you never know whether an alarm in its new partition has already been accounted for.
There's really only one option available: notice when a window closes and handle the data in that closed window appropriately. Using hopping windows is an option, but considering we're looking to aggregate 3 months' worth of data at 1-minute increments, that's a giant number of overlapping windows with duplicate data, which just seems ridiculous. Plus, with that method we would never have the active count results until the window closes - again, that's a 3-month wait, so it's useless.
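For scale, the hopping-window version would have to be declared something like this (illustrative numbers):

```python
from datetime import timedelta

# A 90-day window advancing every minute keeps each event in roughly
# 90 * 24 * 60 = 129,600 overlapping windows at any given time.
alarm_counts_hopping = app.Table('alarm_counts_hopping', default=int).hopping(
    size=timedelta(days=90),
    step=timedelta(minutes=1),
    expires=timedelta(days=90),
)
```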
Hey @Roman1us, did you mention that you had a robust solution that makes use of the on_window_close method? I see your pull is now merged, so it should be present in master, but is it more that you use the use_partitioner option? I can't find any explanation of what use_partitioner does or how to invoke it.
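My best guess, which I haven't been able to confirm from the docs, is that it's an option passed when declaring the table rather than something you call, along these lines:

```python
# Unverified assumption: use_partitioner as a keyword argument to app.Table.
alarm_counts = app.Table(
    'alarm_counts',
    default=int,
    on_window_close=on_window_close,
    use_partitioner=True,
).tumbling(timedelta(minutes=1), expires=timedelta(minutes=1))
```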
I did note, by printing the assigned partitions, that the table changelog partitions are different from the topic and table partitions - there are extra ones.
For anyone coming to this issue: it appears similar to another issue where they suspect that self._partition_timestamp_keys is only kept in memory. When a worker restarts and loads state from RocksDB persistent storage, the restored values will not re-trigger the on_key_set callback.