scotty-window-processor
No data for singleton aggregations?
What is the expected behavior when only a single input occurs within a window? We're trying to run a simple sum aggregation over events that have a long tail of infrequent events (and a smaller number of high-frequency events). We boiled it down to a test that runs in a test harness. The aggregation function simply sums the counts and takes the max of the timestamps. It is omitted for simplicity, because the function itself doesn't appear to matter here.
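For reference, the aggregation described above (sum of counts, max of timestamps) reduces to a single combine step. This is a hypothetical standalone sketch, not the reporter's actual function. It does not implement any Scotty interface, and the class name `UserEventCounterSum`, the constructor, and the `long` field types are all assumptions.

```java
// Hypothetical standalone sketch of the described aggregation.
// It does not use or implement any Scotty interface.
public class UserEventCounterSum {

    // Field names follow the test in this issue; the types (long) are assumed.
    static class UserEventCounter {
        String userKey;
        long timestamp;
        long count;

        UserEventCounter(String userKey, long timestamp, long count) {
            this.userKey = userKey;
            this.timestamp = timestamp;
            this.count = count;
        }

        @Override
        public String toString() {
            return "UserEventCounter(userKey=" + userKey
                    + ", timestamp=" + timestamp + ", count=" + count + ")";
        }
    }

    // Merge two partial aggregates for the same key:
    // add the counts and keep the latest timestamp.
    static UserEventCounter combine(UserEventCounter a, UserEventCounter b) {
        return new UserEventCounter(
                a.userKey, Math.max(a.timestamp, b.timestamp), a.count + b.count);
    }

    public static void main(String[] args) {
        UserEventCounter e2 = new UserEventCounter("user1", 16, 5);
        UserEventCounter e3 = new UserEventCounter("user1", 33, 6);
        System.out.println(combine(e2, e3));
    }
}
```

Because the combine step is associative, a window holding a single element should yield that element unchanged. This matches the report's point that the function itself should not be the cause.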
```java
// Operator construction is elided in the report; testHarness (also not shown)
// is assumed to wrap this operator.
ScottyWindowOperator<String, UserEventCounter, UserEventCounter> keyedScottyWindowOperator = ...

// Three sporadic events for the same key ("user1").
UserEventCounter element1 = new UserEventCounter();
element1.userKey = "user1";
element1.timestamp = 1;
element1.count = 4;
UserEventCounter element2 = new UserEventCounter();
element2.userKey = "user1";
element2.timestamp = 16;
element2.count = 5;
UserEventCounter element3 = new UserEventCounter();
element3.userKey = "user1";
element3.timestamp = 33;
element3.count = 6;

// Interleave each element (value, event timestamp) with advancing watermarks.
testHarness.processElement(element1, 1);
testHarness.processWatermark(11);
testHarness.processElement(element2, 16);
testHarness.processWatermark(25);
testHarness.processWatermark(30);
testHarness.processElement(element3, 33);
// Advance the watermark well past element3's timestamp.
testHarness.processWatermark(35);
testHarness.processWatermark(40);
testHarness.processWatermark(45);
testHarness.processWatermark(50);
```
We modified ScottyWindowOperator to add some debugging prints (the value in processElement, and each AggregateWindow in processWatermark before filtering on aggregation.hasValue()), and saw the following sequence:
```
processElement|K user1 W@ 1| = UserEventCounter(userKey=user1, timestamp=1, count=4)
processElement|K user1 W@ 16| = UserEventCounter(userKey=user1, timestamp=16, count=5)
processWatermark|user1 @ 11| = WindowResult(Time,0-10,[ScottySumUserEventCounter->UserEventCounter(userKey=user1, timestamp=1, count=4)])
processElement|K user1 W@ 33| = UserEventCounter(userKey=user1, timestamp=33, count=6)
processWatermark|user1 @ 25| = WindowResult(Time,15-25,[ScottySumUserEventCounter->])
processWatermark|user1 @ 25| = WindowResult(Time,10-20,[ScottySumUserEventCounter->])
processWatermark|user1 @ 25| = WindowResult(Time,5-15,[ScottySumUserEventCounter->])
```
Observations:
- The only result actually emitted is `WindowResult(Time,0-10,[ScottySumUserEventCounter->UserEventCounter(userKey=user1, timestamp=1, count=4)])`.
- The three windows at the end (15-25, 10-20, and 5-15) are not emitted, because their aggregates are empty when filtered on hasValue(). Two of them (10-20 and 15-25) should presumably contain element2 (timestamp 16).
- Notably, element3 never shows up in any processWatermark output, even though the watermark is advanced to 50.
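To make the expected result concrete, the sketch below computes which sliding windows each test element should fall into. The window size of 10 and slide of 5 are assumptions inferred from the window bounds in the debug log (0-10, 5-15, 10-20, 15-25), because the actual window configuration is not shown. Windows are taken as half-open `[start, start + 10)`. Starts below 0 are skipped, since no window before 0-10 appears in the log. The class name `SlidingWindowAssignment` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: SIZE and SLIDE are inferred from the debug log,
// not taken from the operator's actual configuration.
public class SlidingWindowAssignment {

    static final long SIZE = 10;
    static final long SLIDE = 5;

    // Returns the start of every non-negative window [start, start + SIZE)
    // that contains ts, latest start first.
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, SLIDE); // latest window start <= ts
        for (long start = lastStart; start > ts - SIZE && start >= 0; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 16, 33}; // element1, element2, element3
        for (long ts : timestamps) {
            for (long start : windowStarts(ts)) {
                System.out.println("t=" + ts + " -> [" + start + ", " + (start + SIZE) + ")");
            }
        }
    }
}
```

Under these assumptions, element2 falls into [15, 25) and [10, 20), and element3 falls into [30, 40) and [25, 35). If a window fires once the watermark reaches its end, all of them should have fired by watermark 40. This contrasts with the empty or missing aggregates in the log.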
Does Scotty, as implemented, only work with a consistent (non-sporadic) stream of events? Is there any workaround for keys that receive only sporadic traffic like this?
Hi @jelos98, thank you for your request and detailed explanation in the ticket. We will look into the issue and get back to you as soon as possible.