[SUPPORT] Spark readStream fails with [COLUMN_ALREADY_EXISTS] for streaming tables created with "hoodie.schema.on.read.enable" & "hoodie.datasource.write.reconcile.schema" enabled
Describe the problem you faced
Streaming in Spark from a Hudi table fails with the error below when a writeStream process has created or written to the table with the schema evolution settings hoodie.schema.on.read.enable and hoodie.datasource.write.reconcile.schema enabled. I have not been able to upsert a source schema containing more and/or fewer columns than the target schema without these two settings enabled.
org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column _hoodie_commit_seqno already exists. Consider to choose another name or rename the existing column.
To Reproduce
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Contract is a plain case class whose fields include at least "identifier" and "date" (full definition omitted).

val sparkConf: SparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("testAppName")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
  .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")

val spark = SparkSession.builder.config(sparkConf).getOrCreate()
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

val hudiOptions: Map[String, String] = Map(
  "hoodie.table.name" -> "test_table",
  "hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.recordkey.field" -> "identifier",
  "hoodie.datasource.write.precombine.field" -> "date",
  "hoodie.datasource.insert.dup.policy" -> "none",
  "hoodie.avro.schema.externalTransformation" -> "true",
  "hoodie.schema.on.read.enable" -> "true",
  "hoodie.datasource.write.reconcile.schema" -> "true")

val inMemoryRecords: List[Contract] =
  List(Contract("001", 1, "test1", 100), Contract("002", 2, "test2", 100), Contract("003", 3, "test3", 100))
val contractsInMemory: MemoryStream[Contract] = MemoryStream[Contract]
contractsInMemory.addData(inMemoryRecords)

// Write the in-memory records to a Hudi table with the two schema evolution settings enabled.
contractsInMemory
  .toDF()
  .writeStream
  .format("hudi")
  .trigger(Trigger.AvailableNow())
  .queryName("streamingQueryName")
  .option("checkpointLocation", "/tmp/checkpoint")
  .options(hudiOptions)
  .outputMode(OutputMode.Append())
  .start("/tmp/data")
  .processAllAvailable()

// Read the same table back as a stream; this is the step that fails.
spark.readStream
  .format("hudi")
  .load("/tmp/data")
  .writeStream
  .format("memory")
  .queryName("queryName")
  .outputMode("append")
  .start()
  .processAllAvailable()
Environment Description
- OS: Mac OS X
- Hudi version: 0.14.0
- Spark version: 3.4.1
- Storage: S3 (LocalStack)
- Running on Docker?: No
Additional context
This works fine with either hoodie.schema.on.read.enable or hoodie.datasource.write.reconcile.schema disabled.
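For completeness, a one-line change to the options in the reproduction above is enough to make the read stream work again (a minimal sketch; disabling hoodie.datasource.write.reconcile.schema instead has the same effect):

// Reuse hudiOptions from the reproduction above, but turn schema-on-read off.
val workingHudiOptions: Map[String, String] =
  hudiOptions + ("hoodie.schema.on.read.enable" -> "false")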
Stacktrace
org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
at org.apache.spark.sql.errors.QueryCompilationErrors$.columnAlreadyExistsError(QueryCompilationErrors.scala:2300)
at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:113)
at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:55)
at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:71)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:427)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:275)
at org.apache.spark.sql.hudi.streaming.HoodieStreamSource.getBatch(HoodieStreamSource.scala:171)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:586)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:582)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:582)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
[stream execution thread for test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Async log purge executor pool for query test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066] has been shutdown
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.SparkContext - SparkContext is stopping with exitCode 0.
[dispatcher-event-loop-4] INFO org.apache.spark.MapOutputTrackerMasterEndpoint - MapOutputTrackerMasterEndpoint stopped!
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.memory.MemoryStore - MemoryStore cleared
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.BlockManager - BlockManager stopped
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster stopped
[dispatcher-event-loop-2] INFO org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
[ScalaTest-run-running-HudiSourceIT] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext
[STREAM_FAILED] Query [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066] terminated with exception: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
org.apache.spark.sql.streaming.StreamingQueryException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
=== Streaming Query ===
Identifier: test_hudi [id = 443bb452-13e7-4d43-916d-bac5fd5d1f2c, runId = dbf43a65-825f-45ca-bae0-d7109ee9d066]
Current Committed Offsets: {}
Current Available Offsets: {org.apache.spark.sql.hudi.streaming.HoodieStreamSource@1ac520b5: {"commitTime":"20240118170827012"}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource MemorySink, 443bb452-13e7-4d43-916d-bac5fd5d1f2c, [queryName=test_hudi], Append
+- StreamingExecutionRelation org.apache.spark.sql.hudi.streaming.HoodieStreamSource@1ac520b5, [_hoodie_commit_time#142, _hoodie_commit_seqno#143, _hoodie_record_key#144, _hoodie_partition_path#145, _hoodie_file_name#146, identifier#147, name#148, quantity#149, status#150, agent#151, metadata#152, contacts#153, date#154]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: org.apache.spark.sql.AnalysisException: [COLUMN_ALREADY_EXISTS] The column `_hoodie_commit_seqno` already exists. Consider to choose another name or rename the existing column.
at org.apache.spark.sql.errors.QueryCompilationErrors$.columnAlreadyExistsError(QueryCompilationErrors.scala:2300)
at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:113)
at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:55)
at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:71)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:427)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:275)
at org.apache.spark.sql.hudi.streaming.HoodieStreamSource.getBatch(HoodieStreamSource.scala:171)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:586)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:582)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:582)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
... 1 more
@imonteroq You are correct. I was able to reproduce this. Here is a full reproducible example - https://gist.github.com/ad1happy2go/db5813b8bd8d5f7142c2f9b0b2f29922
Created a tracking JIRA to fix this - https://issues.apache.org/jira/browse/HUDI-7319
Thank you @ad1happy2go. One question while we are at it: will Hudi ever make use of Spark's checkpointing to carry out managed incremental ingestion? So far it seems from the documentation that a commit timestamp must be provided.
Sorry, can you elaborate on that?
Yes, Spark streaming should use the checkpointing in order to resume the stream. Please clarify in case I am missing anything here.
Apologies, my bad, I omitted part of my question. I meant incremental ingestion from another Hudi table using Spark's Structured Streaming. I've tested this functionality and it always loads the full table unless I pass in a commit timestamp. I can raise another issue to make my point better.
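To illustrate what I mean, this is roughly what I have to do today (a sketch using the documented batch incremental query options; the commit timestamp is a placeholder I have to track myself rather than something the streaming checkpoint remembers):

// Batch incremental read with an explicitly supplied begin instant.
val incrementalDf = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20240118170827012") // placeholder commit time from the timeline
  .load("/tmp/data")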
@imonteroq Ideally it should use the checkpoint details, although I am not very sure; one reason may be the cleaner configuration. Are you setting the config below?
https://hudi.apache.org/docs/configurations/#hoodiedatasourcereadincrfallbackfulltablescanenable
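For reference, that config would be passed on the read side roughly like this (a sketch only; the value shown is for illustration and I have not verified how it interacts with the behaviour described above):

// hoodie.datasource.read.incr.fallback.fulltablescan.enable controls whether an
// incremental read is allowed to fall back to a full table scan.
spark.readStream
  .format("hudi")
  .option("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "false")
  .load("/tmp/data")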
@imonteroq Any updates on this?