
[SUPPORT] Running `--continuous` mode with HoodieMultiTableDeltaStreamer seems to only ingest first table


Describe the problem you faced

I'm new to Hudi, so this could well be a configuration issue on my side. I'm trying to set up a continuous multi-table ingestion job. I can successfully ingest multiple tables when running without the --continuous flag, so the properties seem to be set up correctly. When I add the --continuous flag, the job ingests data for the first table in continuous mode and never proceeds to ingest data from the other tables.

To Reproduce

Steps to reproduce the behavior:

  1. Set up multi-table ingestion with the --continuous flag
  2. Run the Hudi job
  3. Observe the first table processing new incoming data while the other tables make no progress

Expected behavior

Multi-table ingestion with the --continuous flag should process data continually, cycling through all tables.

Environment Description

Using AWS EMR release 6.6:

  • Hudi version : 0.10.1-amzn-0

  • Spark version : 3.2.0

  • Hive version : 3.1.2

  • Hadoop version : 3.2.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

Additional context

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer \
  --packages org.apache.hudi:hudi-utilities-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:2.4.5 \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  /usr/lib/hudi/hudi-utilities-bundle_2.12-0.10.1-amzn-0.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
  --source-ordering-field ts \
  --base-path-prefix s3://{hudi_root} \
  --target-table dummy_table \
  --transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer \
  --props s3://{properties_file} \
  --config-folder s3://{config_directory} \
  --continuous \
  --min-sync-interval-seconds 60 \
  --source-limit 2147483648
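For context, a hypothetical sketch of what the referenced properties file might contain (the hoodie.deltastreamer.ingestion.* key names follow the Hudi 0.10.x multi-table ingestion docs; the database and table names are placeholders, not from this report):

```properties
# Tables to ingest, as <database>.<table> pairs (placeholder names)
hoodie.deltastreamer.ingestion.tablesToBeIngested=db1.table1,db1.table2
# Per-table overrides live in dedicated files under --config-folder
hoodie.deltastreamer.ingestion.db1.table1.configFile=s3://{config_directory}/db1_table1.properties
hoodie.deltastreamer.ingestion.db1.table2.configFile=s3://{config_directory}/db1_table2.properties
```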

Stacktrace

ddai-shippo · Aug 05 '22 18:08

@ddai-shippo This looks like a legit bug where the continuous running of the first table blocks the rest in the HoodieMultiTableDeltaStreamer. I'll work on a fix.

yihua · Aug 06 '22 01:08

@yihua @ddai-shippo the only way the multi-table deltastreamer job moves to the second table is if there is an error on the first table; otherwise, it keeps running only on the first table. A minimal sketch of the loop shape this implies (simplified and hypothetical, not the actual Hudi source; class names follow Hudi 0.10.x):
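```java
import java.util.List;

import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
import org.apache.spark.api.java.JavaSparkContext;

public class MultiTableLoopSketch {
  // Simplified sketch of the per-table sync loop described above.
  public static void syncAllTables(List<TableExecutionContext> contexts,
                                   JavaSparkContext jssc) throws Exception {
    for (TableExecutionContext context : contexts) {
      // With --continuous, sync() only returns when the stream errors out,
      // so this loop never advances past the first table unless it fails.
      new HoodieDeltaStreamer(context.getConfig(), jssc).sync();
    }
  }
}
```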

brskiran1 · Aug 06 '22 03:08

@yihua any ETA for the fix?

brskiran1 · Aug 06 '22 18:08

@brskiran1 I'll try to put up a simple fix this week so that at least the HoodieMultiTableDeltaStreamer can iterate through all tables in the continuous mode.

yihua · Aug 09 '22 17:08

@yihua: if you have a tracking JIRA, can you post it here?

nsivabalan · Aug 16 '22 04:08

https://issues.apache.org/jira/browse/HUDI-1881 Let's track updates in the JIRA. Thanks for reporting.

nsivabalan · Aug 16 '22 04:08

I added a workaround for this issue in my local fork of Hudi. Small tweaks to HoodieAsyncService.java and HoodieMultiTableDeltaStreamer.java were required, and it now works as expected. The executor shutdown timeout of 24 hours was causing the job to hang, so I changed it to 10 seconds with no negative consequences. I also enabled --post-write-termination-strategy-class org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy in the multi-table deltastreamer, so it moves on to the next table after N rounds with no new data (max.rounds.without.new.data.to.shutdown) instead of being stuck on the first table until there is an error.
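For reference, roughly how that looks on the command line; the flag and strategy class come from the comment above, while the full property key is my assumption based on the max.rounds.without.new.data.to.shutdown setting mentioned there:

```sh
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer \
  ... \
  --post-write-termination-strategy-class org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy \
  --hoodie-conf hoodie.deltastreamer.exec.max.rounds.without.new.data.to.shutdown=3
```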

This may not cover every use case, for example if you don't expect the no-new-data condition to occur in all of the tables in the multi-table job. Some use cases may need a new PostWriteTerminationStrategy, or a refactor of how the loop works.

It does not loop back to the beginning of the table set: once the last table hits NoNewData, the Spark job ends. I handle the continuous looping in my job orchestration instead.

sydneyhoran · Mar 08 '23 19:03

After more testing, I believe one more code change is required: the async services running in other threads need to be stopped when NoNewDataTerminationStrategy triggers. Otherwise the background threads stay alive and keep the Spark job running. Forcefully terminating them instead throws "sleep interrupted" errors when the delta sync thread is shut down.
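A generic Java illustration (not Hudi code) of the shutdown trade-off described here: prefer a short graceful wait, and note that the forceful fallback is what interrupts a sleeping sync thread:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> {
      try {
        Thread.sleep(60_000); // stands in for the sync thread waiting out its sync interval
      } catch (InterruptedException e) {
        // A forceful shutdownNow() lands here: the "sleep interrupted" errors above
        Thread.currentThread().interrupt();
      }
    });
    executor.shutdown(); // graceful: accept no new tasks, let running ones finish
    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { // short window instead of 24 hours
      executor.shutdownNow(); // last resort: interrupts the sleeping thread
    }
  }
}
```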

sydneyhoran · Mar 09 '23 18:03