
iceberg mor table execute merge very very slow

Open BsoBird opened this issue 2 years ago • 21 comments

Query engine

Iceberg 1.2.1, Spark 3.3.2

Question

I created two tables with the same data, one MOR table and one COW table, each with nearly 400 million rows and a data size of 200 GB. I then ran MERGE INTO with 100 million rows against each table. The COW table finished in 9 minutes; the MOR table has been executing for close to 30 minutes and has not yet finished.

Why is MERGE so slow on the MOR table?

Here is my execution SQL:

CREATE TABLE IF NOT EXISTS local.test.b_std_trade_4_iceberg_orc_zstd ( `uni_order_id` string, `data_from` bigint, `partner` string, `plat_code` string, `order_id` string, `uni_shop_id` string, `uni_id` string, `guide_id` string, `shop_id` string, `plat_account` string, `total_fee` double, `item_discount_fee` double, `trade_discount_fee` double, `adjust_fee` double, `post_fee` double, `discount_rate` double, `payment_no_postfee` double, `payment` double, `pay_time` string, `product_num` bigint, `order_status` string, `is_refund` string, `refund_fee` double, `insert_time` string, `created` string, `endtime` string, `modified` string, `trade_type` string, `receiver_name` string, `receiver_country` string, `receiver_state` string, `receiver_city` string, `receiver_district` string, `receiver_town` string, `receiver_address` string, `receiver_mobile` string, `trade_source` string, `delivery_type` string, `consign_time` string, `orders_num` bigint, `is_presale` bigint, `presale_status` string, `first_fee_paytime` string, `last_fee_paytime` string, `first_paid_fee` double, `tenant` string, `tidb_modified` string, `step_paid_fee` double, `seller_flag` string, `is_used_store_card` BIGINT, `store_card_used` DOUBLE, `store_card_basic_used` DOUBLE, `store_card_expand_used` DOUBLE, `order_promotion_num` BIGINT, `item_promotion_num` BIGINT, `buyer_remark` string, `seller_remark` string, trade_business_type string )TBLPROPERTIES ('read.orc.vectorization.enabled'='true','write.format.default'='orc','write.orc.bloom.filter.columns'='tenant,shop_id,plat_code,order_status,payment,pay_time,created','kyuubi.zorder.enabled'='true','kyuubi.zorder.cols'='tenant,shop_id,plat_code,order_status,payment,pay_time,created','write.orc.compression-codec'='zstd') STORED AS iceberg;

merge into local.test.b_std_trade_4_iceberg_orc_zstd t using( select `uni_order_id`, `data_from`, `partner`, `plat_code`, `order_id`, `uni_shop_id`, `uni_id`, `guide_id`, `shop_id`, `plat_account`, `total_fee`, `item_discount_fee`, `trade_discount_fee`, `adjust_fee`, `post_fee`, `discount_rate`, `payment_no_postfee`, `payment`, `pay_time`, `product_num`, `order_status`, `is_refund`, `refund_fee`, `insert_time`, `created`, `endtime`, `modified`, `trade_type`, `receiver_name`, `receiver_country`, `receiver_state`, `receiver_city`, `receiver_district`, `receiver_town`, `receiver_address`, `receiver_mobile`, `trade_source`, `delivery_type`, `consign_time`, `orders_num`, `is_presale`, `presale_status`, `first_fee_paytime`, `last_fee_paytime`, `first_paid_fee`, `tenant`, `tidb_modified`, `step_paid_fee`, `seller_flag`, `is_used_store_card`, `store_card_used`, `store_card_basic_used`, `store_card_expand_used`, `order_promotion_num`, `item_promotion_num`, `buyer_remark`, `seller_remark`, trade_business_type from ( select `uni_order_id`, `data_from`, `partner`, `plat_code`, `order_id`, `uni_shop_id`, `uni_id`, `guide_id`, `shop_id`, `plat_account`, `total_fee`, `item_discount_fee`, `trade_discount_fee`, `adjust_fee`, `post_fee`, `discount_rate`, `payment_no_postfee`, `payment`, `pay_time`, `product_num`, `order_status`, `is_refund`, `refund_fee`, `insert_time`, `created`, `endtime`, `modified`, `trade_type`, `receiver_name`, `receiver_country`, `receiver_state`, `receiver_city`, `receiver_district`, `receiver_town`, `receiver_address`, `receiver_mobile`, `trade_source`, `delivery_type`, `consign_time`, `orders_num`, `is_presale`, `presale_status`, `first_fee_paytime`, `last_fee_paytime`, `first_paid_fee`, `tenant`, `tidb_modified`, `step_paid_fee`, `seller_flag`, `is_used_store_card`, `store_card_used`, `store_card_basic_used`, `store_card_expand_used`, `order_promotion_num`, `item_promotion_num`, `buyer_remark`, `seller_remark`, trade_business_type,row_number() over(partition by uni_order_id order by modified desc,dt desc) as rank from spark_catalog.dw_source.s_std_trade_tidb where dt>='2023-04-20' ) small where rank=1) s ON t.uni_order_id = s.uni_order_id and t.tenant = s.tenant and t.partner = s.partner  WHEN MATCHED AND s.modified>=t.modified then UPDATE SET t.uni_order_id = s.uni_order_id, t.data_from = s.data_from, t.partner = s.partner, t.plat_code = s.plat_code, t.order_id = s.order_id, t.uni_shop_id = s.uni_shop_id, t.uni_id = s.uni_id, t.guide_id = s.guide_id, t.shop_id = s.shop_id, t.plat_account = s.plat_account, t.total_fee = s.total_fee, t.item_discount_fee = s.item_discount_fee, t.trade_discount_fee = s.trade_discount_fee, t.adjust_fee = s.adjust_fee, t.post_fee = s.post_fee, t.discount_rate = s.discount_rate, t.payment_no_postfee = s.payment_no_postfee, t.payment = s.payment, t.pay_time = s.pay_time, t.product_num = s.product_num, t.order_status = s.order_status, t.is_refund = s.is_refund, t.refund_fee = s.refund_fee, t.insert_time = s.insert_time, t.created = s.created, t.endtime = s.endtime, t.modified = s.modified, t.trade_type = s.trade_type, t.receiver_name = s.receiver_name, t.receiver_country = s.receiver_country, t.receiver_state = s.receiver_state, t.receiver_city = s.receiver_city, t.receiver_district = s.receiver_district, t.receiver_town = s.receiver_town, t.receiver_address = s.receiver_address, t.receiver_mobile = s.receiver_mobile, t.trade_source = s.trade_source, t.delivery_type = s.delivery_type, t.consign_time = s.consign_time, t.orders_num = s.orders_num, 
t.is_presale = s.is_presale, t.presale_status = s.presale_status, t.first_fee_paytime = s.first_fee_paytime, t.last_fee_paytime = s.last_fee_paytime, t.first_paid_fee = s.first_paid_fee, t.tenant = s.tenant, t.tidb_modified = s.tidb_modified, t.step_paid_fee = s.step_paid_fee, t.seller_flag = s.seller_flag, t.is_used_store_card = s.is_used_store_card, t.store_card_used = s.store_card_used, t.store_card_basic_used = s.store_card_basic_used, t.store_card_expand_used = s.store_card_expand_used, t.order_promotion_num = s.order_promotion_num, t.item_promotion_num = s.item_promotion_num, t.buyer_remark = s.buyer_remark, t.seller_remark = s.seller_remark, t.trade_business_type = s.trade_business_type  WHEN NOT MATCHED THEN INSERT * ;
--9 minutes to execute



CREATE TABLE IF NOT EXISTS local.test.b_std_trade_4_iceberg_orc_zstd_mor ( `uni_order_id` string, `data_from` bigint, `partner` string, `plat_code` string, `order_id` string, `uni_shop_id` string, `uni_id` string, `guide_id` string, `shop_id` string, `plat_account` string, `total_fee` double, `item_discount_fee` double, `trade_discount_fee` double, `adjust_fee` double, `post_fee` double, `discount_rate` double, `payment_no_postfee` double, `payment` double, `pay_time` string, `product_num` bigint, `order_status` string, `is_refund` string, `refund_fee` double, `insert_time` string, `created` string, `endtime` string, `modified` string, `trade_type` string, `receiver_name` string, `receiver_country` string, `receiver_state` string, `receiver_city` string, `receiver_district` string, `receiver_town` string, `receiver_address` string, `receiver_mobile` string, `trade_source` string, `delivery_type` string, `consign_time` string, `orders_num` bigint, `is_presale` bigint, `presale_status` string, `first_fee_paytime` string, `last_fee_paytime` string, `first_paid_fee` double, `tenant` string, `tidb_modified` string, `step_paid_fee` double, `seller_flag` string, `is_used_store_card` BIGINT, `store_card_used` DOUBLE, `store_card_basic_used` DOUBLE, `store_card_expand_used` DOUBLE, `order_promotion_num` BIGINT, `item_promotion_num` BIGINT, `buyer_remark` string, `seller_remark` string, trade_business_type string )TBLPROPERTIES ('read.orc.vectorization.enabled'='true','write.format.default'='orc','write.orc.bloom.filter.columns'='tenant,shop_id,plat_code,order_status,payment,pay_time,created','kyuubi.zorder.enabled'='true','kyuubi.zorder.cols'='tenant,shop_id,plat_code,order_status,payment,pay_time,created','write.orc.compression-codec'='zstd','write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read') STORED AS iceberg;

merge into local.test.b_std_trade_4_iceberg_orc_zstd_mor t using( select `uni_order_id`, `data_from`, `partner`, `plat_code`, `order_id`, `uni_shop_id`, `uni_id`, `guide_id`, `shop_id`, `plat_account`, `total_fee`, `item_discount_fee`, `trade_discount_fee`, `adjust_fee`, `post_fee`, `discount_rate`, `payment_no_postfee`, `payment`, `pay_time`, `product_num`, `order_status`, `is_refund`, `refund_fee`, `insert_time`, `created`, `endtime`, `modified`, `trade_type`, `receiver_name`, `receiver_country`, `receiver_state`, `receiver_city`, `receiver_district`, `receiver_town`, `receiver_address`, `receiver_mobile`, `trade_source`, `delivery_type`, `consign_time`, `orders_num`, `is_presale`, `presale_status`, `first_fee_paytime`, `last_fee_paytime`, `first_paid_fee`, `tenant`, `tidb_modified`, `step_paid_fee`, `seller_flag`, `is_used_store_card`, `store_card_used`, `store_card_basic_used`, `store_card_expand_used`, `order_promotion_num`, `item_promotion_num`, `buyer_remark`, `seller_remark`, trade_business_type from ( select `uni_order_id`, `data_from`, `partner`, `plat_code`, `order_id`, `uni_shop_id`, `uni_id`, `guide_id`, `shop_id`, `plat_account`, `total_fee`, `item_discount_fee`, `trade_discount_fee`, `adjust_fee`, `post_fee`, `discount_rate`, `payment_no_postfee`, `payment`, `pay_time`, `product_num`, `order_status`, `is_refund`, `refund_fee`, `insert_time`, `created`, `endtime`, `modified`, `trade_type`, `receiver_name`, `receiver_country`, `receiver_state`, `receiver_city`, `receiver_district`, `receiver_town`, `receiver_address`, `receiver_mobile`, `trade_source`, `delivery_type`, `consign_time`, `orders_num`, `is_presale`, `presale_status`, `first_fee_paytime`, `last_fee_paytime`, `first_paid_fee`, `tenant`, `tidb_modified`, `step_paid_fee`, `seller_flag`, `is_used_store_card`, `store_card_used`, `store_card_basic_used`, `store_card_expand_used`, `order_promotion_num`, `item_promotion_num`, `buyer_remark`, `seller_remark`, trade_business_type,row_number() over(partition by uni_order_id order by modified desc,dt desc) as rank from spark_catalog.dw_source.s_std_trade_tidb where dt>='2023-04-20' ) small where rank=1) s ON t.uni_order_id = s.uni_order_id and t.tenant = s.tenant and t.partner = s.partner  WHEN MATCHED AND s.modified>=t.modified then UPDATE SET t.uni_order_id = s.uni_order_id, t.data_from = s.data_from, t.partner = s.partner, t.plat_code = s.plat_code, t.order_id = s.order_id, t.uni_shop_id = s.uni_shop_id, t.uni_id = s.uni_id, t.guide_id = s.guide_id, t.shop_id = s.shop_id, t.plat_account = s.plat_account, t.total_fee = s.total_fee, t.item_discount_fee = s.item_discount_fee, t.trade_discount_fee = s.trade_discount_fee, t.adjust_fee = s.adjust_fee, t.post_fee = s.post_fee, t.discount_rate = s.discount_rate, t.payment_no_postfee = s.payment_no_postfee, t.payment = s.payment, t.pay_time = s.pay_time, t.product_num = s.product_num, t.order_status = s.order_status, t.is_refund = s.is_refund, t.refund_fee = s.refund_fee, t.insert_time = s.insert_time, t.created = s.created, t.endtime = s.endtime, t.modified = s.modified, t.trade_type = s.trade_type, t.receiver_name = s.receiver_name, t.receiver_country = s.receiver_country, t.receiver_state = s.receiver_state, t.receiver_city = s.receiver_city, t.receiver_district = s.receiver_district, t.receiver_town = s.receiver_town, t.receiver_address = s.receiver_address, t.receiver_mobile = s.receiver_mobile, t.trade_source = s.trade_source, t.delivery_type = s.delivery_type, t.consign_time = s.consign_time, t.orders_num = s.orders_num, 
t.is_presale = s.is_presale, t.presale_status = s.presale_status, t.first_fee_paytime = s.first_fee_paytime, t.last_fee_paytime = s.last_fee_paytime, t.first_paid_fee = s.first_paid_fee, t.tenant = s.tenant, t.tidb_modified = s.tidb_modified, t.step_paid_fee = s.step_paid_fee, t.seller_flag = s.seller_flag, t.is_used_store_card = s.is_used_store_card, t.store_card_used = s.store_card_used, t.store_card_basic_used = s.store_card_basic_used, t.store_card_expand_used = s.store_card_expand_used, t.order_promotion_num = s.order_promotion_num, t.item_promotion_num = s.item_promotion_num, t.buyer_remark = s.buyer_remark, t.seller_remark = s.seller_remark, t.trade_business_type = s.trade_business_type  WHEN NOT MATCHED THEN INSERT * ;
--30 minutes and still not finished
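For reference, the only difference between the two CREATE TABLE statements is the set of write-mode properties on the MOR table; copy-on-write is Iceberg's default. A minimal sketch of switching an existing table to merge-on-read with those same properties:

-- These are the standard Iceberg write-mode properties; everything else
-- about the two tables (schema, ORC, zstd, bloom filters) is identical.
ALTER TABLE local.test.b_std_trade_4_iceberg_orc_zstd_mor SET TBLPROPERTIES (
  'write.delete.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'write.merge.mode'  = 'merge-on-read'
);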

COW execution plan:

== Physical Plan ==
ReplaceData IcebergWrite(table=local.test.b_std_trade_4_iceberg_orc_zstd, format=ORC)
+- AdaptiveSparkPlan isFinalPlan=false
   +- Project [uni_order_id#3712, data_from#3713L, partner#3714, plat_code#3715, order_id#3716, uni_shop_id#3717, uni_id#3718, guide_id#3719, shop_id#3720, plat_account#3721, total_fee#3722, item_discount_fee#3723, trade_discount_fee#3724, adjust_fee#3725, post_fee#3726, discount_rate#3727, payment_no_postfee#3728, payment#3729, pay_time#3730, product_num#3731L, order_status#3732, is_refund#3733, refund_fee#3734, insert_time#3735, ... 34 more fields]
      +- MergeRowsExec[uni_order_id#3712, data_from#3713L, partner#3714, plat_code#3715, order_id#3716, uni_shop_id#3717, uni_id#3718, guide_id#3719, shop_id#3720, plat_account#3721, total_fee#3722, item_discount_fee#3723, trade_discount_fee#3724, adjust_fee#3725, post_fee#3726, discount_rate#3727, payment_no_postfee#3728, payment#3729, pay_time#3730, product_num#3731L, order_status#3732, is_refund#3733, refund_fee#3734, insert_time#3735, ... 35 more fields]
         +- Sort [__row_id#3710L ASC NULLS FIRST], false, 0
            +- SortMergeJoin [uni_order_id#3586, tenant#3631, partner#3588], [uni_order_id#3771, tenant#3818, partner#3773], FullOuter
               :- Sort [uni_order_id#3586 ASC NULLS FIRST, tenant#3631 ASC NULLS FIRST, partner#3588 ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(uni_order_id#3586, tenant#3631, partner#3588, 200), ENSURE_REQUIREMENTS, [plan_id=1636]
               :     +- Project [uni_order_id#3586, data_from#3587L, partner#3588, plat_code#3589, order_id#3590, uni_shop_id#3591, uni_id#3592, guide_id#3593, shop_id#3594, plat_account#3595, total_fee#3596, item_discount_fee#3597, trade_discount_fee#3598, adjust_fee#3599, post_fee#3600, discount_rate#3601, payment_no_postfee#3602, payment#3603, pay_time#3604, product_num#3605L, order_status#3606, is_refund#3607, refund_fee#3608, insert_time#3609, ... 37 more fields]
               :        +- BatchScan[uni_order_id#3586, data_from#3587L, partner#3588, plat_code#3589, order_id#3590, uni_shop_id#3591, uni_id#3592, guide_id#3593, shop_id#3594, plat_account#3595, total_fee#3596, item_discount_fee#3597, trade_discount_fee#3598, adjust_fee#3599, post_fee#3600, discount_rate#3601, payment_no_postfee#3602, payment#3603, pay_time#3604, product_num#3605L, order_status#3606, is_refund#3607, refund_fee#3608, insert_time#3609, ... 35 more fields] local.test.b_std_trade_4_iceberg_orc_zstd (branch=null) [filters=, groupedBy=] RuntimeFilters: [dynamicpruningexpression(_file#3706 IN dynamicpruning#4007)]
               :              +- SubqueryAdaptiveBroadcast dynamicpruning#4007, 0, false, Project [_file#4006], [_file#4006]
               :                 +- AdaptiveSparkPlan isFinalPlan=false
               :                    +- Project [_file#4006]
               :                       +- SortMergeJoin [uni_order_id#3948, tenant#3993, partner#3950], [uni_order_id#3645, tenant#3692, partner#3647], LeftSemi
               :                          :- Sort [uni_order_id#3948 ASC NULLS FIRST, tenant#3993 ASC NULLS FIRST, partner#3950 ASC NULLS FIRST], false, 0
               :                          :  +- Exchange hashpartitioning(uni_order_id#3948, tenant#3993, partner#3950, 200), ENSURE_REQUIREMENTS, [plan_id=1616]
               :                          :     +- Filter ((isnotnull(uni_order_id#3948) AND isnotnull(tenant#3993)) AND isnotnull(partner#3950))
               :                          :        +- BatchScan[uni_order_id#3948, partner#3950, tenant#3993, _file#4006] local.test.b_std_trade_4_iceberg_orc_zstd (branch=null) [filters=uni_order_id IS NOT NULL, tenant IS NOT NULL, partner IS NOT NULL, groupedBy=] RuntimeFilters: []
               :                          +- Sort [uni_order_id#3645 ASC NULLS FIRST, tenant#3692 ASC NULLS FIRST, partner#3647 ASC NULLS FIRST], false, 0
               :                             +- Exchange hashpartitioning(uni_order_id#3645, tenant#3692, partner#3647, 200), ENSURE_REQUIREMENTS, [plan_id=1617]
               :                                +- Project [uni_order_id#3645, partner#3647, tenant#3692]
               :                                   +- Filter (((rank#3579 = 1) AND isnotnull(tenant#3692)) AND isnotnull(partner#3647))
               :                                      +- Window [row_number() windowspecdefinition(uni_order_id#3645, modified#3671 DESC NULLS LAST, dt#3703 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#3579], [uni_order_id#3645], [modified#3671 DESC NULLS LAST, dt#3703 DESC NULLS LAST]
               :                                         +- Sort [uni_order_id#3645 ASC NULLS FIRST, modified#3671 DESC NULLS LAST, dt#3703 DESC NULLS LAST], false, 0
               :                                            +- Exchange hashpartitioning(uni_order_id#3645, 200), ENSURE_REQUIREMENTS, [plan_id=1609]
               :                                               +- Filter isnotnull(uni_order_id#3645)
               :                                                  +- Scan hive dw_source.s_std_trade_tidb [uni_order_id#3645, partner#3647, modified#3671, tenant#3692, dt#3703], HiveTableRelation [`dw_source`.`s_std_trade_tidb`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [uni_order_id#3645, data_from#3646L, partner#3647, plat_code#3648, order_id#3649, uni_shop_id#365..., Partition Cols: [dt#3703], Pruned Partitions: [(dt=2023-04-20), (dt=2023-04-21), (dt=2023-04-22), (dt=2023-04-23), (dt=2023-04-24), (dt=2023-04...], [isnotnull(dt#3703), (dt#3703 >= 2023-04-20)]
               +- Sort [uni_order_id#3771 ASC NULLS FIRST, tenant#3818 ASC NULLS FIRST, partner#3773 ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(uni_order_id#3771, tenant#3818, partner#3773, 200), ENSURE_REQUIREMENTS, [plan_id=1637]
                     +- Project [uni_order_id#3771, data_from#3772L, partner#3773, plat_code#3774, order_id#3775, uni_shop_id#3776, uni_id#3777, guide_id#3778, shop_id#3779, plat_account#3780, total_fee#3781, item_discount_fee#3782, trade_discount_fee#3783, adjust_fee#3784, post_fee#3785, discount_rate#3786, payment_no_postfee#3787, payment#3788, pay_time#3789, product_num#3790L, order_status#3791, is_refund#3792, refund_fee#3793, insert_time#3794, ... 35 more fields]
                        +- Filter (rank#3579 = 1)
                           +- Window [row_number() windowspecdefinition(uni_order_id#3771, modified#3797 DESC NULLS LAST, dt#3829 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#3579], [uni_order_id#3771], [modified#3797 DESC NULLS LAST, dt#3829 DESC NULLS LAST]
                              +- Sort [uni_order_id#3771 ASC NULLS FIRST, modified#3797 DESC NULLS LAST, dt#3829 DESC NULLS LAST], false, 0
                                 +- Exchange hashpartitioning(uni_order_id#3771, 200), ENSURE_REQUIREMENTS, [plan_id=1629]
                                    +- Scan hive dw_source.s_std_trade_tidb [uni_order_id#3771, data_from#3772L, partner#3773, plat_code#3774, order_id#3775, uni_shop_id#3776, uni_id#3777, guide_id#3778, shop_id#3779, plat_account#3780, total_fee#3781, item_discount_fee#3782, trade_discount_fee#3783, adjust_fee#3784, post_fee#3785, discount_rate#3786, payment_no_postfee#3787, payment#3788, pay_time#3789, product_num#3790L, order_status#3791, is_refund#3792, refund_fee#3793, insert_time#3794, ... 35 more fields], HiveTableRelation [`dw_source`.`s_std_trade_tidb`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [uni_order_id#3771, data_from#3772L, partner#3773, plat_code#3774, order_id#3775, uni_shop_id#377..., Partition Cols: [dt#3829], Pruned Partitions: [(dt=2023-04-20), (dt=2023-04-21), (dt=2023-04-22), (dt=2023-04-23), (dt=2023-04-24), (dt=2023-04...], [isnotnull(dt#3829), (dt#3829 >= 2023-04-20)]

MOR execution plan:

== Physical Plan ==
WriteDelta org.apache.iceberg.spark.source.SparkPositionDeltaWrite@4880567e
+- AdaptiveSparkPlan isFinalPlan=false
   +- Sort [_spec_id#4208 ASC NULLS FIRST, _partition#4209 ASC NULLS FIRST, _file#4206 ASC NULLS FIRST, _pos#4207L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_spec_id#4208, _partition#4209, _file#4206, 200), REPARTITION_BY_NUM, [plan_id=1714]
         +- MergeRowsExec[__row_operation#4147, uni_order_id#4148, data_from#4149L, partner#4150, plat_code#4151, order_id#4152, uni_shop_id#4153, uni_id#4154, guide_id#4155, shop_id#4156, plat_account#4157, total_fee#4158, item_discount_fee#4159, trade_discount_fee#4160, adjust_fee#4161, post_fee#4162, discount_rate#4163, payment_no_postfee#4164, payment#4165, pay_time#4166, product_num#4167L, order_status#4168, is_refund#4169, refund_fee#4170, ... 39 more fields]
            +- Sort [_file#4141 ASC NULLS FIRST, _pos#4142L ASC NULLS FIRST], false, 0
               +- SortMergeJoin [uni_order_id#4021, tenant#4066, partner#4023], [uni_order_id#4210, tenant#4257, partner#4212], RightOuter
                  :- Sort [uni_order_id#4021 ASC NULLS FIRST, tenant#4066 ASC NULLS FIRST, partner#4023 ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(uni_order_id#4021, tenant#4066, partner#4023, 200), ENSURE_REQUIREMENTS, [plan_id=1706]
                  :     +- Filter ((isnotnull(uni_order_id#4021) AND isnotnull(tenant#4066)) AND isnotnull(partner#4023))
                  :        +- Project [uni_order_id#4021, data_from#4022L, partner#4023, plat_code#4024, order_id#4025, uni_shop_id#4026, uni_id#4027, guide_id#4028, shop_id#4029, plat_account#4030, total_fee#4031, item_discount_fee#4032, trade_discount_fee#4033, adjust_fee#4034, post_fee#4035, discount_rate#4036, payment_no_postfee#4037, payment#4038, pay_time#4039, product_num#4040L, order_status#4041, is_refund#4042, refund_fee#4043, insert_time#4044, ... 39 more fields]
                  :           +- BatchScan[uni_order_id#4021, data_from#4022L, partner#4023, plat_code#4024, order_id#4025, uni_shop_id#4026, uni_id#4027, guide_id#4028, shop_id#4029, plat_account#4030, total_fee#4031, item_discount_fee#4032, trade_discount_fee#4033, adjust_fee#4034, post_fee#4035, discount_rate#4036, payment_no_postfee#4037, payment#4038, pay_time#4039, product_num#4040L, order_status#4041, is_refund#4042, refund_fee#4043, insert_time#4044, ... 38 more fields] local.test.b_std_trade_4_iceberg_orc_zstd_mor (branch=null) [filters=, groupedBy=] RuntimeFilters: []
                  +- Sort [uni_order_id#4210 ASC NULLS FIRST, tenant#4257 ASC NULLS FIRST, partner#4212 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(uni_order_id#4210, tenant#4257, partner#4212, 200), ENSURE_REQUIREMENTS, [plan_id=1707]
                        +- Project [uni_order_id#4210, data_from#4211L, partner#4212, plat_code#4213, order_id#4214, uni_shop_id#4215, uni_id#4216, guide_id#4217, shop_id#4218, plat_account#4219, total_fee#4220, item_discount_fee#4221, trade_discount_fee#4222, adjust_fee#4223, post_fee#4224, discount_rate#4225, payment_no_postfee#4226, payment#4227, pay_time#4228, product_num#4229L, order_status#4230, is_refund#4231, refund_fee#4232, insert_time#4233, ... 35 more fields]
                           +- Filter (rank#4014 = 1)
                              +- Window [row_number() windowspecdefinition(uni_order_id#4210, modified#4236 DESC NULLS LAST, dt#4268 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#4014], [uni_order_id#4210], [modified#4236 DESC NULLS LAST, dt#4268 DESC NULLS LAST]
                                 +- Sort [uni_order_id#4210 ASC NULLS FIRST, modified#4236 DESC NULLS LAST, dt#4268 DESC NULLS LAST], false, 0
                                    +- Exchange hashpartitioning(uni_order_id#4210, 200), ENSURE_REQUIREMENTS, [plan_id=1699]
                                       +- Scan hive dw_source.s_std_trade_tidb [uni_order_id#4210, data_from#4211L, partner#4212, plat_code#4213, order_id#4214, uni_shop_id#4215, uni_id#4216, guide_id#4217, shop_id#4218, plat_account#4219, total_fee#4220, item_discount_fee#4221, trade_discount_fee#4222, adjust_fee#4223, post_fee#4224, discount_rate#4225, payment_no_postfee#4226, payment#4227, pay_time#4228, product_num#4229L, order_status#4230, is_refund#4231, refund_fee#4232, insert_time#4233, ... 35 more fields], HiveTableRelation [`dw_source`.`s_std_trade_tidb`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [uni_order_id#4210, data_from#4211L, partner#4212, plat_code#4213, order_id#4214, uni_shop_id#421..., Partition Cols: [dt#4268], Pruned Partitions: [(dt=2023-04-20), (dt=2023-04-21), (dt=2023-04-22), (dt=2023-04-23), (dt=2023-04-24), (dt=2023-04...], [isnotnull(dt#4268), (dt#4268 >= 2023-04-20)]

This is a screenshot of the current execution status of the MOR table: [screenshot]

BsoBird · Apr 25 '23 17:04

Can anyone help guide me?

BsoBird · Apr 25 '23 17:04

IMHO it should be because RowLevelCommandDynamicPruning is not supported in MOR: https://github.com/apache/iceberg/blob/50ca63bde82547c42475591455c00a429c854d4b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala#L66-L68

singhpk234 · Apr 25 '23 17:04
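A quick way to confirm this without running the full job is to EXPLAIN the MERGE and compare the target table's BatchScan: in the COW plan above it carries RuntimeFilters: [dynamicpruningexpression(...)], while in the MOR plan it shows RuntimeFilters: []. A minimal sketch, where source_updates is a hypothetical relation with the same schema:

EXPLAIN
MERGE INTO local.test.b_std_trade_4_iceberg_orc_zstd_mor t
USING source_updates s  -- hypothetical source with the same columns
ON t.uni_order_id = s.uni_order_id
   AND t.tenant = s.tenant
   AND t.partner = s.partner
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- An empty "RuntimeFilters: []" on the target BatchScan in the output means
-- no file-level dynamic pruning will be applied to the target scan.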

@singhpk234 Am I right in thinking that MOR tables are not currently usable at large scale? After all, MERGE on MOR tables takes too long to execute. Also, do we have a solution or a plan to fix this problem?

BsoBird · Apr 26 '23 05:04

There are a few pending changes I am yet to submit to OSS regarding MoR. Runtime filtering is one of the potential causes.

I'll take a closer look at this issue soon.

aokolnychyi · May 04 '23 17:05

@aokolnychyi Hi, I have added the execution plan of the SQL statement, please let me know if you need me to provide additional information.

BsoBird · May 05 '23 04:05

As folks are mentioning here, the noticeable difference between the two plans is:

               :        +- BatchScan[uni_order_id#3586, data_from#3587L, partner#3588, plat_code#3589, order_id#3590, uni_shop_id#3591, uni_id#3592, guide_id#3593, shop_id#3594, plat_account#3595, total_fee#3596, item_discount_fee#3597, trade_discount_fee#3598, adjust_fee#3599, post_fee#3600, discount_rate#3601, payment_no_postfee#3602, payment#3603, pay_time#3604, product_num#3605L, order_status#3606, is_refund#3607, refund_fee#3608, insert_time#3609, ... 35 more fields] local.test.b_std_trade_4_iceberg_orc_zstd (branch=null) [filters=, groupedBy=] RuntimeFilters: [dynamicpruningexpression(_file#3706 IN dynamicpruning#4007)]
               :              +- SubqueryAdaptiveBroadcast dynamicpruning#4007, 0, false, Project [_file#4006], [_file#4006]

It looks like the runtime filter is pushed down to test.b_std_trade_4_iceberg_orc_zstd in COW only.

szehon-ho · May 05 '23 18:05

If you have time, could you share the Spark History UI showing how much time is spent in each stage, for both versions? I see a running time for MOR, but it would be great to compare against COW as well.

szehon-ho · May 08 '23 23:05

@szehon-ho Hi, I uploaded the image (COW running time).

When I ran the SQL this time, there were some third-party tasks in the cluster competing with me for resources, so the execution took a little longer. I hope this factor does not affect your analysis too much.


BsoBird · May 10 '23 14:05

Hi, thanks. Yeah, I am comparing the two stage trees; nothing immediately jumps out to explain why MOR takes 40 minutes and the other 17 minutes.

Comparing the two joins,

  • The MOR join has shuffle bytes of 207.6 GB and 50.3 GB.
  • COW has two joins (the first determines the list of files, the second is the actual join with a filter on the file list):
    • The 1st join has shuffle bytes of 10.3 GB and 7.0 GB (file list calculation).
    • The 2nd join has 156.9 GB and 109.3 GB (final join after the file filter).

So based on that alone, I don't see a huge difference here.

Also, I notice that the data for the two runs may be different: you have 785 million and 499 million rows for COW, and 261 million and 498 million for MOR.

I am not a huge expert on the Spark UI, but it is not clear just from here how long each stage takes, and which one is the culprit. Would the stage part of the UI show this more clearly? I hope you don't have to re-run both jobs and can just get it from the History for these two runs.

szehon-ho · May 10 '23 22:05

@szehon-ho Yes, the amount of data in the source table is not the same between the two executions, because some incremental data comes in every day. I'll adjust it and re-execute to make sure the data volume is equal on both sides, and I'll post the time taken for each stage.

BsoBird · May 11 '23 01:05

@szehon-ho Hello. I have prepared two datasets with the same amount of data and run the experiments in the same environment. Here are the results I collected (I uploaded them to Slack). From what I can see, there is a skewed job in the processing of the MOR table. Incidentally, in this merge 90 percent of the data is new, so it will not produce UPDATEs. Does https://github.com/apache/iceberg/pull/7520 fix this problem?

BsoBird · May 11 '23 15:05

Thanks. Downloaded your files from: link

Looks like there is terrible skew in the MOR join.

[screenshot: STAGE-18-DETAIL]

Can you check the COW joins and see the details? Is there no comparable skew?

szehon-ho · May 12 '23 05:05
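One way to narrow down where the skew comes from is to count rows per key: a single hot uni_order_id funnels into one task both in the window exchange and in the join. A hedged sketch against the tables from this issue:

-- Duplicates per window key in the raw source (this feeds the row_number exchange).
SELECT uni_order_id, COUNT(*) AS cnt
FROM spark_catalog.dw_source.s_std_trade_tidb
WHERE dt >= '2023-04-20'
GROUP BY uni_order_id
ORDER BY cnt DESC
LIMIT 20;

-- Rows per full MERGE join key on the target side.
SELECT uni_order_id, tenant, partner, COUNT(*) AS cnt
FROM local.test.b_std_trade_4_iceberg_orc_zstd_mor
GROUP BY uni_order_id, tenant, partner
ORDER BY cnt DESC
LIMIT 20;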

@szehon-ho The file I provided you with contains the execution details of the COW table. Do you need any additional information from me?

BsoBird · May 12 '23 05:05

I think the COW join stages do not have the detail you have for the MOR stages (the DETAIL/LONG-TIME files); if you can send the detail of those stages, that would be great.

But one hypothesis I'm thinking of, which I'll check tomorrow with @RussellSpitzer and @aokolnychyi, is that perhaps there is skew in both COW and MOR. In COW, we skew only in the first join, when calculating the file set, and it's smaller because we select much less data in that calculation. The second COW join benefits from knowing the exact file set, so it only reads what is necessary.

szehon-ho · May 12 '23 05:05

Actually, I noticed I may be wrong: the screenshot I posted is the last stage (MergeInto), not the join as I thought. So it may be related to @aokolnychyi's fix, https://github.com/apache/iceberg/pull/7558, and have less to do with runtime filtering.

szehon-ho · May 12 '23 22:05
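Worth noting: the MOR plan above ends with an Exchange hashpartitioning(_spec_id, _partition, _file) plus a sort on (_spec_id, _partition, _file, _pos) before WriteDelta, and Iceberg's distribution-mode table properties control that final write shuffle. A sketch of changing it, offered as something to experiment with rather than a confirmed fix, since whether it helps depends on where the skew actually is:

-- Standard Iceberg property; valid values are none, hash, and range.
ALTER TABLE local.test.b_std_trade_4_iceberg_orc_zstd_mor SET TBLPROPERTIES (
  'write.merge.distribution-mode' = 'none'
);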

Missed your earlier comment as well; https://github.com/apache/iceberg/pull/7520 looks like it should help too. Is it possible to re-test now that these two are in? Iceberg 1.3 should be out soon with both fixes by @aokolnychyi.

szehon-ho · May 16 '23 22:05

@szehon-ho Sounds good, I'll test it once v1.3 is released.

BsoBird · May 17 '23 04:05

@szehon-ho I re-tested Iceberg 1.3.0 on Spark 3.3, but the problem is still not solved. Is this problem solved on Spark 3.4?

BsoBird · May 31 '23 03:05
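Independent of the write path, MOR tables accumulate position-delete files across merges, and those slow down the target scan of every subsequent MERGE. Periodic compaction is the usual mitigation; a sketch using Iceberg's Spark maintenance procedures (the catalog name 'local' matches this issue, and rewrite_position_delete_files is available in recent releases):

-- Compact small or fragmented data files.
CALL local.system.rewrite_data_files(table => 'test.b_std_trade_4_iceberg_orc_zstd_mor');
-- Compact the position-delete files produced by merge-on-read writes.
CALL local.system.rewrite_position_delete_files(table => 'test.b_std_trade_4_iceberg_orc_zstd_mor');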

Can write.orc.bloom.filter.columns take effect on Spark 3.1? When I set write.orc.bloom.filter.columns=xx, wrote data with Spark, and queried the table by the xx field, I could not see any query improvement.

chenwyi2 · Jul 21 '23 02:07

@chenwyi2 The bloom filter may not be as useful as it seems. When the underlying dataset is very large, the bloom filter has a problem with false positives. A proven approach is to sort the data. However, this introduces a significant IO burden.

BsoBird · Jul 21 '23 07:07
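Iceberg can persist such a sort as a table-level write order, so engines cluster new data files by the filtered columns on write. A minimal sketch on the COW table from this issue (requires the Iceberg Spark SQL extensions):

-- New data files will be sorted by these columns, improving min/max pruning
-- on the frequently filtered fields.
ALTER TABLE local.test.b_std_trade_4_iceberg_orc_zstd
WRITE ORDERED BY tenant, shop_id, plat_code, pay_time;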

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] · Aug 29 '24 00:08

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

github-actions[bot] · Sep 12 '24 00:09

I'm having what looks like the same or similar problem. 1.6.1 seems a little bit faster than 1.0.0 or 1.4.x, but still incredibly slow — far too slow for MOR to be a usable mode in this situation.

wimlewis-amazon · Nov 12 '24 17:11