[SUPPORT] Hudi table created with dataframe API becomes unwritable to INSERT queries due to config conflict
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
INSERT INTO queries fail on a table created with the dataframe API due to a config conflict. The exception below shows a conflict on the precombine field, but I believe this can happen with any datasource config.
To Reproduce
Steps to reproduce the behavior:
- Open a Scala shell with the Hudi Spark bundle and create a table with the dataframe API, sample script:
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.SaveMode
val df1 = Seq(
("100", "2015-01-01", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
("101", "2015-01-01", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
("102", "2015-01-01", "event_name_345", "2015-01-01T13:51:40.417052Z", "type3"),
("103", "2015-01-01", "event_name_234", "2015-01-01T13:51:40.519832Z", "type4"),
("104", "2015-01-01", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
("105", "2015-01-01", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2"),
("106", "2015-01-01", "event_name_890", "2015-01-01T13:51:44.735360Z", "type3"),
("107", "2015-01-01", "event_name_944", "2015-01-01T13:51:45.019544Z", "type4"),
("108", "2015-01-01", "event_name_456", "2015-01-01T13:51:45.208007Z", "type1"),
("109", "2015-01-01", "event_name_567", "2015-01-01T13:51:45.369689Z", "type2"),
("110", "2015-01-01", "event_name_789", "2015-01-01T12:15:05.664947Z", "type3"),
("111", "2015-01-01", "event_name_322", "2015-01-01T13:51:47.388239Z", "type4")
).toDF("event_id", "event_date", "event_name", "event_ts", "event_type")
val r = scala.util.Random
val num = r.nextInt(99999)
val tableName = "tableName" + num
val tablePath = "table path" // placeholder: replace with the actual base path, e.g. an S3 URI
df1.write.format("hudi")
.option("hoodie.metadata.enable", "true")
.option("hoodie.table.name", tableName)
.option("hoodie.datasource.write.operation", "upsert")
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.option("hoodie.datasource.write.recordkey.field", "event_id,event_date")
.option("hoodie.datasource.write.partitionpath.field", "event_type")
.option("hoodie.datasource.write.precombine.field", "event_ts")
.option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
// .option("hoodie.datasource.write.hive_style_partitioning", "true")
.option("hoodie.datasource.hive_sync.enable", "true")
.option("hoodie.datasource.meta.sync.enable", "true")
.option("hoodie.meta.sync.client.tool.class", "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool")
.option("hoodie.datasource.hive_sync.mode", "hms")
.option("hoodie.datasource.hive_sync.database", "default")
.option("hoodie.datasource.hive_sync.table", tableName)
.option("hoodie.datasource.hive_sync.partition_fields", "event_type")
.option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
.mode(SaveMode.Append)
.save(tablePath)
- Run INSERT INTO with spark-sql:
INSERT INTO table_name (event_id, event_date, event_name, event_ts, event_type) VALUES ('131', '2015-01-01', 'event_name_567', '2015-01-01T13:51:45.369689Z', 'type2')
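Optionally, to see where the conflicting value comes from, the table config written by the dataframe write above can be inspected from the same spark-shell session. A minimal sketch, assuming hoodie.properties sits under tablePath/.hoodie/:

// A sketch: read the table config the dataframe write produced and look for the
// precombine entry. It should show hoodie.table.precombine.field=event_ts, which is
// the value that later conflicts with the empty precombine passed by INSERT INTO.
spark.read.textFile(tablePath + "/.hoodie/hoodie.properties")
  .filter(_.startsWith("hoodie.table.precombine.field"))
  .show(false)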
Expected behavior
INSERT INTO should work on tables created with the dataframe API.
Environment Description (EMR-7.2)
- Hudi version : 0.14.1 (Hudi 0.15 or Spark 3.4 should have the same problem)
- Spark version : 3.5.0
- Hive version :
- Hadoop version :
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) :
Additional context
This issue doesn't happen if the table was created with SQL. I suspect this is related to hive sync: I used Glue as the catalog, and I don't see the precombine config synced to Glue when the table is created with the dataframe API. The precombine field cannot be inferred correctly here because the catalog doesn't have the precombine info. If the table was created with SQL, the precombine field would be synced to Glue and inferred correctly when inserting data.
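For comparison, a rough sketch of the SQL-created path (table name and location below are illustrative, not from the original report). When the table is declared this way, the precombine field is part of the table definition, so plain INSERT INTO statements don't hit the conflict:

// Illustrative only: creating an equivalent table via Spark SQL stores preCombineField
// in the table properties, unlike the dataframe-created table above.
val sqlTablePath = "table path for the SQL-created table" // placeholder, like tablePath above
spark.sql(s"""
  CREATE TABLE IF NOT EXISTS events_sql (
    event_id STRING,
    event_date STRING,
    event_name STRING,
    event_ts STRING,
    event_type STRING
  ) USING hudi
  PARTITIONED BY (event_type)
  TBLPROPERTIES (
    type = 'cow',
    primaryKey = 'event_id,event_date',
    preCombineField = 'event_ts'
  )
  LOCATION '$sqlTablePath'
""")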
Stacktrace
org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value):
PreCombineKey: event_ts
at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:212)
at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:249)
at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:108)
at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
https://github.com/apache/hudi/blob/35c00daaf871a6c1b87d6a440832d60f9b26ee14/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala#L192-L195
This is legacy code.
@xicm I agree, and I think we should remove the precombine field from the datasource conf if we don't want to allow users to change the precombine field of their tables.
@CTTY in what case would we not want to allow users to change the precombine field? I use the config to set the field I'd like to use, such as "_etl_cluster_ts".
If it is removed, how can I set my custom precombine field?
So, this issue is about "INSERT INTO should work on tables created with dataframe api"?
A table created with the dataframe API initializes hoodie.table.precombine.field. When we use INSERT INTO in Spark SQL, we don't set hoodie.table.precombine.field or hoodie.datasource.write.precombine.field, but buildHoodieInsertConfig sets precombine to "" if the precombine field is not set.
During validation, the precombine field in the table properties has its real value while the value in params is "", so an exception is thrown.
So we should either remove the code in ProvidesHoodieConfig or make the default value null instead of an empty string.
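To make the mismatch concrete, here is a simplified sketch of the check (not the actual Hudi code; conflicts is a made-up helper):

// Simplified model of the validation described above: the SQL insert path passes ""
// for precombine when the session doesn't set it, while the table config holds the
// real value, so the check reports a conflict and HoodieWriterUtils throws.
def conflicts(paramValue: String, tableConfigValue: String): Boolean =
  paramValue != null && tableConfigValue != null && paramValue != tableConfigValue

val fromInsertInto  = ""          // buildHoodieInsertConfig default when precombine is unset
val fromTableConfig = "event_ts"  // hoodie.table.precombine.field written at table creation

assert(conflicts(fromInsertInto, fromTableConfig)) // "" vs "event_ts" -> exception today
assert(!conflicts(null, fromTableConfig))          // a null default would not be flagged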
I posted a quick patch to fix the issue, but ideally I think Hudi should remove all write configs that are not allowed to change, or point those write configs to the equivalent table configs so that we wouldn't even need the validation logic here: https://github.com/apache/hudi/blob/64f546b8f0cae70793a6150170a649bad8e0e146/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala#L165
Is there any workaround to this that doesn't require a patch? This unfortunately makes Hudi unusable with dbt-spark (for us) because every INSERT INTO statement breaks on this validation check on the precombine field.
Edit: I think this affects all tables that are created with a precombine field, not just through the DataFrame API - e.g. https://github.com/apache/hudi/issues/10626 has an example where precombine is set in dbt-spark
@mzheng-plaid have you tried spark.sql("set hoodie.datasource.write.precombine.field=<precombine>") in your session?
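Concretely, for the repro above that would look something like the following, run in the same session (a sketch; event_ts is the precombine field the table was created with):

// Workaround sketch: align the session-level write config with the value stored in the
// table config before running the insert, so the validation no longer sees a conflict.
spark.sql("set hoodie.datasource.write.precombine.field=event_ts")
spark.sql(s"""
  INSERT INTO $tableName (event_id, event_date, event_name, event_ts, event_type)
  VALUES ('131', '2015-01-01', 'event_name_567', '2015-01-01T13:51:45.369689Z', 'type2')
""")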
@CTTY thanks for the suggestion, I'll try that tomorrow
Yes that worked, thanks @CTTY