WIP:[core] Support async refresh for PrimaryKeyPartialLookupTable
Purpose
Linked issue: close #3386
Support async refresh for PrimaryKeyPartialLookupTable. Asynchronously refresh the lookup files generated by new snapshots that overlap with expired cached files. The total size of lookup files newly generated during one refresh will not greatly exceed the total size of the expired lookup files.
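For context, a minimal self-contained sketch of the pattern (all class and method names below are hypothetical, not the actual Paimon implementation): refresh work is submitted to a background executor and the result is published atomically, so lookups keep reading the current state instead of blocking on the refresh.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of non-blocking refresh; not the Paimon code. */
class AsyncRefreshingLookup {
    private final ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
    // Currently published key -> value state; swapped atomically when a refresh finishes.
    private final AtomicReference<Map<String, String>> current =
            new AtomicReference<>(Collections.emptyMap());

    String get(String key) {
        // Lookups read the published state and never wait for an in-flight refresh.
        return current.get().get(key);
    }

    Future<?> refreshAsync(Map<String, String> newSnapshot) {
        // Build the new state off the lookup thread, then publish it in one step.
        return refreshExecutor.submit(() -> current.set(Map.copyOf(newSnapshot)));
    }
}
```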
Tests
- Added UT LookupTableTest#testPartialLookupTableRefreshAsync
- Added UT LookupLevelsTest#testRefreshFiles
API and Format
Documentation
@FangYongs @JingsongLi Would you kindly review this when you have time?
- @Aitozi
CC @liming30 too
The purpose of introducing asynchronous refresh in #3297 is to solve the problem that local data refresh will block lookup. Is this issue also trying to solve the same problem? PrimaryKeyPartialLookupTable.refresh only refreshes metadata, and it seems that it cannot solve the problem of synchronous refresh of local data.
@liming30 Thanks for the reply. I see your point. Maybe I should split this task into two: 1. Make the refreshAsync option also work for PrimaryKeyPartialLookupTable; 2. Improve the refresh behavior of LocalQueryExecutor, creating local LookupFiles during refresh according to the before and after DataFileMetas in a DataSplit (see the sketch below). WDYT?
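A rough sketch of what task 2 could look like, with hypothetical types standing in for DataFileMeta and DataSplit (this only illustrates the diffing idea, not the actual API): compare a split's before and after file lists, and only build local lookup files for the files that are new.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of the diff idea behind task 2; not the Paimon API. */
class SplitDiffRefresher {
    /** Stand-in for a data file's identity (e.g. its file name). */
    record FileMeta(String fileName) {}

    /** Files present after the refresh but not before; only these need new local lookup files. */
    static List<FileMeta> filesToBuild(List<FileMeta> before, List<FileMeta> after) {
        Set<String> existing = before.stream().map(FileMeta::fileName).collect(Collectors.toSet());
        return after.stream()
                .filter(f -> !existing.contains(f.fileName()))
                .collect(Collectors.toList());
    }
}
```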
Hi @liming30 @Aitozi, on second thought I think AsyncRefreshLookupTable should not handle the consistency issue between the asyncRefresh operation and the get operation for the time being. This can be resolved inside PrimaryKeyPartialLookupTable and FullCacheLookupTable (see the sketch below). WDYT?
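One way the table itself could guard get against a concurrent refresh publish (again a hypothetical sketch, not the actual classes): a read/write lock where get takes the read lock and the async refresh takes the write lock only when publishing its result.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of guarding get() against a concurrent refresh publish. */
class GuardedLookup {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private Map<String, String> state = Collections.emptyMap();

    String get(String key) {
        lock.readLock().lock();
        try {
            return state.get(key); // never observes a half-published refresh
        } finally {
            lock.readLock().unlock();
        }
    }

    void publishRefreshResult(Map<String, String> refreshed) {
        lock.writeLock().lock();
        try {
            state = refreshed; // called by the async refresh thread once it has finished building
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```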
Overall, I think that if there is a solution to the data file loading issue, it would be meaningful. But the current code changes only make metadata loading asynchronous, which doesn't seem to have much value.
Yes, I can also create new lookup files asynchronously if they overlap with previously evicted files.
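A small sketch of the overlap test (hypothetical key ranges, not the real file metadata): a newly written file is rebuilt eagerly only when its key range overlaps one of the evicted lookup files; otherwise building can be deferred until the key is actually queried.

```java
import java.util.List;

/** Hypothetical sketch of the "overlaps an evicted file" check. */
class OverlapCheck {
    /** Inclusive key range of a file; a stand-in for real file metadata. */
    record KeyRange(long min, long max) {
        boolean overlaps(KeyRange other) {
            return this.min <= other.max && other.min <= this.max;
        }
    }

    static boolean shouldRebuildEagerly(KeyRange newFile, List<KeyRange> evictedFiles) {
        // Rebuild now only if the new file covers keys that an evicted lookup file used to serve.
        return evictedFiles.stream().anyMatch(newFile::overlaps);
    }
}
```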
cc: @JingsongLi @Aitozi @yunfengzhou-hub for review
Before async refresh feature enabled:
After async refresh feature enabled:
This picture shows the benefits of async refresh for lookup files more clearly.
Reproduction procedure:
---- Create Dim Table ----
USE CATALOG paimon; USE paimon_test;
CREATE TABLE dim_orders ( `order_id` INT, `order_name` STRING, `order_product_id` INT, `order_customer_id` INT, `order_status` STRING, `create_date` TIMESTAMP, `create_ts` INT, PRIMARY KEY (`order_id`) NOT ENFORCED ) WITH ( 'bucket' = '20', 'bucket-key' = 'order_id' );
---- Upsert Dim Table ----
CREATE TABLE datagen_source ( `order_id` INT, `order_name` STRING, `order_product_id` INT, `order_customer_id` INT, `order_status` STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50000', 'fields.order_name.length' = '10', 'fields.order_status.length' = '5', 'fields.order_id.min' = '1', 'fields.order_id.max' = '100000000', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '1000000', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000000' );
CREATE VIEW temp_source AS SELECT order_id, order_name, order_product_id, order_customer_id, order_status, CURRENT_TIMESTAMP AS create_date, CAST(UNIX_TIMESTAMP() AS INT) AS create_ts FROM datagen_source;
INSERT INTO `paimon`.`paimon_test`.`dim_orders` SELECT * FROM temp_source;
---- Lookup Join Dim Table ----
CREATE TABLE `datagen_source` ( `product_id` INTEGER, `product_name` STRING, `product_category_id` INTEGER, `product_order_id` INTEGER, `product_status` STRING, `create_date` AS PROCTIME() ) WITH ( 'fields.product_category_id.min' = '1', 'connector' = 'datagen', 'fields.product_category_id.max' = '1000000', 'fields.product_name.length' = '10', 'fields.product_status.length' = '5', 'rows-per-second' = '48000', 'fields.product_order_id.max' = '200000000', 'fields.product_order_id.min' = '1' );
CREATE TABLE `print_sink` ( `product_id` INTEGER, `product_name` STRING, `product_category_id` INTEGER, `product_order_id` INTEGER, `product_status` STRING, `create_date` TIMESTAMP, `order_name` STRING, `orders_customer_id` INTEGER, `order_status` STRING ) WITH ( 'connector' = 'print' );
CREATE VIEW `trade_orde_view` AS SELECT `gen`.`product_id`, `gen`.`product_name`, `gen`.`product_category_id`, `gen`.`product_order_id`, `gen`.`product_status`, `gen`.`create_date`, `orders`.`order_name`, `orders`.`order_customer_id`, `order_status` FROM `datagen_source` AS `gen` INNER JOIN `paimon`.`paimon_test`.`dim_orders` /*+ OPTIONS('lookup.cache' = 'AUTO', 'lookup.refresh.async' = 'true', 'lookup.cache-max-disk-size' = '40 gb') */ FOR SYSTEM_TIME AS OF `gen`.`create_date` AS `orders` ON `gen`.`product_order_id` = `orders`.`order_id`;
INSERT INTO `print_sink` ( SELECT * FROM `trade_orde_view` WHERE `order_name` IS NOT NULL );
cc: @JingsongLi @FangYongs for review.