hudi icon indicating copy to clipboard operation
hudi copied to clipboard

[HUDI-7508] Avoid collecting records in HoodieStreamerUtils.createHoodieRecords and JsonKafkaSource mapPartitions

Open vinishjail97 opened this issue 11 months ago • 1 comments

Change Logs

This block of code is problematic and can lead to OOM when we are we converting the iterator into a list and then returning the iterator back. This just holds up memory in the heap when the executor is running this block of code. https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java#L86

        records = avroRDD.mapPartitions(
            (FlatMapFunction<Iterator<GenericRecord>, Either<HoodieRecord,String>>) genericRecordIterator -> {
              if (autoGenerateRecordKeys) {
                props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()));
                props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
              }
              BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
              List<Either<HoodieRecord,String>> avroRecords = new ArrayList<>();
              while (genericRecordIterator.hasNext()) {
                GenericRecord genRec = genericRecordIterator.next();
                try {
                  HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
                  GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
                  HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
                      (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
                          KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
                          Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
                      : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
                  avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
                } catch (Exception e) {
                  if (!shouldErrorTable) {
                    throw e;
                  }
                  avroRecords.add(generateErrorRecord(genRec));
                }
              }
              return avroRecords.iterator();
            });

Impact

No change in public API, this is making mapPartitions part of the code memory efficient instead of collecting the results in the executor.

Risk level (write none, low medium or high below)

Medium

Documentation Update

None.

Contributor's checklist

  • [x] Read through contributor's guide
  • [x] Change Logs and Impact were stated clearly
  • [x] Adequate tests were added if applicable
  • [ ] CI passed

vinishjail97 avatar Mar 15 '24 19:03 vinishjail97

hey @vinishjail97 : can you attach the memory profileing you did before and after this patch. and rebase w/ master. we are good to go

nsivabalan avatar May 09 '24 22:05 nsivabalan

CI report:

  • acbabdc64da321e77aaabd03bcd9d5f3c322c0ec Azure: SUCCESS
Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

hudi-bot avatar May 11 '24 02:05 hudi-bot

hey @vinishjail97 : can you attach the memory profileing you did before and after this patch. and rebase w/ master. we are good to go

15th March: Basic OOM Test (Consume 2M events, each payload is approximately 1KB with 2 maxExecutors and 1GB memory) and dynamic allocation ratio was 0.002 so essentially only 1 executor will be used as tasks spawned are not enough.

driver: coreLimit: 2000m coreRequest: 1800m cores: 2 labels: orgId: 0c043996-9e42-4904-95b9-f98918ebeda4 version: 3.1.1 memory: 2g serviceAccount: staging-spark dynamicAllocation: enabled: true initialExecutors: 0 maxExecutors: 2 minExecutors: 0 executor: coreLimit: 1000m coreRequest: 750m cores: 1 labels: orgId: 0c043996-9e42-4904-95b9-f98918ebeda4 version: 3.1.1 memory: 1g Without the fix, the stage was failing with executor OOM after 20min. image

After using this fix, the same stage completed in 17min with one executor. image

vinishjail97 avatar May 11 '24 16:05 vinishjail97