[SUPPORT] java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.UnsafeRow cannot be cast to class org.apache.spark.sql.vectorized.ColumnarBatch
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
Upgrading the Hudi version from 0.13.1 with metadata turned off to 0.14.1 with metadata turned on.
The first run went through fine and created the metadata table.
On the second run I am facing the issue shown below.
To Reproduce
Steps to reproduce the behavior:
- Bootstrap a COW table with Hudi 0.13.1 and metadata turned off.
- Run an incremental ETL with Hudi 0.14.1 and metadata turned on, using org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload. This step succeeds, and the Hudi table version is upgraded to 6 (a sketch for verifying the upgrade follows this list).
- Run a subsequent incremental with Hudi 0.14.1 and metadata turned on, using the same payload class. This step fails.
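For reference, a minimal sketch (Scala, e.g. from spark-shell against the 0.14.1 bundle) for checking the table version after the first 0.14.1 run; the GCS base path below is a placeholder, not the actual table location:

import org.apache.hudi.common.table.HoodieTableMetaClient

val basePath = "gs://<bucket>/path/to/OrderTable" // placeholder path
val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sparkContext.hadoopConfiguration)
  .setBasePath(basePath)
  .build()
// Expect table version 6 once the 0.14.1 upgrade has completed
println(metaClient.getTableConfig.getTableVersion)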
Expected behavior
A partial update on the table that succeeds consistently, not just on the first run.
Environment Description
- Hudi version : 0.14.1
- Spark version : 3.3.2
- Hive version : 3.1.3
- Hadoop version : 3.3.6
- Storage (HDFS/S3/GCS..) : GCS
- Running on Docker? (yes/no) : Dataproc
Additional context
Hudi configurations are as follows (a sketch applying them on the writer follows the list):
hoodie.parquet.small.file.limit -> 4194304
hoodie.bloom.index.parallelism -> 256
hoodie.parquet.max.file.size -> 33554432
hoodie.partition.metafile.use.base.format -> true
hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled -> true
hoodie.parquet.block.size -> 16777216
hoodie.metadata.enable -> true
hoodie.datasource.write.drop.partition.columns -> true
hoodie.keep.max.commits -> 91
hoodie.upsert.shuffle.parallelism -> 256
hoodie.cleaner.commits.retained -> 77
hoodie.keep.min.commits -> 84
hoodie.global.bloom.index.parallelism -> 25
hoodie.datasource.write.precombine.field -> HUDI_PRECOMBINE_TS
hoodie.datasource.write.operation -> upsert
hoodie.datasource.write.recordkey.field -> ORDER_NUM,APPLN_VER_CD
hoodie.table.name -> OrderTable
hoodie.datasource.write.hive_style_partitioning -> true
hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.partitionpath.field -> ORDER_CREATE_UTC_DT,APPLN_VER_C
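For context, a hedged sketch of how these options might be applied on the writer; the input DataFrame, the GCS path, and the payload-class option key are assumptions based on the steps above, and only a subset of the listed options is repeated here:

import org.apache.spark.sql.{DataFrame, SaveMode}

val inputDf: DataFrame = ??? // placeholder for the incremental batch
val hudiOptions = Map(
  "hoodie.table.name" -> "OrderTable",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.recordkey.field" -> "ORDER_NUM,APPLN_VER_CD",
  "hoodie.datasource.write.partitionpath.field" -> "ORDER_CREATE_UTC_DT,APPLN_VER_C",
  "hoodie.datasource.write.precombine.field" -> "HUDI_PRECOMBINE_TS",
  "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator",
  "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload",
  "hoodie.metadata.enable" -> "true"
  // ... remaining options from the list above
)

inputDf.write
  .format("hudi")
  .options(hudiOptions)
  .mode(SaveMode.Append)
  .save("gs://<bucket>/path/to/OrderTable") // placeholder path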
Stacktrace
The error stack trace:
24/04/26 16:21:51 ERROR TableLoaderClass:
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:70)
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:114)
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
-- Application specific stack trace
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:739)
A prior warning was also found:
24/04/26 16:21:49 WARN TaskSetManager: Lost task 186.0 in stage 1.0 (TID 226) (gif-publish-incremental-sfo-sw-swwj.c.wmt-bfdms-opddev.internal executor 45): java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.UnsafeRow cannot be cast to class org.apache.spark.sql.vectorized.ColumnarBatch (org.apache.spark.sql.catalyst.expressions.UnsafeRow and org.apache.spark.sql.vectorized.ColumnarBatch are in unnamed module of loader 'app')
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:579)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:568)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator.isEmpty(Iterator.scala:387)
at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:758)
at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:108)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:58)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
It looks like a known issue reported in: https://github.com/apache/hudi/issues/9305
Hi @danny0405, yes, it does look similar. However, the table was already running with Spark 3.3.2 and Hudi 0.13.1 without errors. The only changes here were upgrading the Hudi version to 0.14.1 and turning on metadata.
The casting also seems to be in the opposite direction, and the first run with Hudi 0.14.1 did not have metadata and succeeded. Do you think the issue is related to how the metadata table is saved? In other words, is metadata not supported with Spark 3.3.2?
Thanks for the help!
Do you think the issue is related to how the metadata table is saved? In other words, is metadata not supported with Spark 3.3.2?
It is supported. Can you share your config options related to the metadata table?
Hi @danny0405, we are using defaults only. All Hudi configs we specify are listed above. Is there something we should configure specifically?
I'm pretty sure it is a jar conflict. Can you check which jar contains the reported class?
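For example, a quick spark-shell sketch to see which jar each class in the ClassCastException is loaded from (note that getCodeSource can return null for classes loaded by the bootstrap class loader):

// Print the jar location for each class involved in the cast
println(classOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow]
  .getProtectionDomain.getCodeSource.getLocation)
println(classOf[org.apache.spark.sql.vectorized.ColumnarBatch]
  .getProtectionDomain.getCodeSource.getLocation)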
@vicuna96 How many columns are there in your dataset? If it's more than 100, did you try setting spark.sql.codegen.maxFields?
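For example, a sketch of raising the whole-stage codegen field limit (the Spark default is 100); the value 200 is only an illustrative choice:

// Set at runtime before the Hudi write
spark.conf.set("spark.sql.codegen.maxFields", "200")
// or at submit time: --conf spark.sql.codegen.maxFields=200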
Hi @danny0405, this seems to be in the spark-catalyst_2.12-3.3.2.jar package, but org.apache.spark.sql.catalyst.expressions.UnsafeRow does not extend org.apache.spark.sql.vectorized.ColumnarBatch. Is this expected in different versions?
Hi @ad1happy2go, I can give it a try, but the table should have fewer than 100 columns. Also, this seems like a Spark property rather than a Hudi property, and the Spark version has not changed. I will update once I get a chance to test it.
@vicuna96 Did you get a chance to test this out?
I have a similar case. Hudi 0.12.3 had been running for one year. Last month we tested 0.15, and now 0.12.3 throws java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.UnsafeRow cannot be cast to class org.apache.spark.sql.vectorized.ColumnarBatch.
We haven't changed any configuration (Spark or Hudi), just replaced the hudi-spark3.3-bundle_*.jar.