[SUPPORT] Running `--continuous` mode with HoodieMultiTableDeltaStreamer seems to only ingest first table
Describe the problem you faced
New to Hudi, so this could well be a configuration issue on my side. I'm trying to set up a continuous multi-table ingestion job. I can successfully ingest multiple tables when running without the --continuous flag, so the properties seem to be set up correctly. When I add the --continuous flag, the job ingests data for the first table in continuous mode and never proceeds to ingest data from the other tables.
To Reproduce
Steps to reproduce the behavior:
- Set up multi-table ingestion with the --continuous flag
- Run the Hudi job
- Observe the first table processing new incoming data while the other tables make no progress
Expected behavior
Multi-table ingestion with the --continuous flag enabled should process data continually, cycling through all tables.
Environment Description

Using AWS EMR Release 6.6

- Hudi version : 0.10.1-amzn-0
- Spark version : 3.2.0
- Hive version : 3.1.2
- Hadoop version : 3.2.1
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : No
Additional context
```sh
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer \
  --packages org.apache.hudi:hudi-utilities-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:2.4.5 \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  /usr/lib/hudi/hudi-utilities-bundle_2.12-0.10.1-amzn-0.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
  --source-ordering-field ts \
  --base-path-prefix s3://{hudi_root} \
  --target-table dummy_table \
  --transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer \
  --props s3://{properties_file} \
  --config-folder s3://{config_directory} \
  --continuous \
  --min-sync-interval-seconds 60 \
  --source-limit 2147483648
```
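For context on the setup, a HoodieMultiTableDeltaStreamer job like this typically points --props at a common properties file that enumerates the tables to be ingested and their per-table config files. The snippet below is only an illustrative sketch with placeholder database/table names and paths, not the actual configuration used here:

```properties
# Common properties file referenced by --props (placeholder values)
hoodie.deltastreamer.ingestion.tablesToBeIngested=mydb.table_a,mydb.table_b
hoodie.deltastreamer.ingestion.mydb.table_a.configFile=s3://{config_directory}/mydb_table_a.properties
hoodie.deltastreamer.ingestion.mydb.table_b.configFile=s3://{config_directory}/mydb_table_b.properties

# Per-table config file, e.g. mydb_table_a.properties (placeholder values)
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=created_date
hoodie.deltastreamer.source.dfs.root=s3://{source_root}/mydb/table_a
```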
Stacktrace
@ddai-shippo This looks like a legit bug where the continuous running of the first table blocks the rest in the HoodieMultiTableDeltaStreamer. I'll work on a fix.
@yihua @ddai-shippo The only way the multi-table DeltaStreamer job moves to the second table is if there is an error on the first table. Otherwise, the multi-table DeltaStreamer keeps running only on the first table.
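To make the failure mode concrete, here is a simplified, self-contained sketch of why a blocking per-table sync call starves the remaining tables in continuous mode. This is not the actual Hudi source; the interface and names are illustrative:

```java
import java.util.List;

// Simplified, illustrative sketch (not the actual Hudi code) of the
// multi-table loop behaviour described above.
public class MultiTableLoopSketch {

  // Stand-in for a single-table delta streamer; the name is made up.
  interface TableIngestor {
    // In continuous mode this call blocks until an error occurs (or forever).
    void sync() throws Exception;
  }

  static void ingestAll(List<TableIngestor> tables) {
    for (TableIngestor table : tables) {
      try {
        // The first continuous table blocks here, so the loop never
        // reaches the second table unless sync() throws.
        table.sync();
      } catch (Exception e) {
        // Only an error on the current table lets the loop advance.
        System.err.println("table failed, moving on: " + e.getMessage());
      }
    }
  }

  public static void main(String[] args) {
    TableIngestor continuousTable = () -> {
      while (true) {
        Thread.sleep(1_000); // pretend to keep ingesting new data
      }
    };
    TableIngestor secondTable = () -> System.out.println("never reached");
    // This deliberately hangs on the first table, mirroring the reported bug.
    ingestAll(List.of(continuousTable, secondTable));
  }
}
```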
@yihua any ETA for the fix?
@brskiran1 I'll try to put up a simple fix this week so that at least the HoodieMultiTableDeltaStreamer can iterate through all tables in continuous mode.
@yihua: if you have a tracking Jira, can you post it here?
https://issues.apache.org/jira/browse/HUDI-1881 Let's track updates in the Jira. Thanks for reporting.
I added a workaround for this issue in my local fork of Hudi. Small tweaks to HoodieAsyncService.java and HoodieMultiTableDeltaStreamer.java were required, and it now works as expected. The executor shutdown timeout of 24 hours was causing the job to hang, so I changed it to 10 seconds with no negative consequences. I also enabled --post-write-termination-strategy-class org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy in the MultiTableDeltaStreamer, so it can move on to the next table after N rounds with no new data (max.rounds.without.new.data.to.shutdown) instead of being stuck on the first one until there is an error.
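A minimal sketch of the shutdown-timeout idea, assuming the async service's shutdown path waits on a standard Java ExecutorService. The class and method names below are illustrative, not the actual Hudi 0.10.1 patch:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the kind of tweak described above (not the actual Hudi change):
// bound the executor shutdown wait to seconds instead of hours so a finished
// table's async service releases the multi-table loop promptly.
public class BoundedShutdownSketch {

  static void shutdownWithShortTimeout(ExecutorService executor) {
    executor.shutdown();
    try {
      // ~10 seconds instead of a 24-hour wait.
      if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
        executor.shutdownNow(); // interrupt stragglers after the grace period
      }
    } catch (InterruptedException ie) {
      executor.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.submit(() -> System.out.println("delta sync round finished"));
    shutdownWithShortTimeout(executor);
  }
}
```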
It may not fully solve all use cases, for example when you aren't expecting the no-new-data condition to occur for every table in the multi-table job. A new PostWriteTerminationStrategy, or a refactor of how the loop functions, may be required for some use cases.
It does not go back to the beginning and continuously loop over the full set of tables, because after the last table hits NoNewData the Spark job ends, so I am handling that within the job orchestration.
After more testing, I believe one more code change is required to stop the async services in other threads when NoNewDataTerminationStrategy is reached; otherwise the background threads remain alive and leave the Spark job running. Changing it to terminate forcefully instead threw "sleep interrupted" errors when the Delta Sync thread was shut down.
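For what it's worth, the "sleep interrupted" symptom matches ordinary JDK executor behavior rather than anything Hudi-specific: a forceful shutdownNow() interrupts the worker thread, and Thread.sleep() rethrows that interrupt as an InterruptedException. A small, generic illustration (plain Java, not Hudi code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Generic illustration of why forceful termination surfaces "sleep interrupted":
// shutdownNow() interrupts the worker, and Thread.sleep() converts that
// interrupt into an InterruptedException.
public class ForcefulShutdownSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> {
      try {
        Thread.sleep(60_000); // stand-in for a sync thread waiting for its next round
      } catch (InterruptedException ie) {
        // On HotSpot the message here is typically "sleep interrupted".
        System.err.println("caught: " + ie.getMessage());
      }
    });
    Thread.sleep(500);        // give the task time to start sleeping
    executor.shutdownNow();   // forceful shutdown interrupts the sleeping worker
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```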