[HUDI-7508] Avoid collecting records in HoodieStreamerUtils.createHoodieRecords and JsonKafkaSource mapPartitions

Open: vinishjail97 opened this issue 5 months ago • 1 comment

Change Logs

This block of code is problematic and can lead to OOM errors. It drains the partition's iterator into a list and only then returns an iterator over that list, so every converted record of the partition sits in the executor's heap while this block runs. https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java#L86

        records = avroRDD.mapPartitions(
            (FlatMapFunction<Iterator<GenericRecord>, Either<HoodieRecord,String>>) genericRecordIterator -> {
              if (autoGenerateRecordKeys) {
                props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()));
                props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
              }
              BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
              // Buffers every converted record of the partition in executor memory.
              List<Either<HoodieRecord,String>> avroRecords = new ArrayList<>();
              while (genericRecordIterator.hasNext()) {
                GenericRecord genRec = genericRecordIterator.next();
                try {
                  HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
                  GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
                  HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
                      (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
                          KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
                          Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
                      : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
                  avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
                } catch (Exception e) {
                  if (!shouldErrorTable) {
                    throw e;
                  }
                  avroRecords.add(generateErrorRecord(genRec));
                }
              }
              // The iterator is returned only after the whole partition has been materialized.
              return avroRecords.iterator();
            });
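A minimal, self-contained sketch of the difference, in plain Java with no Hudi or Spark dependencies. The names `LazyMappingSketch`, `eagerMap` and `lazyMap` are hypothetical and not part of Hudi. `eagerMap` mirrors the buffer-then-iterate pattern in the snippet above, while `lazyMap` converts one element per `next()` call, which is the kind of iterator a `mapPartitions` function can return without holding the whole partition in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class LazyMappingSketch {

  // Hypothetical helper mirroring the snippet above: converts and buffers
  // every element before handing back an iterator.
  static <I, O> Iterator<O> eagerMap(Iterator<I> source, Function<? super I, ? extends O> fn) {
    List<O> buffer = new ArrayList<>();
    while (source.hasNext()) {
      buffer.add(fn.apply(source.next()));
    }
    return buffer.iterator();
  }

  // Hypothetical lazy alternative: converts one element per next() call and
  // keeps no buffer of converted elements.
  static <I, O> Iterator<O> lazyMap(Iterator<I> source, Function<? super I, ? extends O> fn) {
    return new Iterator<O>() {
      @Override
      public boolean hasNext() {
        return source.hasNext();
      }

      @Override
      public O next() {
        if (!source.hasNext()) {
          throw new NoSuchElementException();
        }
        return fn.apply(source.next());
      }
    };
  }

  public static void main(String[] args) {
    AtomicInteger conversions = new AtomicInteger();
    Function<Integer, String> convert = n -> {
      conversions.incrementAndGet(); // count how many elements were converted
      return "record-" + n;
    };

    eagerMap(Arrays.asList(1, 2, 3).iterator(), convert);
    System.out.println("eager conversions before consuming: " + conversions.get()); // 3

    conversions.set(0);
    Iterator<String> lazy = lazyMap(Arrays.asList(1, 2, 3).iterator(), convert);
    System.out.println("lazy conversions before consuming: " + conversions.get()); // 0
    lazy.next();
    System.out.println("lazy conversions after one next(): " + conversions.get()); // 1
  }
}
```

Spark pulls records from the iterator returned by a `mapPartitions` function as downstream operators consume them. With a lazy iterator, only the records currently in flight need to be resident, assuming no later stage buffers the whole partition itself. The per-partition setup in the original (properties, key generator) can still run once, before the iterator is returned.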

Impact

No change in public API. This makes the mapPartitions step memory efficient: records are produced lazily instead of being collected into a list on the executor.
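The per-record error-table handling can be kept when conversion becomes lazy. The sketch below is hypothetical plain Java, not the change in this PR. `Result` stands in for Hudi's `Either<HoodieRecord, String>`, and `String.valueOf(in)` stands in for `generateErrorRecord(genRec)`. `convertLazily` wraps the source iterator with `StreamSupport` so each element is converted only when pulled. It catches `RuntimeException` rather than `Exception` because `java.util.function.Function` cannot throw checked exceptions.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.StreamSupport;

public class ErrorTableSketch {

  // Hypothetical stand-in for Hudi's Either<HoodieRecord, String>: holds a record or an error.
  static final class Result<R> {
    final R record;
    final String error;

    private Result(R record, String error) {
      this.record = record;
      this.error = error;
    }

    static <R> Result<R> ok(R record) {
      return new Result<>(record, null);
    }

    static <R> Result<R> failed(String error) {
      return new Result<>(null, error);
    }

    boolean isOk() {
      return error == null;
    }
  }

  // Converts lazily: each element is converted only when the returned iterator advances.
  static <I, O> Iterator<Result<O>> convertLazily(
      Iterator<I> source, Function<I, O> convert, boolean shouldErrorTable) {
    return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(source, Spliterator.ORDERED), false)
        .map(in -> {
          try {
            return Result.<O>ok(convert.apply(in));
          } catch (RuntimeException e) {
            if (!shouldErrorTable) {
              throw e; // no error table: fail, as the original loop does
            }
            // Hypothetical error payload; the original calls generateErrorRecord(genRec).
            return Result.<O>failed(String.valueOf(in));
          }
        })
        .iterator();
  }

  public static void main(String[] args) {
    Iterator<Result<Integer>> out =
        convertLazily(Arrays.asList("1", "x", "3").iterator(), Integer::parseInt, true);
    while (out.hasNext()) {
      Result<Integer> r = out.next();
      System.out.println(r.isOk() ? "record " + r.record : "error " + r.error);
    }
  }
}
```

One behavioral difference to keep in mind: with lazy conversion and the error table disabled, the exception surfaces when the consumer reaches the bad record, not before the `mapPartitions` function returns.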

Risk level (write none, low, medium or high below)

Medium

Documentation Update

None.

Contributor's checklist

  • [x] Read through contributor's guide
  • [x] Change Logs and Impact were stated clearly
  • [x] Adequate tests were added if applicable
  • [ ] CI passed

vinishjail97, Mar 15 '24 19:03