[SUPPORT] Fails to write/merge when any nested fields are missing in a struct. [An error occurred while calling o185.save. Can't redefine: element]

Tips before filing an issue

  • Have you gone through our FAQs? Yes.

When any nested fields of a struct are missing from a later batch, the write to Hudi fails with the error: An error occurred while calling o185.save. Can't redefine: element

Schema of the last successful write, which has LinkAddr1 and LinkAddr2; later writes are missing them:

 |-- LogicalLinks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ClassId: string (nullable = true)
 |    |    |-- Id: long (nullable = true)
 |    |    |-- IsPresent: boolean (nullable = true)
 |    |    |-- LinkAddr1: string (nullable = true)   // this field will be missing in the next merge/append 
 |    |    |-- LinkAddr2: string (nullable = true)  // this field will be missing in the next merge/append 
 |    |    |-- LinkState: string (nullable = true)
 |    |    |-- LinkType: string (nullable = true)
 |    |    |-- ObjectType: string (nullable = true)

Data schema of later writes:

 |-- LogicalLinks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ClassId: string (nullable = true)
 |    |    |-- Id: long (nullable = true)
 |    |    |-- IsPresent: boolean (nullable = true)
 |    |    |-- LinkState: string (nullable = true)
 |    |    |-- LinkType: string (nullable = true)
 |    |    |-- ObjectType: string (nullable = true)
 |    |    |-- Uptime: string (nullable = true)

Steps to reproduce the behaviour (a minimal PySpark sketch follows the steps):

  1. Write to Hudi a DataFrame created from the JSON below:
{
    "LogicalLinks": [
        {
            "ClassId": "some.Link",
            "Id": 700,
            "IsPresent": true,
            "LinkAddr1": "",
            "LinkAddr2": "",
            "LinkState": "failed",
            "LinkType": "some",
            "ObjectType": "some.Link",
            "Uptime": "7d 00:00:00"
        }
    ]
}
  2. Update the table with the data below, which is missing LinkAddr1 (and, in other batches, LinkAddr2 as well):
{
    "LogicalLinks": [
        {
            "ClassId": "some.Link",
            "Id": 700,
            "IsPresent": true,
            "LinkAddr2": "",
            "LinkState": "failed",
            "LinkType": "some",
            "ObjectType": "some.Link",
            "Uptime": "7d 00:00:00"
        }
    ]
}
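
For reference, a minimal PySpark sketch of the two writes above. This is not the reporter's actual Glue script: the top-level "id" record key, the table name, the Hudi options, and the S3 path are hypothetical additions needed to make the upsert runnable; the nested payload follows the JSON in the steps.

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

link = {"ClassId": "some.Link", "Id": 700, "IsPresent": True,
        "LinkAddr1": "", "LinkAddr2": "", "LinkState": "failed",
        "LinkType": "some", "ObjectType": "some.Link", "Uptime": "7d 00:00:00"}
first = {"id": 1, "LogicalLinks": [link]}

# The later batch omits LinkAddr1, shrinking the struct inside the array.
second = {"id": 1, "LogicalLinks": [{k: v for k, v in link.items()
                                     if k != "LinkAddr1"}]}

hudi_options = {
    'hoodie.table.name': 'links',                       # hypothetical
    'hoodie.datasource.write.recordkey.field': 'id',    # hypothetical
    'hoodie.datasource.write.precombine.field': 'id',   # hypothetical
    'hoodie.datasource.write.operation': 'upsert',
}

for batch in (first, second):
    df = spark.read.json(spark.sparkContext.parallelize([json.dumps(batch)]))
    (df.write.format('hudi')
        .options(**hudi_options)
        .mode('append')
        .save('s3://some-bucket/links'))                # hypothetical path
# On Hudi 0.11.1 the second save() fails with "Can't redefine: element".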

Expected behavior: with schema evolution, it should have updated the record.

Environment Description

  • Hudi version : 0.11.1

  • Spark version : 3.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

Spark jobs are run on AWS Glue 3.0.

Stacktrace

2022-08-06 13:08:45,449 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/second-delete-upsert.py", line 248, in <module>
    startMerging(df_prefix_map_list)
  File "/tmp/second-delete-upsert.py", line 190, in startMerging
    .mode('append') \
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
    self._jwrite.save()
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o185.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 4 times, most recent failure: Lost task 0.3 in stage 36.0 (TID 273) (172.35.121.54 executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.SchemaParseException: Can't redefine: element
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
	... 28 more
Caused by: org.apache.avro.SchemaParseException: Can't redefine: element
	at org.apache.avro.Schema$Names.put(Schema.java:1128)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema.toString(Schema.java:324)
	at org.apache.avro.Schema.toString(Schema.java:314)
	at org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:69)
	at org.apache.hudi.io.storage.HoodieParquetReader.getRecordIterator(HoodieParquetReader.java:69)
	at org.apache.hudi.io.storage.HoodieFileReader.getRecordIterator(HoodieFileReader.java:43)
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:132)
	... 31 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:696)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:338)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:184)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.SchemaParseException: Can't redefine: element
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
	... 28 more
Caused by: org.apache.avro.SchemaParseException: Can't redefine: element
	at org.apache.avro.Schema$Names.put(Schema.java:1128)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema.toString(Schema.java:324)
	at org.apache.avro.Schema.toString(Schema.java:314)
	at org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:69)
	at org.apache.hudi.io.storage.HoodieParquetReader.getRecordIterator(HoodieParquetReader.java:69)
	at org.apache.hudi.io.storage.HoodieFileReader.getRecordIterator(HoodieFileReader.java:43)
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:132)
	... 31 more

gtwuser avatar Aug 06 '22 13:08 gtwuser

@gtwuser could you provide the Hudi configs you used? Which Hudi payload class do you use?

By default, if a new batch of data has only a partial schema, with a subset of the fields in the existing table, the behavior is undefined. @xiarixiaoyao does schema evolution in Hudi Spark support this?

yihua avatar Aug 08 '22 05:08 yihua

@gtwuser please try adding "hoodie.schema.on.read.enable" -> "true" and "hoodie.datasource.write.reconcile.schema" -> "true".

xiarixiaoyao avatar Aug 10 '22 12:08 xiarixiaoyao
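
A sketch of where the two suggested options would go on the write path (the DataFrame, format call, and path are illustrative; only the two option keys and values come from the suggestion above):

(df.write.format('hudi')
    .option('hoodie.schema.on.read.enable', 'true')
    .option('hoodie.datasource.write.reconcile.schema', 'true')
    .mode('append')
    .save('s3://some-bucket/links'))   # hypothetical table path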

@gtwuser : gentle ping.

nsivabalan avatar Sep 12 '22 22:09 nsivabalan

@gtwuser I don't think you even need to enable the advanced schema evolution that @xiarixiaoyao suggested. Just enabling hoodie.datasource.write.reconcile.schema=true should work for your use case.

alexeykudinkin avatar Sep 13 '22 00:09 alexeykudinkin

Even after adding hoodie.datasource.write.reconcile.schema=true, we are still seeing this issue on different tables. This time the error is Caused by: org.apache.avro.SchemaParseException: Can't redefine: list. Stack trace:

2022-09-30 20:08:29,172 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/second-delete-upsert.py", line 265, in <module>
    startMerging(df_prefix_map_list)
  File "/tmp/second-delete-upsert.py", line 260, in startMerging
    .mode('append') \
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
    self._jwrite.save()
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o514.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 275.0 failed 4 times, most recent failure: Lost task 0.3 in stage 275.0 (TID 1391) (172.34.1.80 executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
	... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
	... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
	... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	... 4 more
Caused by: org.apache.avro.SchemaParseException: Can't redefine: list
	at org.apache.avro.Schema$Names.put(Schema.java:1128)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema.toString(Schema.java:324)
	at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
	at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
	at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:475)
	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
	at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
	... 8 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:696)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:338)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:184)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
	... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
	... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
	... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	... 4 more
Caused by: org.apache.avro.SchemaParseException: Can't redefine: list
	at org.apache.avro.Schema$Names.put(Schema.java:1128)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema.toString(Schema.java:324)
	at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
	at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
	at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:475)
	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
	at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
	... 8 more

gtwuser avatar Sep 30 '22 20:09 gtwuser

Configs used:

incrementalConfig = {
    'hoodie.upsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10,
}
partitionDataConfig = {
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
    'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE, month:SIMPLE, day:SIMPLE, hour:SIMPLE, device_id:SIMPLE',
    'hoodie.datasource.hive_sync.partition_fields': 'year, month, day, hour, device_id',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
}
commonConfig = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'payload.recordedAt',
    'hoodie.datasource.write.recordkey.field': 'metadata.msgID,metadata.topic',
    'hoodie.table.name': 'sse',
    # 'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': args['database_name'],
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.datasource.hive_sync.table': f'sse_{"_".join(prefix.split("/")[-7:-5])}'.lower(),
    'hoodie.datasource.hive_sync.enable': 'true',
    'path': 's3://' + args['curated_bucket'] + '/merged/sse-native/' + f'{prefix.split("/")[-7]}'.lower(),
    'hoodie.parquet.small.file.limit': '307200',    # 300 KB
    # note: 1024 * 1024 * 128 = 134,217,728 (128 MiB); the value below is 128 MB (decimal)
    'hoodie.parquet.max.file.size': '128000000',
}

gtwuser avatar Sep 30 '22 20:09 gtwuser
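
For context, these dicts are presumably merged and handed to the DataFrame writer roughly as below, matching the .mode('append').save() frames in the traceback. This is a sketch, not the actual Glue script: it assumes 'path' is consumed as the save location and treats 'className' as an AWS Glue connector key rather than a Hudi writer option.

# Hypothetical assembly of the three config dicts into one writer call.
combined = {**commonConfig, **partitionDataConfig, **incrementalConfig}
target_path = combined.pop('path')   # assumed: used as the save location
combined.pop('className', None)      # assumed: Glue connector key, not a writer option
(df.write.format('hudi')
    .options(**combined)
    .mode('append')
    .save(target_path))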

We have also updated the environment.

Environment Description

Hudi version : 0.11.1

Spark version : 3.1

Storage (HDFS/S3/GCS..) : S3

Running on Docker? (yes/no) : no

gtwuser avatar Sep 30 '22 20:09 gtwuser

I have also encountered this problem.

yihao-tcf avatar Oct 31 '22 08:10 yihao-tcf