hudi
[HUDI-7508] Avoid collecting records in HoodieStreamerUtils.createHoodieRecords and JsonKafkaSource mapPartitions
Change Logs
This block of code is problematic and can lead to OOM because it converts the iterator into a list and then returns an iterator over that list. This holds every converted record of the partition in the executor heap while the block runs. https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java#L86
records = avroRDD.mapPartitions(
    (FlatMapFunction<Iterator<GenericRecord>, Either<HoodieRecord,String>>) genericRecordIterator -> {
      if (autoGenerateRecordKeys) {
        props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()));
        props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
      }
      BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
      List<Either<HoodieRecord,String>> avroRecords = new ArrayList<>();
      while (genericRecordIterator.hasNext()) {
        GenericRecord genRec = genericRecordIterator.next();
        try {
          HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
          GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
          HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
              (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
                  KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
                  Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
              : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
          avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
        } catch (Exception e) {
          if (!shouldErrorTable) {
            throw e;
          }
          avroRecords.add(generateErrorRecord(genRec));
        }
      }
      return avroRecords.iterator();
    });
Impact
No change in public API. This makes the mapPartitions part of the code memory efficient by no longer collecting the results in the executor.
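For readers who want the idea in isolation, here is a minimal sketch of the difference between buffering a partition and mapping it lazily. It uses only `java.util` and no Spark or Hudi classes. The class name `LazyPartitionMapping` and the helpers `eagerMap` and `lazyMap` are hypothetical names chosen for illustration, and this is not necessarily how the patch itself is written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class LazyPartitionMapping {

    // Eager variant: same shape as the block above. Every converted element
    // is held in the list until the whole input has been consumed.
    public static <I, O> Iterator<O> eagerMap(Iterator<I> input, Function<I, O> convert) {
        List<O> buffer = new ArrayList<>();
        while (input.hasNext()) {
            buffer.add(convert.apply(input.next()));
        }
        return buffer.iterator();
    }

    // Lazy variant: one element is converted per next() call, so the wrapper
    // itself never buffers anything.
    public static <I, O> Iterator<O> lazyMap(Iterator<I> input, Function<I, O> convert) {
        return new Iterator<O>() {
            @Override
            public boolean hasNext() {
                return input.hasNext();
            }

            @Override
            public O next() {
                return convert.apply(input.next());
            }
        };
    }

    public static void main(String[] args) {
        int[] conversions = {0}; // counts how often the converter has run
        Function<Integer, String> convert = i -> {
            conversions[0]++;
            return "rec-" + i;
        };

        Iterator<String> lazy = lazyMap(Arrays.asList(1, 2, 3).iterator(), convert);
        System.out.println("conversions before first next(): " + conversions[0]);
        System.out.println("first element: " + lazy.next());
        System.out.println("conversions after first next(): " + conversions[0]);
    }
}
```

In a real `mapPartitions` function, the per-record try/catch that produces error records would presumably move inside the conversion function, so that a failure is still handled one record at a time.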
Risk level (write none, low medium or high below)
Medium
Documentation Update
None.
Contributor's checklist
- [x] Read through contributor's guide
- [x] Change Logs and Impact were stated clearly
- [x] Adequate tests were added if applicable
- [ ] CI passed
hey @vinishjail97: can you attach the memory profiling you did before and after this patch, and rebase w/ master? we are good to go
CI report:
- acbabdc64da321e77aaabd03bcd9d5f3c322c0ec Azure: SUCCESS
15th March: Basic OOM test (consume 2M events, each payload approximately 1KB, with 2 maxExecutors and 1GB executor memory). The dynamic allocation ratio was 0.002, so essentially only 1 executor is used because not enough tasks are spawned.
driver:
  coreLimit: 2000m
  coreRequest: 1800m
  cores: 2
  labels:
    orgId: 0c043996-9e42-4904-95b9-f98918ebeda4
    version: 3.1.1
  memory: 2g
  serviceAccount: staging-spark
dynamicAllocation:
  enabled: true
  initialExecutors: 0
  maxExecutors: 2
  minExecutors: 0
executor:
  coreLimit: 1000m
  coreRequest: 750m
  cores: 1
  labels:
    orgId: 0c043996-9e42-4904-95b9-f98918ebeda4
    version: 3.1.1
  memory: 1g
Without the fix, the stage failed with an executor OOM after 20 min.
With the fix, the same stage completed in 17 min on one executor.