
WIP:[core] Support async refresh for PrimaryKeyPartialLookupTable

Open xiangyuf opened this issue 1 year ago • 12 comments

Purpose

Linked issue: close #3386

Support async refresh for PrimaryKeyPartialLookupTable. Asynchronously refresh the files generated by new snapshots that overlap with expired cached files. The total size of lookup files newly generated during one refresh will not greatly exceed the total size of the expired lookup files.
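
A minimal sketch of this flow, using hypothetical names (`SnapshotDiff`, `LookupFileCache`, `rebuildOverlapping`) rather than Paimon's actual APIs: the refresh runs on a background executor, and only new files that overlap expired cached files are rebuilt, which bounds the bytes written per round.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical collaborators, not Paimon's actual APIs.
interface SnapshotDiff {
    List<String> newFiles();
    List<String> expiredFiles();
}

interface LookupFileCache {
    void rebuildOverlapping(List<String> newFiles, List<String> expiredFiles);
}

class AsyncRefreshSketch {
    private final ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean refreshing = new AtomicBoolean(false);

    /** Triggers a refresh without blocking lookups on the calling thread. */
    void refreshAsync(SnapshotDiff diff, LookupFileCache cache) {
        if (!refreshing.compareAndSet(false, true)) {
            return; // a refresh is already in flight; skip this round
        }
        refreshExecutor.submit(() -> {
            try {
                // Rebuild local lookup files only for new data files that
                // overlap cached files expired by this snapshot, so the bytes
                // written per refresh stay close to the bytes evicted.
                cache.rebuildOverlapping(diff.newFiles(), diff.expiredFiles());
            } finally {
                refreshing.set(false);
            }
        });
    }
}
```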

Tests

  1. Added UT LookupTableTest#testPartialLookupTableRefreshAsync
  2. Added UT LookupLevelsTest#testRefreshFiles

API and Format

Documentation

xiangyuf avatar Sep 18 '24 11:09 xiangyuf

@FangYongs @JingsongLi Would you kindly review this when you have time?

xiangyuf avatar Sep 18 '24 16:09 xiangyuf

@Aitozi

xiangyuf avatar Sep 20 '24 13:09 xiangyuf

CC @liming30 too

JingsongLi avatar Sep 25 '24 02:09 JingsongLi

The purpose of introducing asynchronous refresh in #3297 was to solve the problem that refreshing local data blocks lookups. Is this issue trying to solve the same problem? PrimaryKeyPartialLookupTable.refresh only refreshes metadata, so it does not seem to address the synchronous refresh of local data.

liming30 avatar Sep 25 '24 10:09 liming30

> The purpose of introducing asynchronous refresh in #3297 was to solve the problem that refreshing local data blocks lookups. Is this issue trying to solve the same problem? PrimaryKeyPartialLookupTable.refresh only refreshes metadata, so it does not seem to address the synchronous refresh of local data.

@liming30 Thanks for the reply, I see your point. Maybe I should split this into two tasks: (1) make the refreshAsync option also work for PrimaryKeyPartialLookupTable; (2) improve the refresh behavior of LocalQueryExecutor, creating local LookupFiles during refresh according to the before and after DataFileMetas in a DataSplit (see the sketch below). WDYT?
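
A hypothetical sketch of the second task, assuming illustrative types (`FileMeta`, `LocalLookupStore`) instead of Paimon's real DataFileMeta and lookup storage: expired files are dropped and new files are built into local lookup files during the refresh itself.

```java
import java.util.List;

// Illustrative types; Paimon's real DataFileMeta carries much more state.
interface FileMeta {}

interface LocalLookupStore {
    void drop(FileMeta file);
    void build(FileMeta file);
}

class SplitRefreshSketch {
    void applySplit(List<FileMeta> before, List<FileMeta> after, LocalLookupStore store) {
        // Files present before the snapshot but absent after it are expired;
        // their local lookup files can be dropped.
        for (FileMeta f : before) {
            if (!after.contains(f)) {
                store.drop(f);
            }
        }
        // Newly added files are converted into local lookup files during the
        // refresh itself, so a later get() does not pay the build cost.
        for (FileMeta f : after) {
            if (!before.contains(f)) {
                store.build(f);
            }
        }
    }
}
```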

xiangyuf avatar Sep 25 '24 11:09 xiangyuf

Hi @liming30 @Aitozi, on second thought I think AsyncRefreshLookupTable should not handle the consistency issue between the asyncRefresh operation and the get operation for the time being. This can be resolved inside PrimaryKeyPartialLookupTable and FullCacheLookupTable. WDYT?
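
One way such a table could keep get consistent with an in-flight async refresh is to publish the refreshed state under a lock. This is only an illustrative sketch, not how PrimaryKeyPartialLookupTable or FullCacheLookupTable actually resolves it:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ConsistencySketch<K, V> {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private Map<K, V> state = new HashMap<>();

    V get(K key) {
        lock.readLock().lock();
        try {
            return state.get(key); // readers never see a half-applied refresh
        } finally {
            lock.readLock().unlock();
        }
    }

    void publishRefreshedState(Map<K, V> refreshed) {
        lock.writeLock().lock();
        try {
            state = refreshed; // the background refresh publishes atomically
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```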

xiangyuf avatar Oct 08 '24 04:10 xiangyuf

Overall, I think that if there is a solution to the data file loading issue, then it would be meaningful.

But the current code change only loads metadata asynchronously, which doesn't seem to have much value.

JingsongLi avatar Apr 07 '25 03:04 JingsongLi

> Overall, I think that if there is a solution to the data file loading issue, then it would be meaningful.
>
> But the current code change only loads metadata asynchronously, which doesn't seem to have much value.

Yes, I can also create new lookup files asynchronously if they overlap with previously evicted files.
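
A rough sketch of that overlap test, assuming a `KeyRange` abstraction with `minKey`/`maxKey` accessors (Paimon's DataFileMeta exposes key bounds differently):

```java
// Assumed accessors on a key range; illustrative only.
interface KeyRange {
    long minKey();
    long maxKey();
}

class OverlapSketch {
    /** Two [min, max] key ranges intersect unless one ends before the other begins. */
    static boolean overlaps(KeyRange newFile, KeyRange evicted) {
        return newFile.minKey() <= evicted.maxKey()
                && evicted.minKey() <= newFile.maxKey();
    }
}
```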

xiangyuf avatar Apr 14 '25 12:04 xiangyuf

cc: @JingsongLi @Aitozi @yunfengzhou-hub for review

xiangyuf avatar Apr 18 '25 12:04 xiangyuf

Before the async refresh feature is enabled: [image]

After the async refresh feature is enabled: [image]

xiangyuf avatar Apr 26 '25 04:04 xiangyuf

This picture shows the benefits of async refresh for lookup files more clearly: [image]

xiangyuf avatar Apr 26 '25 06:04 xiangyuf

Reproduction procedure:

---- Create Dim Table ----

```sql
USE CATALOG paimon;
USE paimon_test;
```

```sql
CREATE TABLE dim_orders (
    `order_id` INT,
    `order_name` STRING,
    `order_product_id` INT,
    `order_customer_id` INT,
    `order_status` STRING,
    `create_date` TIMESTAMP,
    `create_ts` INT,
    PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
    'bucket' = '20',
    'bucket-key' = 'order_id'
);
```

---- Upsert Dim Table ----

```sql
CREATE TABLE datagen_source (
    `order_id` INT,
    `order_name` STRING,
    `order_product_id` INT,
    `order_customer_id` INT,
    `order_status` STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '50000',
    'fields.order_name.length' = '10',
    'fields.order_status.length' = '5',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '100000000',
    'fields.order_product_id.min' = '1',
    'fields.order_product_id.max' = '1000000',
    'fields.order_customer_id.min' = '1',
    'fields.order_customer_id.max' = '1000000'
);
```

```sql
CREATE VIEW temp_source AS
SELECT order_id, order_name, order_product_id, order_customer_id, order_status,
       CURRENT_TIMESTAMP AS create_date,
       CAST(UNIX_TIMESTAMP() AS INT) AS create_ts
FROM datagen_source;
```

```sql
INSERT INTO `paimon`.`paimon_test`.`dim_orders` SELECT * FROM temp_source;
```

---- Lookup Join Dim Table ----

```sql
CREATE TABLE `datagen_source` (
    `product_id` INTEGER,
    `product_name` STRING,
    `product_category_id` INTEGER,
    `product_order_id` INTEGER,
    `product_status` STRING,
    `create_date` AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '48000',
    'fields.product_name.length' = '10',
    'fields.product_status.length' = '5',
    'fields.product_category_id.min' = '1',
    'fields.product_category_id.max' = '1000000',
    'fields.product_order_id.min' = '1',
    'fields.product_order_id.max' = '200000000'
);
```

```sql
CREATE TABLE `print_sink` (
    `product_id` INTEGER,
    `product_name` STRING,
    `product_category_id` INTEGER,
    `product_order_id` INTEGER,
    `product_status` STRING,
    `create_date` TIMESTAMP,
    `order_name` STRING,
    `orders_customer_id` INTEGER,
    `order_status` STRING
) WITH (
    'connector' = 'print'
);
```

```sql
CREATE VIEW `trade_orde_view` AS
SELECT `gen`.`product_id`,
       `gen`.`product_name`,
       `gen`.`product_category_id`,
       `gen`.`product_order_id`,
       `gen`.`product_status`,
       `gen`.`create_date`,
       `orders`.`order_name`,
       `orders`.`order_customer_id`,
       `order_status`
FROM `datagen_source` AS `gen`
INNER JOIN `paimon`.`paimon_test`.`dim_orders`
    /*+ OPTIONS('lookup.cache' = 'AUTO', 'lookup.refresh.async' = 'true', 'lookup.cache-max-disk-size' = '40 gb') */
    FOR SYSTEM_TIME AS OF `gen`.`create_date` AS `orders`
    ON `gen`.`product_order_id` = `orders`.`order_id`;
```

```sql
INSERT INTO `print_sink`
SELECT * FROM `trade_orde_view` WHERE `order_name` IS NOT NULL;
```

xiangyuf avatar Apr 26 '25 06:04 xiangyuf

cc: @JingsongLi @FangYongs for review.

xiangyuf avatar May 12 '25 04:05 xiangyuf