Support Joining windowed and non-windowed datasets

Open · jacksonrnewhouse opened this issue 2 years ago · 2 comments

We'd like to support joins between a windowed aggregate and a non-windowed stream. This would allow us to run Nexmark Query 6; the Arroyo version of the query is below. Ideally, we'd also inspect the WHERE clause so that we don't keep data around after its window has passed, since the only data from B1 we care about is data within active windows.

```sql
WITH
auction AS (
    SELECT auction.category AS category,
        auction.datetime AS datetime,
        auction.expires AS expires,
        auction.id AS id
    FROM nexmark WHERE auction IS NOT NULL),
bid AS (
    SELECT bid.auction AS auction,
        bid.bidder AS bidder,
        bid.extra AS extra,
        bid.datetime AS datetime,
        bid.price AS price
    FROM nexmark WHERE bid IS NOT NULL)

SELECT B.auction, B.price, B.bidder, B.dateTime, B.extra
FROM bid B
JOIN (
  SELECT MAX(B1.price) AS maxprice, tumble(INTERVAL '10' SECOND) AS window
  FROM bid B1
  GROUP BY 2
) B1
ON B.price = B1.maxprice
WHERE B.dateTime BETWEEN B1.window.start_time AND B1.window.end_time;
```
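To pin down what the join should emit, here is a minimal batch sketch of the query's semantics in Python. It is not Arroyo code and does not model streaming. It assumes integer-second timestamps and that a window's `end_time` is `start_time + 10s`; `Bid`, `window_start`, and `query` are hypothetical names. It computes `MAX(price)` per 10-second tumbling window (the `B1` subquery), then pairs every bid with each window on `price = maxprice` and keeps only the pairs that pass the inclusive `BETWEEN` filter.

```python
from dataclasses import dataclass

WINDOW_SECS = 10  # matches tumble(INTERVAL '10' SECOND)


@dataclass(frozen=True)
class Bid:
    # Hypothetical record with only the fields the query reads.
    auction: int
    bidder: int
    price: int
    datetime: int  # assumption: integer seconds
    extra: str = ""


def window_start(ts):
    # Tumbling windows cover [start, start + WINDOW_SECS).
    return ts - ts % WINDOW_SECS


def query(bids):
    # Subquery B1: MAX(price) for each tumbling window.
    maxprice = {}
    for b in bids:
        w = window_start(b.datetime)
        maxprice[w] = max(maxprice.get(w, b.price), b.price)

    # Outer query: join on price = maxprice, then apply the BETWEEN filter,
    # which is inclusive on both ends (assumes end_time = start + 10s).
    out = []
    for start, m in sorted(maxprice.items()):
        end = start + WINDOW_SECS
        for b in bids:
            if b.price == m and start <= b.datetime <= end:
                out.append((b.auction, b.price, b.bidder, b.datetime, b.extra))
    return out
```

Because `BETWEEN` is inclusive on both ends, a bid whose timestamp falls exactly on a window boundary can be emitted twice: once for the window ending there and once for the window starting there, if its price equals the maximum of both.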

Posted by jacksonrnewhouse, Jun 08 '23 17:06

This is the DAG produced by the query:

```
2023-08-31T22:56:30.082233Z  INFO arroyo_controller::compiler: digraph {
    0 [ label = "nexmark_0:Nexmark<10 eps>" ]
    1 [ label = "watermark_1:Watermark" ]
    2 [ label = "fused_9:expression<sql_fused<value_project,filter,value_project,value_project,value_project,key_project>:OptionalRecord>" ]
    3 [ label = "tumbling_window_two_phase_aggregator_3:TumblingWindowAggregator<TumblingWindow(10s)>" ]
    4 [ label = "fused_4:expression<sql_fused<filter,value_project,value_project,key_project>:OptionalRecord>" ]
    5 [ label = "join_pair_merge_2:expression<api_fused<join_merge,sql_fused<filter,value_project,value_project>>:OptionalRecord>" ]
    6 [ label = "sink_web_6:WebSink" ]
    7 [ label = "join_with_expiration_7:JoinWithExpiration<left_expire: 86400s, right_expire: 86400s, join_type: Inner>" ]
    8 [ label = "fused_8:expression<sql_fused<value_project,filter,value_project,value_project,value_project,key_project>:OptionalRecord>" ]
    0 -> 1 [ label = "() → arroyo_types :: nexmark :: Event" ]
    2 -> 7 [ label = "generated_struct_3484707784589493356 -left→ generated_struct_15490570238723732322" ]
    4 -> 7 [ label = "generated_struct_3484707784589493356 -right→ generated_struct_16745226037119149261" ]
    1 -> 8 [ label = "() → arroyo_types :: nexmark :: Event" ]
    7 -> 5 [ label = "generated_struct_3484707784589493356 → (generated_struct_15490570238723732322 , generated_struct_16745226037119149261)" ]
    5 -> 6 [ label = "() → generated_struct_6614185072020766535" ]
    8 -> 3 [ label = "generated_struct_17942395924573474124 ⤨ generated_struct_17349934388688571038" ]
    1 -> 2 [ label = "() → arroyo_types :: nexmark :: Event" ]
    3 -> 4 [ label = "generated_struct_17942395924573474124 → generated_struct_18028448699472152953" ]
```

No output is produced.
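The `JoinWithExpiration<left_expire: 86400s, right_expire: 86400s, join_type: Inner>` node in the DAG suggests, from its label, that each side of the join is retained for 86400s (one day). For comparison, the window-aware eviction proposed in the issue description could look roughly like the following. This is a hypothetical Python sketch, not Arroyo code: `WindowedMaxJoin` and its methods are made-up names. It assumes the aggregator emits a window's `MAX(price)` before forwarding the watermark that closes that window, and it ignores the inclusive upper bound of `BETWEEN`, so a bid exactly at `end_time` is matched only against its own window.

```python
from collections import defaultdict

WINDOW_SECS = 10  # matches tumble(INTERVAL '10' SECOND)


def window_start(ts):
    # Tumbling windows cover [start, start + WINDOW_SECS).
    return ts - ts % WINDOW_SECS


class WindowedMaxJoin:
    """Hypothetical operator joining raw bids with per-window MAX(price)."""

    def __init__(self):
        # window start -> buffered (price, bid) pairs from the raw bid stream
        self.pending = defaultdict(list)
        self.output = []

    def on_bid(self, ts, price, bid):
        # Buffer the bid under the only window the WHERE clause can match it to.
        self.pending[window_start(ts)].append((price, bid))

    def on_window_max(self, start, maxprice):
        # The aggregate for [start, start + WINDOW_SECS) arrived: emit matches.
        for price, bid in self.pending.get(start, []):
            if price == maxprice:
                self.output.append(bid)

    def on_watermark(self, wm):
        # Assuming each window's MAX arrives before the watermark that closes it,
        # bids in windows ending at or before wm can never match again: drop them.
        for start in [s for s in self.pending if s + WINDOW_SECS <= wm]:
            del self.pending[start]
```

With this scheme, buffered bid state is bounded by the number of open windows rather than by a fixed expiration.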

Posted by edmondop, Aug 31 '23 22:08

[Screenshot 2023-08-31 at 3:58:33 PM]

Posted by edmondop, Aug 31 '23 22:08