
org.apache.avro.AvroRuntimeException: Not a valid schema field: IncidentId,EntryTime,ModifiedTime

Open prat0082 opened this issue 1 year ago • 11 comments

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at [email protected].

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

Hudi upsert fails with AvroRuntimeException: Not a valid schema field.

I have completed the full load of the data using Hudi bulk insert with the same set of record key, partition, and precombine fields. Once the full load completed, an incremental load is set to run to update the data for the past 7 days, but it fails with what looks like a schema evolution issue.

I am using AWS Glue 4.0 with the following write options:

'hoodie.table.name': hudi_table_name,
'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.upsert.shuffle.parallelism': 200,
'hoodie.insert.shuffle.parallelism': 200,
'hoodie.bloom.index.parallelism': 320,
'hoodie.parquet.small.file.limit': '20971520',
'hoodie.datasource.write.recordkey.field': recordkey,
'hoodie.datasource.write.precombine.field': precombinedfield,
'hoodie.datasource.write.partitionpath.field': partitionfield,
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.write.table.name': hudi_table_name,
'path': s3_output_path,
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.database': hudi_database_name,
'hoodie.datasource.hive_sync.table': hudi_table_name,
'hoodie.datasource.hive_sync.partition_fields': partitionfield,
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.hive_sync.mode': 'hms',
'hoodie.meta.sync.client.tool.class': 'org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool',
'hoodie.avro.schema.external.transformation': 'true',
'hoodie.schema.on.read.enable': 'true',
'hoodie.datasource.write.reconcile.schema': 'true',
'hoodie.avro.schema.validate': 'true',
'hoodie.datasource.write.schema.allow.auto.evolution.column.drop': 'true',
'hoodie.datasource.hive_sync.support_timestamp': 'true',
'hoodie.write.lock.provider': 'org.apache.hudi.hive.HiveMetastoreBasedLockProvider',
'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.write.lock.hivemetastore.database': hudi_database_name,
'hoodie.write.lock.hivemetastore.table': hudi_table_name,
'hoodie.index.type': 'GLOBAL_BLOOM',
'hoodie.bloom.index.update.partition.path': 'true',
'hoodie.bloom.index.filter.type': 'DYNAMIC_V0',
'hoodie.cleaner.commits.retained': 1,
'hoodie.clean.automatic': 'true',
'hoodie.cleaner.parallelism': 100
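
For reference, these options are passed to a plain DataFrame write in the Glue 4.0 job, roughly like the following sketch (hudi_options holds the dict above; incremental_df is a placeholder name for the DataFrame being written):

# Sketch of how the options above are applied in the Glue job (names are placeholders).
hudi_options = {
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombinedfield,
    'hoodie.datasource.write.partitionpath.field': partitionfield,
    # ... plus the remaining options listed above ...
}

(incremental_df.write
    .format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save(s3_output_path))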


To Reproduce

Steps to reproduce the behavior:

1. Insert records using bulk insert.
2. Upsert the incremental records (roughly 80% updates and 20% inserts); see the sketch below.
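
A minimal sketch of the two runs (full_df, incr_df, and hudi_options are placeholder names, not the exact job code):

# Step 1: initial full load with bulk insert.
bulk_options = dict(hudi_options, **{'hoodie.datasource.write.operation': 'bulk_insert'})
(full_df.write.format("hudi")
    .options(**bulk_options)
    .mode("overwrite")
    .save(s3_output_path))

# Step 2: incremental load (~80% updates / 20% inserts) with upsert; this is the run that fails.
(incr_df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save(s3_output_path))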

Expected behavior

Data should be updated and inserted into the Hudi table.

Environment Description

  • Hudi version : 0.12.1

  • Spark version : 3.3

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no


Stacktrace

py4j.protocol.Py4JJavaError: An error occurred while calling o146.save. : org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20240808100109898 at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64) at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:45) at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:113) at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:97) at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:158) at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:206) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:331) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:154) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 25 in stage 6.0 failed 4 times, most recent failure: Lost task 25.3 in stage 6.0 (TID 145) (10.63.111.23 executor 2): org.apache.avro.AvroRuntimeException: Not a valid schema field: IncidentId,EntryTime,ModifiedTime at org.apache.avro.generic.GenericData$Record.get(GenericData.java:268) at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:499) at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$11(HoodieSparkSqlWriter.scala:297) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:138) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294) at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:406) at org.apache.spark.rdd.RDD.collect(RDD.scala:1020) at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:406) at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367) at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314) at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105) at org.apache.hudi.index.bloom.HoodieBloomIndex.lookupIndex(HoodieBloomIndex.java:119) at org.apache.hudi.index.bloom.HoodieBloomIndex.tagLocation(HoodieBloomIndex.java:89) at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:50) at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:33) at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:53) ... 
54 more Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: IncidentId,EntryTime,ModifiedTime at org.apache.avro.generic.GenericData$Record.get(GenericData.java:268) at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:499) at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$11(HoodieSparkSqlWriter.scala:297) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:138) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

prat0082, Aug 08 '24 19:08

You may have a schema mismatch. The error is thrown here in Avro's GenericData.Record#get:

public Object get(String key) {
  Field field = schema.getField(key);
  if (field == null) {
    throw new AvroRuntimeException("Not a valid schema field: " + key);
  }
  return values[field.pos()];
}
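
Just to illustrate (plain Python, not Hudi code): the lookup fails in exactly this way when the whole comma-separated string is treated as a single field name instead of being split into individual keys.

# Hypothetical illustration of the lookup performed by GenericData.Record.get().
schema_fields = ["IncidentId", "EntryTime", "ModifiedTime"]    # fields present in the Avro schema
key_config = "IncidentId,EntryTime,ModifiedTime"

# Looking up the whole string as one field name fails
# -> "Not a valid schema field: IncidentId,EntryTime,ModifiedTime"
print(key_config in schema_fields)                              # False

# Looking up each field after splitting on ',' succeeds.
print(all(f in schema_fields for f in key_config.split(",")))   # True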

danny0405, Aug 09 '24 01:08

These are the record key fields: IncidentId, EntryTime, ModifiedTime (an int and two timestamps). I am not doing any transformation.

Do I need to set any specific Hudi options for timestamp record keys?

prat0082, Aug 09 '24 11:08

I see the below in the log as the Avro schema for the timestamp fields which I use as record keys: type long with logicalType timestamp-micros.

{ "name" : "EntryTime", "type" : [ "null", { "type" : "long", "logicalType" : "timestamp-micros" } ], { "name" : "ModifiedTime", "type" : [ "null", { "type" : "long", "logicalType" : "timestamp-micros" } ], "default" : null

prat0082, Aug 09 '24 12:08

But it looks like the three fields are being looked up as one concatenated name against the Avro schema, which is the reason it cannot be found.

danny0405, Aug 10 '24 01:08

I need these fields as the record key for indexing and unique identification of the records.

prat0082, Aug 11 '24 22:08

Then you just need to define the record keys concatenated with commas:

'hoodie.datasource.write.recordkey.field': 'col_a,col_b,col_c'

danny0405, Aug 12 '24 02:08

It is already defined in the code as a comma-separated string: recordkey = 'IncidentId,EntryTime,ModifiedTime'

prat0082, Aug 12 '24 15:08

I saw this in the options: 'hoodie.datasource.write.recordkey.field': recordkey,

danny0405, Aug 12 '24 23:08

Changed the record key field to use a timestamp column; it still throws the same error:

Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: entryrefintime at org.apache.avro.generic.GenericData$Record.get(GenericData.java:268) at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:499) at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$write$11(HoodieSparkSqlWriter.scala:297) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:138) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

prat0082, Aug 13 '24 04:08

@prat0082 Can you let us know what value you are giving for recordKey?
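
A quick way to confirm is to print the columns of the DataFrame that reaches the writer and compare them (case-sensitively) with the configured key, e.g. (sketch, with df being the incremental DataFrame):

# Sketch: verify the configured record key fields exist (exact case) in the incoming DataFrame.
recordkey = 'incidentid,entrytime,modifiedtime'   # whatever value the job actually passes
missing = [f for f in recordkey.split(',') if f not in df.columns]
print("DataFrame columns:", df.columns)
print("Record key fields missing from the DataFrame:", missing)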

ad1happy2go, Aug 14 '24 12:08

RecordKey - 'incidentid,entrytime,modifiedtime'

incidentid - int, entrytime - timestamp, modifiedtime - timestamp

prat0082, Aug 18 '24 17:08