[HUDI-7617] Fix issues for bulk insert user defined partitioner in StreamSync
Change Logs
NOTE: This PR handles only the AVRO code paths; a follow-up patch will cover the RowWriter code paths.
There are two problems with BULK_INSERT and partitioners.

- Passing a user-defined partitioner via `hoodie.bulkinsert.user.defined.partitioner.class` is not honoured in the StreamSync code path, so the data is written in non-sort mode, which can lead to OOM errors because of too many open write handles. (See the sketch after the test snippet below for what such a partitioner looks like.)
- There is another problem with `RDDCustomColumnsSortPartitioner`, where the data is globally sorted but too many files are written, because the partition path is not actually prepended to the user-specified sort columns. The unit test fails with this error for the existing code:
```
org.opentest4j.AssertionFailedError:
Expected :654
Actual   :3
```
```java
// Verify each partition has one base file because parallelism is 1.
assertEquals(baseFiles.size(), partitions.size());
```
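For context, a user-defined bulk insert partitioner is any implementation of Hudi's `BulkInsertPartitioner` interface, loaded by reflection from the class named in `hoodie.bulkinsert.user.defined.partitioner.class`. Below is a minimal hypothetical sketch (the class name and its sort choice are illustrative only; constructor and config wiring are omitted):

```java
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;

// Illustrative user-defined partitioner: sorts the incoming records by their
// physical partition path so that each writer task sees contiguous partitions
// and keeps only a small number of write handles open.
public class ExamplePartitionPathSortPartitioner<T>
    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                     int outputSparkPartitions) {
    // Global sort by partition path; records of the same partition end up
    // colocated in the resulting Spark partitions.
    return records.sortBy(HoodieRecord::getPartitionPath, true, outputSparkPartitions);
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    // Signals to the writer that records arrive sorted by partition path.
    return true;
  }
}
```

When the config is silently ignored, as in the StreamSync path here, none of this sorting happens and the writer falls back to non-sort mode.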
https://github.com/onehouseinc/hudi-internal/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java#L60
```java
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                   int outputSparkPartitions) {
  final String[] sortColumns = this.sortColumnNames;
  final SerializableSchema schema = this.serializableSchema;
  final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
  return records.sortBy(
      record -> {
        Object[] columnValues = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
        return FlatLists.ofComparableArray(columnValues);
      },
      true, outputSparkPartitions);
}
```
But `_hoodie_partition_path` is returned as null here by `record.getColumnValues`; debugger screenshots are attached because these meta fields are only populated later, as part of `HoodieAvroParquetWriter`:
https://github.com/onehouseinc/hudi-internal/blob/master/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java#L64
```java
@Override
public void writeAvroWithMetadata(HoodieKey key, IndexedRecord avroRecord) throws IOException {
  if (populateMetaFields) {
    prepRecordWithMetadata(key, avroRecord, instantTime,
        taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
    super.write(avroRecord);
    writeSupport.add(key.getRecordKey());
  } else {
    super.write(avroRecord);
  }
}
```
```java
default void prepRecordWithMetadata(HoodieKey key, IndexedRecord avroRecord, String instantTime,
                                    Integer partitionId, long recordIndex, String fileName) {
  String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
  HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
  HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
}
```
Attaching the screenshots below, where the `_hoodie_partition_path` column is null.
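Given that, the fix for `RDDCustomColumnsSortPartitioner` is to build the sort key from the record's own partition path (available via `HoodieRecord#getPartitionPath`) prepended to the user-specified sort columns, rather than relying on the still-null meta column. A sketch of that idea (paraphrasing the intent of the patch, not the exact committed code):

```java
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                   int outputSparkPartitions) {
  final String[] sortColumns = this.sortColumnNames;
  final SerializableSchema schema = this.serializableSchema;
  final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
  return records.sortBy(
      record -> {
        Object[] columnValues = record.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled);
        // Prepend the partition path so the global sort clusters records of
        // the same partition together before sorting by the user columns.
        Object[] sortKey = new Object[columnValues.length + 1];
        sortKey[0] = record.getPartitionPath();
        System.arraycopy(columnValues, 0, sortKey, 1, columnValues.length);
        return FlatLists.ofComparableArray(sortKey);
      },
      true, outputSparkPartitions);
}
```

With the partition path leading the sort key, records of the same partition land together, so the test above sees one base file per partition at parallelism 1.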
Impact
No breaking impact; fixes the bugs related to BULK_INSERT user-defined partitioners to ensure the data is sorted correctly.
Risk level (write none, low medium or high below)
Medium.
Documentation Update
None.
Contributor's checklist
- [x] Read through contributor's guide
- [x] Change Logs and Impact were stated clearly
- [x] Adequate tests were added if applicable
- [x] CI passed
Azure CI is green.
CI report:
- 33710549e6c4071bd327ef528e17302e42bf829c Azure: SUCCESS