
bug: weird behavior with collect continuity

Open · jordanrfrazier opened this issue 1 year ago • 3 comments

Description: a test that splits up each index, because we can't print structs in the CSV results:

#[tokio::test]
async fn test_collect_struct_since_hourly() {
    // TODO: The results here are weird, because `collect` is latched. I don't think I'd expect
    // the results we have here, but it's possible they're technically in line with what we expect
    // given our continuity rules. We should revisit this.
    insta::assert_snapshot!(QueryFixture::new("{ 
        b: Collect.b,
        f0: ({b: Collect.b} | collect(max=10, window=since(hourly())) | index(0)).b | when(is_valid($input)),
        f1: ({b: Collect.b} | collect(max=10, window=since(hourly())) | index(1)).b | when(is_valid($input)),
        f2: ({b: Collect.b} | collect(max=10, window=since(hourly())) | index(2)).b | when(is_valid($input)),
        f3: ({b: Collect.b} | collect(max=10, window=since(hourly())) | index(3)).b | when(is_valid($input)),
        f4: ({b: Collect.b} | collect(max=10, window=since(hourly())) | index(4)).b | when(is_valid($input))
    }").run_to_csv(&collect_data_fixture().await).await.unwrap(), @r###"
    _time,_subsort,_key_hash,_key,b,f0,f1,f2,f3,f4
    1996-12-20T00:39:57.000000000,9223372036854775808,12960666915911099378,A,true,true,,,,
    1996-12-20T00:40:57.000000000,9223372036854775808,12960666915911099378,A,false,true,false,,,
    1996-12-20T00:41:57.000000000,9223372036854775808,12960666915911099378,A,,true,false,,,
    1996-12-20T00:42:00.000000000,9223372036854775808,12960666915911099378,A,false,true,false,,false,
    1996-12-20T00:42:57.000000000,9223372036854775808,12960666915911099378,A,true,true,false,,false,true
    1996-12-20T00:43:57.000000000,9223372036854775808,12960666915911099378,A,true,true,false,,false,true
    1996-12-20T01:00:00.000000000,18446744073709551615,12960666915911099378,A,,true,false,,false,true
    1996-12-21T00:40:57.000000000,9223372036854775808,2867199309159137213,B,false,false,,,,
    1996-12-21T00:41:57.000000000,9223372036854775808,2867199309159137213,B,,false,,,,
    1996-12-21T00:42:57.000000000,9223372036854775808,2867199309159137213,B,true,false,,true,,
    1996-12-21T00:43:57.000000000,9223372036854775808,2867199309159137213,B,false,false,,true,false,
    1996-12-21T00:44:57.000000000,9223372036854775808,2867199309159137213,B,true,false,,true,false,true
    1996-12-21T01:00:00.000000000,18446744073709551615,2867199309159137213,B,,false,,true,false,true
    1996-12-21T01:44:57.000000000,9223372036854775808,2867199309159137213,B,true,true,,true,false,true
    1996-12-21T02:00:00.000000000,18446744073709551615,2867199309159137213,B,,true,,true,false,true
    1996-12-22T00:44:57.000000000,9223372036854775808,2521269998124177631,C,true,true,,,,
    1996-12-22T00:45:57.000000000,9223372036854775808,2521269998124177631,C,true,true,true,,,
    1996-12-22T00:46:57.000000000,9223372036854775808,2521269998124177631,C,true,true,true,true,,
    1996-12-22T00:47:57.000000000,9223372036854775808,2521269998124177631,C,true,true,true,true,true,
    "###);
}

The problem is that now that collect is as-of/continuous, we're storing the last non-null value for each individual index.

It may be "technically correct" based on our continuity rules. See the compute plan: each b field reference goes to a separate select and merge because of the is_valid, so each merge keeps its own latched state.
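For intuition, here's a minimal Python sketch (illustrative only, not kaskada code; the function and variable names are hypothetical) of latched merge interpolation: each side of a merge carries its last observed value forward to every merged timestamp, which is why a stale index value keeps reappearing.

    # Illustrative sketch of "latched" (as-of / continuous) merge semantics.
    # Not kaskada code; names and structure are hypothetical.
    def latched_merge(left, right):
        """Merge two (time, value) streams; each side carries its last value forward."""
        li = dict(left)
        ri = dict(right)
        latched_l = latched_r = None
        merged = []
        for t in sorted(li.keys() | ri.keys()):
            if t in li:
                latched_l = li[t]
            if t in ri:
                latched_r = ri[t]
            merged.append((t, latched_l, latched_r))
        return merged

    # Each index(n) output behaves like its own stream: once an index has
    # produced a non-null value, the merge keeps replaying it at later times,
    # even if the collected list has since shrunk below that index.
    f3 = [(1, None), (2, False)]  # index(3): last non-null value is False
    f4 = [(3, True)]              # index(4): last non-null value is True
    print(latched_merge(f3, f4))
    # [(1, None, None), (2, False, None), (3, False, True)]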

Problem:

Specifically, these two lines:

    1996-12-21T01:44:57.000000000,9223372036854775808,2867199309159137213,B,true,true,,true,false,true
    1996-12-21T02:00:00.000000000,18446744073709551615,2867199309159137213,B,,true,,true,false,true

should have nulls for the last 3 columns, I'd expect. The problem is that the last non-null value for each index is being saved, as if it had been pushed through a last aggregation (which, almost technically, it is, since it goes through latched spread).

jordanrfrazier commented Aug 10 '23 05:08

I agree it's weird. I'm somewhat suspicious that it may be caused by the multiple merges. Some thoughts / questions:

  1. Why would you expect the last 3 columns to be null? I'd expect the last 4 or 5 to be null. Rationale: the collect is since(hourly()), which means it should reset at 2:00, right? Or is this because the output is inclusive of 2:00? In that case, I guess it makes sense: the 3 values since (and including) 1:00 were [null, true, null]. Although that raises the question of why 1:00 would be included in the collection up to 1:00 if 2:00 is included in the collection starting at 2:00; it seems like every hour boundary would be included twice (see the sketch after this list).
  2. Does the weirdness go away if you just do b and f0? What about b and f3 or f4? Basically, trying to determine if the issue goes away when there is only one merge.
  3. Similarly, can you reproduce with last and/or some combination of shifts and lasts to get different domains (multiple merges)? Trying to see if we can rule out collect here.
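A tiny Python sketch of the boundary concern in (1), assuming a hypothetical rule where window endpoints are inclusive on both ends (this is an assumption for illustration, not confirmed kaskada behavior):

    # Hypothetical inclusive-boundary rule: an event exactly at an hour
    # boundary falls in BOTH the window ending there and the window starting
    # there, so every boundary event would be counted twice.
    events = [(0.5, "a"), (1.0, "b"), (1.5, "c")]  # (time in hours, value)

    def window(events, start, end):
        # Inclusive on both ends, per the "included twice" reading above.
        return [v for t, v in events if start <= t <= end]

    print(window(events, 0.0, 1.0))  # ['a', 'b'] -- 1:00 closes this window...
    print(window(events, 1.0, 2.0))  # ['b', 'c'] -- ...and also opens the next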

bjchambers commented Aug 10 '23 05:08

(image: final_with_filter compute plan)

This issue arose from attempting to hack a fix for trailing windows: shift the input forward, merge that with the original, feed the result to the collect, then filter on the original input being non-null. The plan above shows the query with the hack:

        kd.record(
            {
                "m": m,
                "collect_m": m.collect(
                    max=None, window=kd.windows.Trailing(timedelta(seconds=1))
                ),
            }
        )

-------------------------------------------------------------------------------------------

        # Remember which rows had real (non-null) input so we can filter later.
        is_input = input.is_not_null()

        # Shift a copy of the input forward by the window duration and merge it
        # with the original, so the collect sees a row at each expiration time.
        shift_by = window.duration
        input_shift = input.shift_by(shift_by)
        trailing_ns = int(window.duration.total_seconds() * 1e9)
        input = record({"input": input, "input_shift": input_shift}).col("input")

        # HACK: Use null predicate and number of nanoseconds to encode trailing windows.
        collect = Timestream._call(op, input, *args, None, trailing_ns)

        # Keep only the rows where the original input was valid.
        return collect.filter(is_input)

The weird behavior we see is that the collect in operation 2 is correctly clearing its buffers and producing an empty, non-null list. That output is sent to a select, then to the final merge (operation 4), which correctly latches values. However, the select after the collect filters on is_valid(input), and because the input was null for several rows, we never send the empty-list rows to the final merge, so it never latches the correct new state (an empty list).
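A minimal Python sketch of that interaction (illustrative only; the row values and names are hypothetical, not taken from the actual plan): the filter sits between the collect and the final latched merge, so a reset-to-empty-list row that coincides with a null input never reaches the merge, and the merge keeps replaying the stale list.

    # Hypothetical rows: collect emits at every merged time, including the
    # window reset at t=3, where it produces an empty (but non-null) list.
    rows = [
        # (time, original_input, collected_list_after_this_row)
        (1, "a", ["a"]),
        (2, "b", ["a", "b"]),
        (3, None, []),  # reset: input is null, list correctly cleared
        (4, None, []),
    ]

    # Select: keep only rows where is_valid(input) holds.
    selected = {t: lst for t, inp, lst in rows if inp is not None}

    # Final merge: latch the last value seen from the selected stream.
    latched = None
    for t, _, _ in rows:
        if t in selected:
            latched = selected[t]
        print(t, latched)
    # t=3 and t=4 print ['a', 'b'] even though the collect had already reset
    # to [] -- the empty-list rows were filtered out before the merge.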

This behavior is strange in the sense that, at first glance, it may seem like adding a filter after any aggregation would cause it to stop obeying interpolation rules further downstream. However, the reason this is okay and expected is that when (or select/filter) always produces discrete values.

Thus, given the (pseudocode) example:

Foo.sum() -> [Hour 1: 10, Hour 2: 20, Hour 3: 30, Hour 4: 40]
Foo.sum(window=Since(hourly())).filter(on_odd_hours) -> [Hour 1: 10, Hour 3: 30]

in { sum, filtered }

->  
[ 
  { sum: 10, filtered: 10}
  { sum: 20, filtered: null}
  { sum: 30, filtered: 30}
  { sum: 40, filtered: null}
]
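Here is that pseudocode as a small runnable Python sketch (the hourly values are taken from the example above; everything else is illustrative): the continuous sum has a value at every hour, while the discrete filtered stream is simply null wherever it didn't fire, with no latching.

    # Runnable rendering of the pseudocode example (values are illustrative).
    hours = [1, 2, 3, 4]
    sums = {1: 10, 2: 20, 3: 30, 4: 40}  # continuous: defined at every hour
    filtered = {h: v for h, v in sums.items() if h % 2 == 1}  # discrete: odd hours

    for h in hours:
        # Discrete values are NOT carried forward: .get() yields None off-hours.
        print({"sum": sums[h], "filtered": filtered.get(h)})
    # {'sum': 10, 'filtered': 10}
    # {'sum': 20, 'filtered': None}
    # {'sum': 30, 'filtered': 30}
    # {'sum': 40, 'filtered': None}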

jordanrfrazier commented Aug 15 '23 16:08

This behavior is likely correct, but it could have a subtle and meaningful impact on complex queries. We should discuss ways to mitigate this risk.

jordanrfrazier commented Aug 15 '23 16:08