[HUDI-8387] Optimized `KeyGenUtils::extractRecordKeysByFields`

Open geserdugarov opened this issue 1 year ago • 6 comments

Change Logs

`KeyGenUtils::extractRecordKeysByFields` is used in Flink stream processing with buckets. Optimizing this part is critical because it is called from `BucketStreamWriteFunction::processElement`, which means it runs once for each record.

Before changes: (image: "09 - string")

After changes: (image: "16 - optimized extract - 2")

Also, the fix for HUDI-5302 introduced complex behavior that depends on the combination and order of `,` and `:` characters. In fact, a sequence of `,` characters is not supported. For instance, the key `col1:val1,,,col2:val2` with hash field `col2` is processed in the following way:

  • parsed key value pair: col1:val1, :null, :null, col2:val2,
  • after filtering by hash fields: :null, :null, col2:val2,
  • hash calculation using null, null, and val2.

This behavior is wrong: `col1` in this case has the value val1,,, and should be skipped because it is not among the configured hash fields, so only `val2` should be used for the hash.

This MR fixes this behavior and proposes full support of `,` in complex key values, but drops support of `:` in composite key values. Both characters are supported when the key is not complex, that is, a simple key built from one field. Whether a key is complex is determined by the presence of both `,` and `:`.
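
The parsing steps listed above can be reproduced with a small model. The sketch below is not the Hudi implementation: the class and method names are hypothetical, `naiveExtract` only mimics the described split-on-`,`-then-`:` behavior, and `commaTolerantExtract` is one possible way to allow `,` inside values, assuming values never contain `:`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model for illustration only; this is not the Hudi implementation.
class CompositeKeySketch {

  // Models the described behavior: split on ',' first, then look for ':'.
  // A segment without ':' has no field name, so the hash-field filter cannot drop it.
  static List<String> naiveExtract(String recordKey, List<String> hashFields) {
    List<String> values = new ArrayList<>();
    for (String segment : recordKey.split(",", -1)) {
      int colon = segment.indexOf(':');
      if (colon < 0) {
        values.add(null); // nameless segment survives filtering as a null value
      } else if (hashFields.contains(segment.substring(0, colon))) {
        values.add(segment.substring(colon + 1));
      }
    }
    return values;
  }

  // Comma-tolerant parsing (assumption: values never contain ':').
  // Every ':' ends a field name, and that name starts right after the last ','
  // before the ':'. Everything between a ':' and the ',' that precedes the next
  // field name is the value, commas included.
  static List<String> commaTolerantExtract(String recordKey, List<String> hashFields) {
    List<String> values = new ArrayList<>();
    int nameStart = 0;
    int colon = recordKey.indexOf(':');
    while (colon >= 0) {
      String name = recordKey.substring(nameStart, colon);
      int nextColon = recordKey.indexOf(':', colon + 1);
      int valueEnd = nextColon < 0
          ? recordKey.length()
          : recordKey.lastIndexOf(',', nextColon);
      if (valueEnd < colon) {
        throw new IllegalArgumentException("':' inside a value is not handled by this sketch");
      }
      if (hashFields.contains(name)) {
        values.add(recordKey.substring(colon + 1, valueEnd));
      }
      nameStart = valueEnd + 1;
      colon = nextColon;
    }
    return values;
  }

  public static void main(String[] args) {
    String key = "col1:val1,,,col2:val2";
    List<String> hashFields = Arrays.asList("col2");
    System.out.println(naiveExtract(key, hashFields));         // [null, null, val2]
    System.out.println(commaTolerantExtract(key, hashFields)); // [val2]
  }
}
```

In this model the extra commas stay inside the value of `col1`, so filtering by `col2` leaves only `val2` for the hash.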

Impact

Memory allocation optimized when buckets are used.

Risk level (write none, low medium or high below)

Low

Documentation Update

An explicit note that the `:` character is not supported in key values when complex keys are used. Currently, the behavior is complex when `,` and `:` are used in key values, and it depends on the combination and order of these characters.

Contributor's checklist

  • [x] Read through contributor's guide
  • [x] Change Logs and Impact were stated clearly
  • [x] Adequate tests were added if applicable
  • [ ] CI passed

geserdugarov avatar Oct 17 '24 12:10 geserdugarov

Before the changes in this MR, bucketing by timestamps is wrong. A bucket pruning test can be used to illustrate this: https://github.com/apache/hudi/blob/e0b12fbae5d69ee2101806d78f04672c1e5deb6f/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java#L245-L256

To calculate the hash for bucketing, `KeyGenUtils::extractRecordKeysByFields` is used: https://github.com/apache/hudi/blob/e0b12fbae5d69ee2101806d78f04672c1e5deb6f/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java#L135-L139 However, the timestamp is passed to this function as a string without a column name, while the column name for hashing is passed as "f_timestamp". A substring of the timestamp is therefore treated as the column name, and the value is filtered out after comparison with "f_timestamp".

As a result, `KeyGenUtils::extractRecordKeysByFields` returns an empty list for hash calculation for every timestamp, which leads to only 1 bucket for all data:

/tmp/junit7256928206840261812/tbl1$ tree -a
.
├── __HIVE_DEFAULT_PARTITION__
│   ├── 00000001-1223-4733-8604-ed99d64ce5d9_0-1-0_20241022095610822.parquet
│   ├── .00000001-1223-4733-8604-ed99d64ce5d9_0-1-0_20241022095610822.parquet.crc
│   ├── .hoodie_partition_metadata
│   └── ..hoodie_partition_metadata.crc
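
Why every timestamp ends up in the same bucket can be sketched with a small model. Everything here is an assumption made for illustration: the class and method names, the sample timestamp strings, and the bucket formula (non-negative hash of the value list modulo the bucket count). It is not the Hudi code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model for illustration only; names, inputs and formula are assumptions.
class TimestampBucketSketch {

  // Models the pre-change behavior: the text before the first ':' in a segment is
  // taken as a field name, and the value is kept only if that name is a hash field.
  static List<String> naiveExtract(String recordKey, List<String> hashFields) {
    List<String> values = new ArrayList<>();
    for (String segment : recordKey.split(",", -1)) {
      int colon = segment.indexOf(':');
      if (colon < 0) {
        values.add(null); // nameless segment, cannot be filtered by name
      } else if (hashFields.contains(segment.substring(0, colon))) {
        values.add(segment.substring(colon + 1));
      }
    }
    return values;
  }

  // Assumed bucket formula: non-negative list hash modulo the number of buckets.
  static int bucketId(List<String> hashValues, int numBuckets) {
    return (hashValues.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  public static void main(String[] args) {
    List<String> hashFields = Arrays.asList("f_timestamp");
    // Hypothetical timestamp strings passed as record keys without a column name.
    String[] keys = {"2024-10-22 09:56:10", "2024-10-22 11:00:06", "1970-01-01 00:00:01"};
    for (String key : keys) {
      // "2024-10-22 09" (and so on) is mistaken for a column name and filtered out.
      List<String> values = naiveExtract(key, hashFields);
      System.out.println(key + " -> " + values + " -> bucket " + bucketId(values, 4));
    }
  }
}
```

Under these assumptions every key yields an empty list, and an empty list always hashes to the same value, so all records map to one bucket regardless of the timestamp.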

After the changes in this MR, we have 4 buckets as expected, which matches the default value of `FlinkOptions::BUCKET_INDEX_NUM_BUCKETS`:

/tmp/junit3115200942477430253/tbl1$ tree -a
.
├── __HIVE_DEFAULT_PARTITION__
│   ├── 00000000-d783-4d0a-a073-2fd404a6fe82_0-1-0_20241022110006717.parquet
│   ├── .00000000-d783-4d0a-a073-2fd404a6fe82_0-1-0_20241022110006717.parquet.crc
│   ├── 00000001-7f0a-4b07-a8d2-2791f1078ab3_0-1-0_20241022110006717.parquet
│   ├── .00000001-7f0a-4b07-a8d2-2791f1078ab3_0-1-0_20241022110006717.parquet.crc
│   ├── 00000002-3ac9-46dd-8fde-1f7b28510426_0-1-0_20241022110006717.parquet
│   ├── .00000002-3ac9-46dd-8fde-1f7b28510426_0-1-0_20241022110006717.parquet.crc
│   ├── 00000003-e47e-42b6-8db3-2b1167a8826c_0-1-0_20241022110006717.parquet
│   ├── .00000003-e47e-42b6-8db3-2b1167a8826c_0-1-0_20241022110006717.parquet.crc
│   ├── .hoodie_partition_metadata
│   └── ..hoodie_partition_metadata.crc

geserdugarov avatar Oct 22 '24 03:10 geserdugarov

Having ':' in key value is not prohibited and it may occur easily, so we should process it properly instead of throwing exceptions.

Thanks for your review! I will add support for `:` and push the corresponding commit.

geserdugarov avatar Oct 22 '24 07:10 geserdugarov

Updated the unit test cases for the situation when only one column is used for the record key. In this case, we expect just value; the form column_name:value is not possible due to: https://github.com/apache/hudi/blob/9b3f85e1b4d037a6e10c81e4b8d5f3e8a4a01ef6/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java#L249-L253

geserdugarov avatar Oct 23 '24 04:10 geserdugarov

CI report:

  • e07473bd3036a35e399b5dcb1c35baaa17b3e366 UNKNOWN
  • 67fa57fa6e315c8cdbcc0c2acf08b318c0560c1a UNKNOWN
  • 887765a06b7db5b967755bd3e0f088d3a3d4c859 UNKNOWN
  • c039396171018d405f3a015604f3482f73907fe4 Azure: SUCCESS

hudi-bot avatar Oct 24 '24 13:10 hudi-bot