
[SUPPORT] Hudi error while running HoodieMultiTableDeltaStreamer: Commit 20220809112130103 failed and rolled-back !

Open ROOBALJINDAL opened this issue 2 years ago • 9 comments

Environment: AWS EMR cluster emr-6.7.0; Hudi 0.11.0; Hive 3.1.3; Hadoop 3.2.1; Spark 3.2.1

Steps to reproduce the behavior: I am running the multi-table streamer, for now with only one table. We use SQL Server as the CDC source with the Debezium connector pushing change events into Kafka, and then read from Kafka to pull records into Hudi/Hive tables.

kafka-source.properties

hoodie.deltastreamer.ingestion.tablesToBeIngested=default.rrmencounter
hoodie.deltastreamer.ingestion.default.rrmencounter.configFile=s3://hudi-multistreamer-roobal/hudi-ingestion-config/rrmencounter-config.properties
auto.offset.reset=earliest
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url=http://10.151.46.161:8080/apis/ccompat/v6
bootstrap.servers=b-2.rjtest12mskcluster.sx7vgn.c13.kafka.us-west-2.amazonaws.com:9092,b-1.rjtest12mskcluster.sx7vgn.c13.kafka.us-west-2.amazonaws.com:9092

rrmencounter-config.properties

hoodie.datasource.write.recordkey.field=rrmencountersid
hoodie.datasource.write.partitionpath.field=receiptdt
hoodie.deltastreamer.ingestion.targetBasePath=s3://hudi-multistreamer-roobal/hudi/rrmencounter
hoodie.datasource.hive_sync.database=default
hoodie.datasource.hive_sync.table=rrmencounter
hoodie.datasource.hive_sync.partition_fields=receiptdt
hoodie.datasource.hive_sync.support_timestamp=true
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.deltastreamer.keygen.timebased.timezone=GMT+8:00
hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true
hoodie.datasource.write.hive_style_partitioning=true
hoodie.deltastreamer.source.kafka.topic=ROOBJIN-LW13206.dbo.rrmencounter
hoodie.deltastreamer.schemaprovider.registry.url=http://10.151.46.161:8080/apis/ccompat/v6/subjects/ROOBJIN-LW13206.dbo.rrmencounter-value/versions/latest
hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://10.151.46.161:8080/apis/ccompat/v6/subjects/ROOBJIN-LW13206.dbo.rrmencounter-value/versions/latest
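For reference, the timestamp key generator settings above (EPOCHMILLISECONDS input, yyyy/MM/dd output format, GMT+8:00 timezone) turn an epoch-millis receiptdt value into a slash-separated partition path, which is why multi-level directories appear under the base path. A rough stdlib-only sketch of the equivalent mapping (illustrative only, not Hudi's actual TimestampBasedKeyGenerator code):

```python
from datetime import datetime, timedelta, timezone

def partition_path(epoch_millis: int) -> str:
    """Map an EPOCHMILLISECONDS value to a yyyy/MM/dd partition path in GMT+8."""
    # Matches hoodie.deltastreamer.keygen.timebased.timezone=GMT+8:00
    tz = timezone(timedelta(hours=8))
    # %Y/%m/%d mirrors the configured output.dateformat yyyy/MM/dd,
    # producing nested directories like 2022/06/11
    return datetime.fromtimestamp(epoch_millis / 1000, tz).strftime("%Y/%m/%d")

# e.g. partition_path(1654905600000) -> "2022/06/11"
```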

Spark command:

spark-submit  \
  --jars "/usr/lib/spark/external/lib/spark-avro.jar" \
  --master yarn --deploy-mode client \
  --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer s3://slava-redshift-test/hudi/hudi-utilities-bundle_2.12-0.11.0_edfx.jar \
  --props s3://hudi-multistreamer-roobal/hudi-ingestion-config/kafka-source.properties \
  --config-folder s3://hudi-multistreamer-roobal/hudi-ingestion-config/ \
  --payload-class org.apache.hudi.common.model.debezium.MssqlDebeziumAvroPayload \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.debezium.MssqlDebeziumSource \
  --source-ordering-field _event_lsn \
  --min-sync-interval-seconds 60 \
  --enable-hive-sync \
  --table-type COPY_ON_WRITE \
  --base-path-prefix s3://hudi-multistreamer-roobal/hudi \
  --target-table rrmencounter  --continuous \
  --op UPSERT

Output: It creates multi-level partition directories such as 2022/06/11, but neither the Hive table nor any parquet data file is generated.

Please note: HoodieDeltaStreamer works fine for the same table with the following Spark command, but HoodieMultiTableDeltaStreamer does not. Maybe I am missing something.

The following works fine with HoodieDeltaStreamer:

spark-submit  \
  --jars "/usr/lib/spark/external/lib/spark-avro.jar" \
  --master yarn --deploy-mode client \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer s3://slava-redshift-test/hudi/hudi-utilities-bundle_2.12-0.11.0_edfx.jar \
  --table-type COPY_ON_WRITE --op UPSERT \
  --target-base-path s3://hudi-multistreamer-roobal/hudi/rrmencounter \
  --target-table rrmencounter  --continuous \
  --min-sync-interval-seconds 60 \
  --source-class org.apache.hudi.utilities.sources.debezium.MssqlDebeziumSource \
  --source-ordering-field _event_lsn \
  --payload-class org.apache.hudi.common.model.debezium.MssqlDebeziumAvroPayload \
  --hoodie-conf schema.registry.url=http://10.151.46.161:8080/apis/ccompat/v6 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://10.151.46.161:8080/apis/ccompat/v6/subjects/ROOBJIN-LW13206.dbo.rrmencounter-value/versions/latest \
  --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=ROOBJIN-LW13206.dbo.rrmencounter \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=rrmencountersid \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=receiptdt \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.timezone=GMT+8:00 \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
  --hoodie-conf hoodie.datasource.hive_sync.support_timestamp=true \
  --enable-hive-sync \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
  --hoodie-conf hoodie.datasource.hive_sync.database=default \
  --hoodie-conf hoodie.datasource.hive_sync.table=rrmencounter \
  --hoodie-conf hoodie.datasource.hive_sync.partition_fields=receiptdt \
  --hoodie-conf hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true \
  --hoodie-conf bootstrap.servers=b-2.rjtest12mskcluster.sx7vgn.c13.kafka.us-west-2.amazonaws.com:9092,b-1.rjtest12mskcluster.sx7vgn.c13.kafka.us-west-2.amazonaws.com:9092

Stacktrace

22/08/09 11:21:53 INFO SparkContext: Starting job: collect at SparkHoodieBackedTableMetadataWriter.java:166
22/08/09 11:21:53 INFO DAGScheduler: Job 21 finished: collect at SparkHoodieBackedTableMetadataWriter.java:166, took 0.000510 s
22/08/09 11:21:53 INFO MultipartUploadOutputStream: close closed:false s3://hudi-multistreamer-roobal/hudi/rrmencounter/.hoodie/20220809112149403.rollback
22/08/09 11:21:53 INFO SparkContext: Starting job: collectAsMap at HoodieSparkEngineContext.java:151
22/08/09 11:21:53 INFO DAGScheduler: Got job 22 (collectAsMap at HoodieSparkEngineContext.java:151) with 2 output partitions
22/08/09 11:21:53 INFO DAGScheduler: Final stage: ResultStage 51 (collectAsMap at HoodieSparkEngineContext.java:151)
22/08/09 11:21:53 INFO DAGScheduler: Parents of final stage: List()
22/08/09 11:21:53 INFO DAGScheduler: Missing parents: List()
22/08/09 11:21:53 INFO DAGScheduler: Submitting ResultStage 51 (MapPartitionsRDD[100] at mapToPair at HoodieSparkEngineContext.java:148), which has no missing parents
22/08/09 11:21:53 INFO MemoryStore: Block broadcast_29 stored as values in memory (estimated size 115.5 KiB, free 911.4 MiB)
22/08/09 11:21:53 INFO MemoryStore: Block broadcast_29_piece0 stored as bytes in memory (estimated size 41.1 KiB, free 911.4 MiB)
22/08/09 11:21:53 INFO BlockManagerInfo: Added broadcast_29_piece0 in memory on ip-10-151-46-163.us-west-2.compute.internal:40011 (size: 41.1 KiB, free: 912.2 MiB)
22/08/09 11:21:53 INFO SparkContext: Created broadcast 29 from broadcast at DAGScheduler.scala:1518
22/08/09 11:21:53 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 51 (MapPartitionsRDD[100] at mapToPair at HoodieSparkEngineContext.java:148) (first 15 tasks are for partitions Vector(0, 1))
22/08/09 11:21:53 INFO YarnScheduler: Adding task set 51.0 with 2 tasks resource profile 0
22/08/09 11:21:53 INFO TaskSetManager: Starting task 0.0 in stage 51.0 (TID 2020) (ip-10-151-46-136.us-west-2.compute.internal, executor 1, partition 0, PROCESS_LOCAL, 4440 bytes) taskResourceAssignments Map()
22/08/09 11:21:53 INFO TaskSetManager: Starting task 1.0 in stage 51.0 (TID 2021) (ip-10-151-46-136.us-west-2.compute.internal, executor 1, partition 1, PROCESS_LOCAL, 4436 bytes) taskResourceAssignments Map()
22/08/09 11:21:53 INFO BlockManagerInfo: Added broadcast_29_piece0 in memory on ip-10-151-46-136.us-west-2.compute.internal:34971 (size: 41.1 KiB, free: 1846.7 MiB)
22/08/09 11:21:53 INFO TaskSetManager: Finished task 1.0 in stage 51.0 (TID 2021) in 52 ms on ip-10-151-46-136.us-west-2.compute.internal (executor 1) (1/2)
22/08/09 11:21:53 INFO TaskSetManager: Finished task 0.0 in stage 51.0 (TID 2020) in 91 ms on ip-10-151-46-136.us-west-2.compute.internal (executor 1) (2/2)
22/08/09 11:21:53 INFO YarnScheduler: Removed TaskSet 51.0, whose tasks have all completed, from pool
22/08/09 11:21:53 INFO DAGScheduler: ResultStage 51 (collectAsMap at HoodieSparkEngineContext.java:151) finished in 0.101 s
22/08/09 11:21:53 INFO DAGScheduler: Job 22 is finished. Cancelling potential speculative or zombie tasks for this job
22/08/09 11:21:53 INFO YarnScheduler: Killing all running tasks in stage 51: Stage finished
22/08/09 11:21:53 INFO DAGScheduler: Job 22 finished: collectAsMap at HoodieSparkEngineContext.java:151, took 0.105640 s
22/08/09 11:21:53 ERROR HoodieDeltaStreamer: Shutting down delta-sync due to exception
org.apache.hudi.exception.HoodieException: Commit 20220809112130103 failed and rolled-back !
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:649)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:331)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:675)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
22/08/09 11:21:53 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Commit 20220809112130103 failed and rolled-back !
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:189)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:186)
        at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.sync(HoodieMultiTableDeltaStreamer.java:416)
        at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.main(HoodieMultiTableDeltaStreamer.java:247)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Commit 20220809112130103 failed and rolled-back !
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:709)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Commit 20220809112130103 failed and rolled-back !
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:649)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:331)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:675)
        ... 4 more
22/08/09 11:21:53 ERROR HoodieMultiTableDeltaStreamer: error while running MultiTableDeltaStreamer for table: rrmencounter
org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Commit 20220809112130103 failed and rolled-back !
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:191)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:186)
        at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.sync(HoodieMultiTableDeltaStreamer.java:416)
        at org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer.main(HoodieMultiTableDeltaStreamer.java:247)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Commit 20220809112130103 failed and rolled-back !
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:189)
        ... 16 more
Caused by: org.apache.hudi.exception.HoodieException: Commit 20220809112130103 failed and rolled-back !
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:709)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: Commit 20220809112130103 failed and rolled-back !
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:649)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:331)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:675)
        ... 4 more
22/08/09 11:21:53 INFO Javalin: Stopping Javalin ...
22/08/09 11:21:53 INFO AbstractConnector: Stopped Spark@64387c17{HTTP/1.1, (http/1.1)}{0.0.0.0:8090}
22/08/09 11:21:53 ERROR Javalin: Javalin failed to stop gracefully
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
        at org.apache.hudi.org.eclipse.jetty.server.AbstractConnector.doStop(AbstractConnector.java:333)
        at org.apache.hudi.org.eclipse.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
        at org.apache.hudi.org.eclipse.jetty.server.ServerConnector.doStop(ServerConnector.java:248)
        at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
        at org.apache.hudi.org.eclipse.jetty.server.Server.doStop(Server.java:450)
        at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
        at io.javalin.Javalin.stop(Javalin.java:195)
        at org.apache.hudi.timeline.service.TimelineService.close(TimelineService.java:334)
        at org.apache.hudi.client.embedded.EmbeddedTimelineService.stop(EmbeddedTimelineService.java:137)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.close(DeltaSync.java:876)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.close(HoodieDeltaStreamer.java:811)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.onDeltaSyncShutdown(HoodieDeltaStreamer.java:222)
        at org.apache.hudi.async.HoodieAsyncService.lambda$shutdownCallback$0(HoodieAsyncService.java:171)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
22/08/09 11:21:53 INFO Javalin: Javalin has stopped
22/08/09 11:21:53 INFO SparkUI: Stopped Spark web UI at http://ip-10-151-46-163.us-west-2.compute.internal:8090
22/08/09 11:21:53 INFO YarnClientSchedulerBackend: Interrupting monitor thread
22/08/09 11:21:53 INFO YarnClientSchedulerBackend: Shutting down all executors
22/08/09 11:21:53 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
22/08/09 11:21:53 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
22/08/09 11:21:53 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/08/09 11:21:53 INFO MemoryStore: MemoryStore cleared
22/08/09 11:21:53 INFO BlockManager: BlockManager stopped
22/08/09 11:21:53 INFO BlockManagerMaster: BlockManagerMaster stopped
22/08/09 11:21:53 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/08/09 11:21:53 INFO SparkContext: Successfully stopped SparkContext
22/08/09 11:21:53 INFO ShutdownHookManager: Shutdown hook called
22/08/09 11:21:53 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-1ac0b680-8bcb-46e4-89f7-3da932646847
22/08/09 11:21:53 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-2b324903-b982-45d3-944a-647af07783f9

The error from the stacktrace: org.apache.hudi.exception.HoodieException: Commit 20220809112130103 failed and rolled-back !

ROOBALJINDAL avatar Aug 09 '22 11:08 ROOBALJINDAL

@pratyakshsharma I think you are the best person to help :) Can you please check?

ROOBALJINDAL avatar Aug 09 '22 11:08 ROOBALJINDAL

I guess you suspect the schema registry URLs to be the root cause, based on your other comment here: https://github.com/apache/hudi/pull/4779#issuecomment-1209002671?

pratyakshsharma avatar Aug 09 '22 12:08 pratyakshsharma

I upgraded to the 0.11 release of Hudi and simply used targetUrl instead of targetUrlSuffix, and it worked; I am not getting errors related to the target and source registry URLs anymore. So I am assuming that part is resolved, but I got stuck further on this. Also, I have kept the target URL the same as the registry URL; I am not sure what the correct value would be.

Can you tell what the issue might be by looking at the config? Can you spot what's missing here?

@pratyakshsharma

ROOBALJINDAL avatar Aug 09 '22 12:08 ROOBALJINDAL

@pratyakshsharma: if any documentation needs to be improved w.r.t. the multi-table deltastreamer, can you work on it? CC @bhasudha. We have seen a few users from the community raise questions/issues around schema registry configs for the multi-table deltastreamer; it would be beneficial if we can enhance our docs.

nsivabalan avatar Aug 10 '22 02:08 nsivabalan

I checked further, downloaded the AWS logs of the node, and found the exact exception. Do you have any idea what's going wrong? @nsivabalan @pratyakshsharma

Stack trace

22/08/10 07:31:24 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=1201 partitionPath=rec****=2022/06/22}, currentLocation='null', newLocation='null'}
java.lang.UnsupportedOperationException: Cannot read strings longer than 2147483639 bytes
	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:305)
	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:156)
	at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:146)
	at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75)
	at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertRecord(AbstractDebeziumAvroPayload.java:87)
	at org.apache.hudi.common.model.debezium.AbstractDebeziumAvroPayload.getInsertValue(AbstractDebeziumAvroPayload.java:58)
	at org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105)
	at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
	at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:105)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
22/08/10 07:31:25 INFO MultipartUploadOutputStream: close closed:false s3://xxxxxxxxxx/hudi/rrmencounter/rec******=2022/06/22/c839dd87-e7e1-4878-bb38-999d331bed33-0_0-29-2008_20220810073102064.parquet
22/08/10 07:31:25 INFO MemoryStore: Block rdd_64_0 stored as values in memory (estimated size 2.4 KiB, free 1843.7 MiB)
22/08/10 07:31:25 INFO Executor: Finished task 0.0 in stage 29.0 (TID 2008). 1593 bytes result sent to driver
22/08/10 07:31:25 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 2009
22/08/10 07:31:25 INFO Executor: Running task 0.0 in stage 35.0 (TID 2009)
22/08/10 07:31:25 INFO TorrentBroadcast: Started reading broadcast variable 18 with 1 pieces (estimated total size 4.0 MiB)
22/08/10 07:31:25 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 329.2 KiB, free 1843.4 MiB)
22/08/10 07:31:25 INFO TorrentBroadcast: Reading broadcast variable 18 took 7 ms
22/08/10 07:31:25 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 1357.0 KiB, free 1842.1 MiB)
22/08/10 07:31:25 INFO BlockManager: Found block rdd_64_0 locally

ROOBALJINDAL avatar Aug 10 '22 09:08 ROOBALJINDAL

This does not look related to HoodieMultiTableDeltaStreamer class in any way. What is the incoming schema for your records? Do you have some string field whose value size is in GB?

pratyakshsharma avatar Aug 10 '22 09:08 pratyakshsharma

@pratyakshsharma No, there is no such field. Even with the same schema, it works with HoodieDeltaStreamer but not with HoodieMultiTableDeltaStreamer.

ROOBALJINDAL avatar Aug 10 '22 09:08 ROOBALJINDAL

It looks like the deserialization of Avro records from the Kafka topic is not happening properly. I see you are using the Apicurio schema registry but trying to use the Confluent Avro deserializer class with it (hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer).

You should probably be using the corresponding deserializers for Apicurio, as highlighted in the Debezium docs (https://debezium.io/documentation/reference/stable/configuration/avro.html#about-the-registry). Another thing to note: you are using SQL Server and have hence written your own custom classes, org.apache.hudi.utilities.sources.debezium.MssqlDebeziumSource and org.apache.hudi.common.model.debezium.MssqlDebeziumAvroPayload. Can you confirm the deserializer you are using in your custom source class as well?
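The "Cannot read strings longer than 2147483639 bytes" failure above is typically a symptom of exactly this kind of misaligned decoding: Avro strings are prefixed with a zig-zag varint length, so a decoder that starts at the wrong byte offset, or reads with a mismatched schema, can interpret arbitrary bytes as an enormous length that Avro then rejects. An illustrative stdlib-only sketch of that length decoding (not Hudi's or Avro's actual code):

```python
def read_zigzag_varint(buf: bytes, pos: int = 0):
    """Decode one Avro zig-zag varint (used as a string length prefix).

    Returns (value, next_pos).
    """
    n, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        n |= (b & 0x7F) << shift       # accumulate 7 payload bits per byte
        if not (b & 0x80):             # high bit clear -> last byte
            break
        shift += 7
    return (n >> 1) ^ -(n & 1), pos    # undo zig-zag encoding

# A well-aligned prefix: byte 0x06 decodes to a 3-byte string length.
# Misaligned garbage such as fe ff ff ff 0f decodes to 2147483647,
# which exceeds Avro's limit and surfaces as the exception in the logs.
```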

pratyakshsharma avatar Aug 10 '22 12:08 pratyakshsharma

you might find this useful as well - https://debezium.io/blog/2020/04/09/using-debezium-with-apicurio-api-schema-registry/

pratyakshsharma avatar Aug 10 '22 12:08 pratyakshsharma

@pratyakshsharma We are not using any serializer in our custom class, as it wasn't required. I believe it uses the converter that is defined in the Kafka Connect properties.

We are using the following converters in Kafka Connect:

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://xx.xxx.xx.xxx:8080/apis/ccompat/v6
value.converter.schema.registry.url=http://xx.xxx.xx.xxx:8080/apis/ccompat/v6

Also, a point to note: if contradicting serializers were being used, it would not have worked for HoodieDeltaStreamer (the non-multi-table ingestion) either. But HoodieDeltaStreamer is working fine for the same table and records.

ROOBALJINDAL avatar Aug 12 '22 07:08 ROOBALJINDAL

But HoodieDeltaStreamer is working fine for same table and records.

So you mean to say you are consuming exactly the same records via HoodieDeltaStreamer and HoodieMultiTableDeltaStreamer? I was suspecting that some type of record consumed by the MultiTableDeltaStreamer is not being consumed by HoodieDeltaStreamer.

Can you try changing the converters to io.apicurio.registry.utils.converter.AvroConverter, as suggested for the Apicurio registry, and let me know the results? If this does not help either, we will try something else.
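For reference, the Kafka Connect properties with Apicurio's own converter might look roughly like the sketch below. The property names follow the Debezium/Apicurio docs linked above and should be verified there; the masked registry host is the poster's:

```properties
key.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://xx.xxx.xx.xxx:8080/apis/registry/v2
value.converter.apicurio.registry.url=http://xx.xxx.xx.xxx:8080/apis/registry/v2
```

Note this also implies switching the consumer side away from io.confluent.kafka.serializers.KafkaAvroDeserializer to the matching Apicurio deserializer.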

pratyakshsharma avatar Aug 16 '22 12:08 pratyakshsharma

@pratyakshsharma Yes, the table has 1 record. I ran HoodieDeltaStreamer for the same record and table and it worked, but for the same table/record with the same serializers and registry, HoodieMultiTableDeltaStreamer throws the error. I have enabled debug logs; let me share them with you in a while.

ROOBALJINDAL avatar Aug 17 '22 07:08 ROOBALJINDAL

@pratyakshsharma: if any documentation needs to be improved w.r.t. the multi-table deltastreamer, can you work on it? CC @bhasudha. We have seen a few users from the community raise questions/issues around schema registry configs for the multi-table deltastreamer; it would be beneficial if we can enhance our docs.

https://github.com/apache/hudi/pull/6420

pratyakshsharma avatar Aug 17 '22 10:08 pratyakshsharma

AWS debug console logs - commit rollback.txt

@pratyakshsharma @nsivabalan I have enabled debug logs and attached them; can you please check?

ROOBALJINDAL avatar Aug 17 '22 11:08 ROOBALJINDAL

@ROOBALJINDAL When using DebeziumSource, please do not set --schemaprovider-class, since the schema is applied in the source. Can you try after removing that config? I see that you did not apply that config for HoodieDeltaStreamer.
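Applying that suggestion, the multi-table command from the original post would look like the untested sketch below: the same flags and paths, with only the --schemaprovider-class line dropped:

```shell
# Untested sketch: the multi-table command above, minus --schemaprovider-class,
# since DebeziumSource attaches the schema itself.
spark-submit \
  --jars "/usr/lib/spark/external/lib/spark-avro.jar" \
  --master yarn --deploy-mode client \
  --class org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer s3://slava-redshift-test/hudi/hudi-utilities-bundle_2.12-0.11.0_edfx.jar \
  --props s3://hudi-multistreamer-roobal/hudi-ingestion-config/kafka-source.properties \
  --config-folder s3://hudi-multistreamer-roobal/hudi-ingestion-config/ \
  --payload-class org.apache.hudi.common.model.debezium.MssqlDebeziumAvroPayload \
  --source-class org.apache.hudi.utilities.sources.debezium.MssqlDebeziumSource \
  --source-ordering-field _event_lsn \
  --min-sync-interval-seconds 60 \
  --enable-hive-sync \
  --table-type COPY_ON_WRITE \
  --base-path-prefix s3://hudi-multistreamer-roobal/hudi \
  --target-table rrmencounter --continuous \
  --op UPSERT
```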

rmahindra123 avatar Aug 23 '22 08:08 rmahindra123

@rmahindra123 Thanks, that worked. But I added 2 tables for ingestion and it always picks only the first table mentioned in the comma-separated list. I have kept all other configs the same. Can you tell what is missing?

Kafka-source.properties

hoodie.deltastreamer.ingestion.tablesToBeIngested=default.table1,default.table2
hoodie.deltastreamer.ingestion.default.table1.configFile=s3://config/table1-config.properties
hoodie.deltastreamer.ingestion.default.table2.configFile=s3://config/table2-config.properties

@pratyakshsharma @nsivabalan

ROOBALJINDAL avatar Aug 23 '22 11:08 ROOBALJINDAL

Did you supply the --continuous flag as well?

pratyakshsharma avatar Aug 23 '22 13:08 pratyakshsharma

For the MultiTableDeltaStreamer, ingestion runs sequentially: it will first ingest table1 and then table2. Let me know if you are still facing issues.

rmahindra123 avatar Aug 23 '22 17:08 rmahindra123

@rmahindra123 @pratyakshsharma Yes, it is working fine for me after removing the --continuous flag, but the Spark job ends once ingestion is done. I want it to keep running, checking for new messages in the Kafka topic and syncing them to Hudi/Hive, just like HoodieDeltaStreamer does. Is this not supported for HoodieMultiTableDeltaStreamer?

ROOBALJINDAL avatar Aug 25 '22 08:08 ROOBALJINDAL

@ROOBALJINDAL A PR is already up to support this for HoodieMultiTableDeltaStreamer; I need to write test cases for it. You can track it here: https://github.com/apache/hudi/pull/5071.

pratyakshsharma avatar Aug 25 '22 12:08 pratyakshsharma