
[HUDI-7617] Fix issues for bulk insert user defined partitioner in StreamSync

Open: vinishjail97 opened this pull request on Apr 13 '24 • 1 comment

Change Logs

NOTE: This PR handles only the Avro code paths; a follow-up patch will cover the RowWriter code paths.

There are two problems with BULK_INSERT and user-defined partitioners:

  1. Passing a user-defined partitioner via hoodie.bulkinsert.user.defined.partitioner.class is not honored in the StreamSync code path; the data is written in non-sort mode, which can lead to OOM errors because too many write handles stay open. (A configuration sketch follows the test snippet below.)
  2. There is another problem with RDDCustomColumnsSortPartitioner: the data is globally sorted, but too many files are written because the partition keys are not actually prepended to the sort columns. The unit test fails with this error against the existing code:
org.opentest4j.AssertionFailedError: 
Expected :654
Actual   :3

// Verify each partition has one base file because parallelism is 1.
assertEquals(baseFiles.size(), partitions.size());
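
For the first problem, this is roughly how a user requests a custom bulk-insert partitioner through the writer properties; a minimal sketch with illustrative sort-column values (hoodie.bulkinsert.user.defined.partitioner.sort.columns is the companion config read by RDDCustomColumnsSortPartitioner):

  // Properties passed to the streamer; the operation itself is selected as
  // BULK_INSERT separately (e.g. via HoodieDeltaStreamer's --op flag).
  TypedProperties props = new TypedProperties();
  props.setProperty("hoodie.bulkinsert.user.defined.partitioner.class",
      "org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner");
  // Companion config read by RDDCustomColumnsSortPartitioner; values are illustrative.
  props.setProperty("hoodie.bulkinsert.user.defined.partitioner.sort.columns", "rider,driver");

Returning to the second problem, the buggy sort lives in RDDCustomColumnsSortPartitioner#repartitionRecords: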

https://github.com/onehouseinc/hudi-internal/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java#L60

  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                     int outputSparkPartitions) {
    final String[] sortColumns = this.sortColumnNames;
    final SerializableSchema schema = this.serializableSchema;
    final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
    // Sorts globally by the user-specified sort columns only; the partition
    // path is never prepended to the sort key, so records from the same
    // partition scatter across Spark partitions and produce many small files.
    return records.sortBy(
        record -> {
          Object[] columnValues = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
          return FlatLists.ofComparableArray(columnValues);
        },
        true, outputSparkPartitions);
  }
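
One possible fix, sketched below for illustration (not necessarily the exact patch in this PR), is to prepend the partition path carried on the HoodieRecord itself, which is available before the meta fields are written, instead of relying on the _hoodie_partition_path column:

  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                     int outputSparkPartitions) {
    final String[] sortColumns = this.sortColumnNames;
    final SerializableSchema schema = this.serializableSchema;
    final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
    return records.sortBy(
        record -> {
          Object[] columnValues = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
          // Prepend the partition path from the HoodieRecord so records of the
          // same partition sort together and each partition ends up with as
          // few base files as the parallelism allows.
          Object[] sortValues = new Object[columnValues.length + 1];
          sortValues[0] = record.getPartitionPath();
          System.arraycopy(columnValues, 0, sortValues, 1, columnValues.length);
          return FlatLists.ofComparableArray(sortValues);
        },
        true, outputSparkPartitions);
  }

With a change along these lines, the test expectation that each partition has one base file (parallelism 1) can hold, since all records of a partition land in the same sorted range.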

But _hoodie_partition_path is returned as null here by record.getColumnValues (debugger screenshots attached below), because these meta fields are only added later, as part of HoodieAvroParquetWriter. https://github.com/onehouseinc/hudi-internal/blob/master/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java#L64

  @Override
  public void writeAvroWithMetadata(HoodieKey key, IndexedRecord avroRecord) throws IOException {
    if (populateMetaFields) {
      // Meta fields, including _hoodie_partition_path, are stamped onto the
      // record here, at write time, i.e. after any bulk-insert partitioner
      // has already run.
      prepRecordWithMetadata(key, avroRecord, instantTime,
          taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
      super.write(avroRecord);
      writeSupport.add(key.getRecordKey());
    } else {
      super.write(avroRecord);
    }
  }

  default void prepRecordWithMetadata(HoodieKey key, IndexedRecord avroRecord, String instantTime, Integer partitionId, long recordIndex, String fileName) {
    String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
    // Populates _hoodie_record_key, _hoodie_partition_path and _hoodie_file_name.
    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
    // Populates _hoodie_commit_time and _hoodie_commit_seqno.
    HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
  }

Attaching the screenshots below, where the _hoodie_partition_path column is null.

[Screenshots: debugger views showing the _hoodie_partition_path column as null]
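
To make the ordering concrete, here is a minimal sketch (the record and writerSchema variables are illustrative) of why the lookup fails at sort time:

  // At repartition time the writer has not run yet, so the meta columns on
  // the record are still null; they are only filled in later, inside
  // writeAvroWithMetadata -> prepRecordWithMetadata.
  String[] sortColumns = new String[] {HoodieRecord.PARTITION_PATH_METADATA_FIELD};
  Object[] values = record.getColumnValues(writerSchema, sortColumns, false);
  // values[0] == null here, which breaks any sort that relies on
  // _hoodie_partition_path as a sort column.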

Impact

No user-facing impact; this fixes bugs in the BULK_INSERT user-defined partitioners so that they sort the data correctly.

Risk level (write none, low, medium or high below)

Medium.

Documentation Update

None.

Contributor's checklist

  • [x] Read through contributor's guide
  • [x] Change Logs and Impact were stated clearly
  • [x] Adequate tests were added if applicable
  • [x] CI passed
