
Convert not in statements to use antijoins

Open ruchirK opened this issue 6 years ago • 4 comments

Right now, NOT IN statements use a cross join, which is not very performant. Let's change that to use an antijoin instead, and add some tests to stress the new implementation.

ruchirK avatar Dec 02 '19 20:12 ruchirK

I think this is either done, or has advanced to "implemented, but not bulletproof". cc @justinj

frankmcsherry avatar Sep 03 '20 14:09 frankmcsherry

@cuongdo and @justinj I think we can close this?

ruchirK avatar Sep 23 '20 13:09 ruchirK

I recently checked, and TPC-H Q16, which contains a NOT IN, ends up with the fragment below. Arguably, this would be fine if the input keys were known to be non-null; maybe they should be, but they aren't in my setup:

%7 =
| Join %5 %6
| | implementation = Differential %6 %5.()
| Filter ((#0) IS NULL OR (#1) IS NULL OR (#0 = #1))
| Project (#0)
| Distinct group=(#0)
| Negate
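
For context on why the Filter above carries the two IS NULL checks, here is a minimal sketch of the three-valued logic behind NOT IN. It uses Python's built-in sqlite3 module purely as a stand-in (an assumption for illustration; nothing about Materialize's planner is exercised), and the helper name `scalar` is hypothetical. NOT IN is only true when there is no match and no NULL is involved; the three disjuncts of the Filter correspond to the three ways it can fail to be true.

```python
import sqlite3

# SQLite is only a stand-in here: it follows the standard SQL
# three-valued logic for NOT IN, which is all this sketch shows.
conn = sqlite3.connect(":memory:")

def scalar(sql):
    """Run a single-value query and return that value (hypothetical helper)."""
    return conn.execute(sql).fetchone()[0]

# No match and no NULLs: NOT IN is true, so the row would be kept.
no_match = scalar("SELECT 1 NOT IN (2, 3)")

# A NULL on the right side (the "(#1) IS NULL" disjunct): result is NULL.
null_in_list = scalar("SELECT 1 NOT IN (2, NULL)")

# A NULL on the left side (the "(#0) IS NULL" disjunct): result is NULL.
null_on_left = scalar("SELECT NULL NOT IN (2, 3)")

# An actual match (the "(#0 = #1)" disjunct): result is false.
match = scalar("SELECT 2 NOT IN (2, NULL)")

print(no_match, null_in_list, null_on_left, match)
```

A WHERE clause keeps only rows whose predicate is true, so the NULL and false cases are all dropped, which is what the Negate of the filtered cross join implements.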

frankmcsherry avatar Sep 19 '22 18:09 frankmcsherry

Observed by @wangandi that this would also be addressed by https://github.com/MaterializeInc/materialize/issues/4229#issuecomment-1251400101

frankmcsherry avatar Sep 19 '22 18:09 frankmcsherry

The most recent plan of TPC-H Q16 includes an antijoin pattern:

 Explained Query:                                                                                                                                                                     +
   Finish order_by=[#3 desc nulls_first, #0 asc nulls_last, #1 asc nulls_last, #2 asc nulls_last] output=[#0..=#3]                                                                    +
     Return                                                                                                                                                                           +
       Reduce group_by=[#1..=#3] aggregates=[count(distinct #0)]                                                                                                                      +
         Project (#0..=#3)                                                                                                                                                            +
           Join on=(#0 = #4) type=differential                                                                                                                                        +
             ArrangeBy keys=[[#0]]                                                                                                                                                    +
               Get l0                                                                                                                                                                 +
             ArrangeBy keys=[[#0]]                                                                                                                                                    +
               Union                                                                                                                                                                  +
                 Negate                                                                                                                                                               +
                   Project (#0)                                                                                                                                                       +
                     Join on=(#0 = #1) type=differential                                                                                                                              +
                       ArrangeBy keys=[[#0]]                                                                                                                                          +
                         Get l1                                                                                                                                                       +
                       ArrangeBy keys=[[#0]]                                                                                                                                          +
                         Project (#0)                                                                                                                                                 +
                           Filter "%Customer%Complaints%" ~~(#6)                                                                                                                      +
                             Get materialize.public.supplier                                                                                                                          +
                 Get l1                                                                                                                                                               +
     With                                                                                                                                                                             +
       cte l1 =                                                                                                                                                                       +
         Distinct group_by=[#0]                                                                                                                                                       +
           Project (#0)                                                                                                                                                               +
             Get l0                                                                                                                                                                   +
       cte l0 =                                                                                                                                                                       +
         Project (#1, #3..=#5)                                                                                                                                                        +
           Join on=(#0 = #2) type=differential                                                                                                                                        +
             ArrangeBy keys=[[#0]]                                                                                                                                                    +
               Project (#0, #1)                                                                                                                                                       +
                 Get materialize.public.partsupp                                                                                                                                      +
             ArrangeBy keys=[[#0]]                                                                                                                                                    +
               Project (#0, #3..=#5)                                                                                                                                                  +
                 Filter (#3 != "Brand#45") AND NOT("MEDIUM POLISHED%" ~~(#4)) AND ((#5 = 3) OR (#5 = 9) OR (#5 = 14) OR (#5 = 19) OR (#5 = 23) OR (#5 = 36) OR (#5 = 45) OR (#5 = 49))+
                   Get materialize.public.part                                                                                                                                        +
                                                                                                                                                                                      +
 Source materialize.public.part                                                                                                                                                       +
   filter=((#3 != "Brand#45") AND NOT("MEDIUM POLISHED%" ~~(#4)) AND ((#5 = 3) OR (#5 = 9) OR (#5 = 14) OR (#5 = 19) OR (#5 = 23) OR (#5 = 36) OR (#5 = 45) OR (#5 = 49)))            +
 Source materialize.public.supplier                                                                                                                                                   +
   filter=("%Customer%Complaints%" ~~(#6))                                                                                                                                            +

My setup was created with the TPC-H load generator, so columns are declared as not null.

Perhaps it is OK to close this issue?

vmarcos avatar Mar 09 '23 16:03 vmarcos

I would say don't close it yet. The problem is that if the columns are not declared NOT NULL, then the plan still has the cross join:

Explained Query:
  Finish order_by=[#3 desc nulls_first, #0 asc nulls_last, #1 asc nulls_last, #2 asc nulls_last] output=[#0..=#3]
    Return
      Reduce group_by=[#1..=#3] aggregates=[count(distinct #0)]
        Project (#0..=#3)
          Join on=(#0 = #4) type=differential
            ArrangeBy keys=[[#0]]
              Get l0
            ArrangeBy keys=[[#0]]
              Union
                Negate
                  Distinct group_by=[#0]
                    Project (#0)
                      Filter ((#0) IS NULL OR (#1) IS NULL OR (#0 = #1))
                        CrossJoin type=differential   <--------------------------------------------------------
                          ArrangeBy keys=[[]]
                            Get l1
                          ArrangeBy keys=[[]]
                            Project (#0)
                              Filter "%Customer%Complaints%" ~~(#6)
                                Get materialize.public.supplier
                Get l1
    With
      cte l1 =
        Distinct group_by=[#0]
          Project (#0)
            Get l0
      cte l0 =
        Project (#1, #3..=#5)
          Join on=(#0 = #2) type=differential
            ArrangeBy keys=[[#0]]
              Project (#0, #1)
                Filter (#0) IS NOT NULL
                  Get materialize.public.partsupp
            ArrangeBy keys=[[#0]]
              Project (#0, #3..=#5)
                Filter (#3 != "Brand#45") AND (#0) IS NOT NULL AND NOT("MEDIUM POLISHED%" ~~(#4)) AND ((#5 = 3) OR (#5 = 9) OR (#5 = 14) OR (#5 = 19) OR (#5 = 23) OR (#5 = 36) OR (#5 = 45) OR (#5 = 49))
                  Get materialize.public.part

Source materialize.public.part
  filter=((#0) IS NOT NULL AND (#3 != "Brand#45") AND NOT("MEDIUM POLISHED%" ~~(#4)) AND ((#5 = 3) OR (#5 = 9) OR (#5 = 14) OR (#5 = 19) OR (#5 = 23) OR (#5 = 36) OR (#5 = 45) OR (#5 = 49)))
Source materialize.public.partsupp
  filter=((#0) IS NOT NULL)
Source materialize.public.supplier
  filter=("%Customer%Complaints%" ~~(#6))

I added the issue to my decorrelation issues collection:

  • https://github.com/MaterializeInc/materialize/issues/5213
  • https://github.com/MaterializeInc/materialize/issues/14849
  • https://github.com/MaterializeInc/materialize/issues/7682
  • https://github.com/MaterializeInc/materialize/issues/7412
  • https://github.com/MaterializeInc/materialize/issues/744
  • https://github.com/MaterializeInc/materialize/issues/1137

I'm hoping to tackle these sometime around Q3-Q4.

Edit: I created a tracking issue for the above list: https://github.com/MaterializeInc/materialize/issues/18352

ggevay avatar Mar 23 '23 08:03 ggevay

Simpler repro:

CREATE TABLE t1 (
  a int,
  b int
);

CREATE TABLE t2 (
  a int,
  b int
);

explain
select * from t1 where t1.a NOT IN (select a from t2);

plan:

                            Optimized Plan                            
----------------------------------------------------------------------
 Explained Query:                                                    +
   Return                                                            +
     Project (#0, #1)                                                +
       Join on=(#0 = #2) type=differential                           +
         ArrangeBy keys=[[#0]]                                       +
           Get materialize.public.t1                                 +
         ArrangeBy keys=[[#0]]                                       +
           Union                                                     +
             Negate                                                  +
               Distinct group_by=[#0]                                +
                 Project (#0)                                        +
                   Filter ((#0) IS NULL OR (#1) IS NULL OR (#0 = #1))+
                     CrossJoin type=differential                     +
                       ArrangeBy keys=[[]]                           +
                         Get l0                                      +
                       ArrangeBy keys=[[]]                           +
                         Project (#0)                                +
                           Get materialize.public.t2                 +
             Get l0                                                  +
   With                                                              +
     cte l0 =                                                        +
       Distinct group_by=[#0]                                        +
         Project (#0)                                                +
           Get materialize.public.t1                                 +
 
(1 row)

ggevay avatar Aug 10 '23 13:08 ggevay

A customer ran into this recently: https://materializeinc.slack.com/archives/C02PPB50ZHS/p1691659716977619

(which Seth also mentioned here: https://github.com/MaterializeInc/materialize/issues/21170)

ggevay avatar Aug 10 '23 13:08 ggevay

I dare say we should consider adding an HIR transform as a quick fix.

ggevay avatar Aug 10 '23 15:08 ggevay

If we had efficient broadcast joins, then we could look for the

               Distinct group_by=[#0]                                +
                 Project (#0)                                        +
                   Filter ((#0) IS NULL OR (#1) IS NULL OR (#0 = #1))+
                     CrossJoin type=differential                     +

pattern, and turn it into a Union of 3 things:

  • An equi-join on (#0 = #1), followed by a Filter that keeps only rows where neither side is null.
  • The nulls from the left input, Distincted, and then crossed with the right input.
  • The nulls from the right input, Distincted, and then crossed with the left input.

But the two new crosses need efficient broadcast joins, because they would be crossing with a 0 or 1 element collection. (Or we could just hack a broadcast join with the hashing + generate_series trick.)

Edit: We don't need to include the Distinct in the pattern. As Frank observes, we can simply include the negation of the earlier predicate to avoid double-counting.

Edit 2: Even without broadcast joins, the above transformation would improve the situation, since there wouldn't be a quadratic blowup anymore. The big input would still be skewed to one worker, but that's a smaller problem than the quadratic blowup.
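
To make the proposed rewrite concrete, here is a small model of it in plain Python. This is only a sketch under assumptions: NULL is modeled as `None`, collections are modeled as Python sets (so multiplicities and the Distinct question are ignored), and the function names are hypothetical. It checks that the set of keys produced by the cross join plus Filter equals the union of the three pieces listed above.

```python
from itertools import product

def removed_keys_cross(left, right):
    """Model of Distinct(Project(#0)(Filter(CrossJoin(left, right)))).

    Quadratic: every left key is paired with every right key.
    """
    return {l for l, r in product(left, right)
            if l is None or r is None or l == r}

def removed_keys_union(left, right):
    """Model of the proposed Union of three pieces (set semantics)."""
    # 1. Equi-join on the key, restricted to non-null values.
    right_non_null = {r for r in right if r is not None}
    matched = {l for l in left if l is not None and l in right_non_null}

    # 2. Nulls from the left input (at most one distinct element),
    #    crossed with the right input: non-empty only if right is non-empty.
    left_nulls = {l for l in left if l is None}
    null_left = {l for l in left_nulls for _ in right}

    # 3. Nulls from the right input (at most one distinct element),
    #    crossed with the left input: all left keys if right has a NULL.
    right_nulls = {r for r in right if r is None}
    null_right = {l for l in left for _ in right_nulls}

    return matched | null_left | null_right

cases = [
    ([1, 2, None], [2]),
    ([1, 2], [None]),
    ([1, 2, None], []),
    ([None], [None, 3]),
    ([1, 2, 3], [3, 4]),
]
for left, right in cases:
    assert removed_keys_cross(left, right) == removed_keys_union(left, right)
```

In this model the two crosses in pieces 2 and 3 each involve a collection of at most one element, which is why the quadratic blowup goes away.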

ggevay avatar Aug 14 '23 16:08 ggevay

Workarounds (copied from Slack):

Another thing that makes the CrossJoin disappear is to filter out nulls before feeding into the NOT IN. E.g., instead of

CREATE TABLE t1 (a int, b int);
CREATE TABLE t2 (a int, b int);

select * from t1 where t1.a NOT IN (select a from t2);

rewrite to

select * from t1 where t1.a IS NOT NULL AND t1.a NOT IN (select a from t2 where t2.a IS NOT NULL);

Like the NOT EXISTS rewrite, this is not equivalent to the original query (the results can differ when nulls are present), but it might be OK for some users.

For reference, this is an example of the NOT EXISTS rewrite:

select * from t1 where NOT EXISTS (select a from t2 where a = t1.a);
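
To show how the three forms can differ, here is a small sketch that runs them on sample data containing nulls. It uses Python's built-in sqlite3 module as a stand-in for illustration only (an assumption; the plans Materialize produces are not exercised), and the sample rows and the helper name `run` are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE t1 (a int, b int);
    CREATE TABLE t2 (a int, b int);
    -- Hypothetical sample data with a NULL key on each side.
    INSERT INTO t1 VALUES (1, 10), (2, 20), (NULL, 30);
    INSERT INTO t2 VALUES (2, 0), (NULL, 0);
""")

def run(sql):
    """Return the result rows sorted by column b (non-null in the sample data)."""
    return sorted(conn.execute(sql).fetchall(), key=lambda row: row[1])

# Original query: the NULL in t2.a means NOT IN is never true.
original_rows = run(
    "select * from t1 where t1.a NOT IN (select a from t2)")

# Null-filtering workaround: nulls are removed on both sides first.
filtered_rows = run(
    "select * from t1 where t1.a IS NOT NULL "
    "AND t1.a NOT IN (select a from t2 where t2.a IS NOT NULL)")

# NOT EXISTS workaround: a NULL key in t1 never matches, so that row is kept.
not_exists_rows = run(
    "select * from t1 where NOT EXISTS (select a from t2 where a = t1.a)")

print(original_rows)    # []
print(filtered_rows)    # [(1, 10)]
print(not_exists_rows)  # [(1, 10), (None, 30)]
```

On data without nulls in either key column, all three queries return the same rows, which is when the workarounds are safe to use.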

ggevay avatar Aug 14 '23 16:08 ggevay

One more situation where a NOT IN leads to a cross join:

WITH leaves AS (
    SELECT *
    FROM mz_internal.mz_dataflow_addresses AS outer
    WHERE
        outer.address NOT IN (
            SELECT inner.address[:list_length(outer.address)]
            FROM mz_internal.mz_dataflow_addresses AS inner
            WHERE inner.id != outer.id
        )
)

SELECT
    mdo.id,
    mdo.name,
    mdod.dataflow_name,
    mse.elapsed_ns / 1000 * '1 MICROSECONDS'::interval AS elapsed_time
FROM mz_internal.mz_scheduling_elapsed AS mse,
    mz_internal.mz_dataflow_operators AS mdo,
    mz_internal.mz_dataflow_operator_dataflows AS mdod
WHERE mse.id = mdo.id AND mdo.id = mdod.id AND mdo.id IN (SELECT id FROM leaves)
ORDER BY elapsed_ns DESC;

(This is one of our suggested troubleshooting queries.) Here, the cross join looks a bit different, i.e., it's not checking nulls, but has a !=:

Edit: This is actually very different from the above case:

  • This is an AND, not an OR.
  • It ends up being a cross join because the list_slice_linear(#3, 1, integer_to_bigint(list_length(#1))) is mentioning both inputs. Similarly, the inner.id != outer.id is also mentioning both inputs. So this is a correlated subquery, as opposed to the above example. And I can't think of any way to even manually decorrelate it without a cross join, so I'd say we shouldn't feel bad about this query ending up as a cross join.
                                                      Optimized Plan                                                       
---------------------------------------------------------------------------------------------------------------------------
 Explained Query:                                                                                                         +
   Finish order_by=[#1 desc nulls_first] output=[#2, #3, #7, #8]                                                          +
     Return                                                                                                               +
       Project (#0, #1, #0, #3, #0, #6, #12, #11, #14)                                                                    +
         Filter (#12) IS NOT NULL AND (0 = bigint_to_numeric(#5))                                                         +
           Map ((00:00:00.000001 * numeric_to_double((#1 / 1000))))                                                       +
             Join on=(eq(#0, #2, #4, #7, #13) AND eq(#5, #8, #10) AND #12 = list_index(#9, 1)) type=differential          +
               ArrangeBy keys=[[#0]]                                                                                      +
                 Reduce group_by=[#0] aggregates=[sum(#1)]                                                                +
                   Project (#0, #2)                                                                                       +
                     Reduce group_by=[#0, #1] aggregates=[count(*)]                                                       +
                       Get mz_internal.mz_scheduling_elapsed_raw                                                          +
               ArrangeBy keys=[[#0]]                                                                                      +
                 Project (#0, #2)                                                                                         +
                   Filter (0 = bigint_to_numeric(#1))                                                                     +
                     Get mz_internal.mz_dataflow_operators_per_worker                                                     +
               Get l2                                                                                                     +
               Get l3                                                                                                     +
               ArrangeBy keys=[[#0, #2]]                                                                                  +
                 Project (#1, #5, #6)                                                                                     +
                   Filter (#6) IS NOT NULL AND (1 = list_length(#2)) AND (0 = bigint_to_numeric(#1))                      +
                     Map (list_index(#2, 1))                                                                              +
                       Join on=(#0 = #3 AND #1 = #4) type=differential                                                    +
                         Get l3                                                                                           +
                         Get l2                                                                                           +
               ArrangeBy keys=[[#0]]                                                                                      +
                 Distinct group_by=[#0]                                                                                   +
                   Project (#0)                                                                                           +
                     Join on=(#0 = #2 AND #1 = #3) type=differential                                                      +
                       ArrangeBy keys=[[#0, #1]]                                                                          +
                         Get l0                                                                                           +
                       ArrangeBy keys=[[#0, #1]]                                                                          +
                         Union                                                                                            +
                           Negate                                                                                         +
                             Distinct group_by=[#0, #1]                                                                   +
                               Project (#0, #1)                                                                           +
                                 Filter (#0 != #2) AND (#1 = list_slice_linear(#3, 1, integer_to_bigint(list_length(#1))))+
                                   CrossJoin type=differential                                                            +
                                     ArrangeBy keys=[[]]                                                                  +
                                       Get l1                                                                             +
                                     ArrangeBy keys=[[]]                                                                  +
                                       Get l0                                                                             +
                           Get l1                                                                                         +
     With                                                                                                                 +
       cte l3 =                                                                                                           +
         ArrangeBy keys=[[#0, #1]]                                                                                        +
           Get mz_internal.mz_dataflow_addresses_per_worker                                                               +
       cte l2 =                                                                                                           +
         ArrangeBy keys=[[#0, #1]]                                                                                        +
           Get mz_internal.mz_dataflow_operators_per_worker                                                               +
       cte l1 =                                                                                                           +
         Distinct group_by=[#0, #1]                                                                                       +
           Get l0                                                                                                         +
       cte l0 =                                                                                                           +
         Project (#0, #2)                                                                                                 +
           Filter (0 = bigint_to_numeric(#1))                                                                             +
             Get mz_internal.mz_dataflow_addresses_per_worker                                                             +
                                                                                                                          +
 Used Indexes:                                                                                                            +
   - mz_internal.mz_dataflow_operators_per_worker_u1_primary_idx (differential join, differential join)                   +
   - mz_internal.mz_dataflow_addresses_per_worker_u1_primary_idx (differential join, differential join)                   +
   - mz_internal.mz_scheduling_elapsed_raw_u1_primary_idx (*** full scan ***)                                             +
 
(1 row)

ggevay avatar Aug 15 '23 09:08 ggevay

This is also happening with the NOT IN in our quickstart:

explain SELECT buyer, count(*)
FROM winning_bids
WHERE buyer NOT IN (SELECT id FROM fraud_accounts)
GROUP BY buyer
ORDER BY 2 DESC LIMIT 5;
                               Optimized Plan                                
-----------------------------------------------------------------------------
 Explained Query:                                                           +
   Finish order_by=[#1 desc nulls_first] limit=5 output=[#0, #1]            +
     Return                                                                 +
       Reduce group_by=[#0] aggregates=[count(*)]                           +
         Project (#0)                                                       +
           Join on=(#0 = #1) type=differential                              +
             ArrangeBy keys=[[#0]]                                          +
               Get l0                                                       +
             ArrangeBy keys=[[#0]]                                          +
               Union                                                        +
                 Negate                                                     +
                   Distinct group_by=[#0]                                   +
                     Project (#0)                                           +
                       Filter ((#1) IS NULL OR (#0 = #1))                   +
                         CrossJoin type=differential                        +
                           ArrangeBy keys=[[]]                              +
                             Get l1                                         +
                           ArrangeBy keys=[[]]                              +
                             ReadStorage materialize.public.fraud_accounts  +
                 Get l1                                                     +
     With                                                                   +
       cte l1 =                                                             +
         Distinct group_by=[#0]                                             +
           Get l0                                                           +
       cte l0 =                                                             +
         Project (#1)                                                       +
           TopK group_by=[#0] order_by=[#2 desc nulls_first] limit=1        +
             Project (#0, #2, #4)                                           +
               Filter (#5 < #1)                                             +
                 Join on=(#0 = #3) type=differential                        +
                   ArrangeBy keys=[[#0]]                                    +
                     Project (#0, #3)                                       +
                       Filter (mz_now() >= timestamp_tz_to_mz_timestamp(#3))+
                         ReadStorage materialize.public.auctions            +
                   ArrangeBy keys=[[#1]]                                    +
                     Project (#1..=#4)                                      +
                       ReadStorage materialize.public.bids                  +
                                                                            +
 Source materialize.public.auctions                                         +
   filter=((mz_now() >= timestamp_tz_to_mz_timestamp(#3)))                  +
 
(1 row)

ggevay avatar Sep 23 '23 10:09 ggevay

I think the next step here is to check exactly how the null checks in ((#0) IS NULL OR (#1) IS NULL OR (#0 = #1)) end up there. Instead of checking nulls there, one could have an IS NOT NULL check somewhere higher up, after the negation. Maybe there is some simple change in the lowering that would achieve this. It's also possible that they start from higher up, but PredicatePushdown pushes them there, in which case we might want some adjustment in PredicatePushdown.

ggevay avatar Sep 23 '23 11:09 ggevay