[SUPPORT] Config conflict with Deltastreamer CustomKeyGenerator - PartitionPath
Describe the problem you faced
Using Deltastreamer to ingest Avro messages from Kafka topics (PostgresDebeziumAvroPayload) into S3 Hudi tables. When using CustomKeyGenerator with multiple record keys and a time-based partition path, Deltastreamer throws an error on the first run if the partitionpath field doesn't include the datatype.
Configs:
hoodie.datasource.write.recordkey.field=id,_changed_at,_change_type
hoodie.datasource.write.partitionpath.field=inserted_at
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.write.precombine.field=updated_at
Error:
Caused by: org.apache.hudi.exception.HoodieKeyException: Unable to find field names for partition path in proper format
We found that the first job must be run with the datatype (i.e. hoodie.datasource.write.partitionpath.field=inserted_at:TIMESTAMP), and Hudi automatically sets hoodie.datasource.write.partitionpath.field to the extracted partition path, inserted_at (without the :TIMESTAMP). However, on the following run, when we pass inserted_at:TIMESTAMP, the job fails due to a config conflict.
Exception in thread "main" org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value):
PartitionPath: inserted_at:TIMESTAMP inserted_at
The :TIMESTAMP must be removed for subsequent runs, but appears to be required for the first run. The expected behavior is for the configs to be the same on the first run and all future runs.
Is there another config we can set that would be compatible for first run as well as future runs?
To Reproduce
Steps to reproduce the behavior:
- Run Deltastreamer for Debezium Avro payload messages from a Kafka topic
- Set configs to use CustomKeyGenerator, multiple record keys, and an EPOCHMILLISECONDS-formatted partition path (don't set :TIMESTAMP) for the first run
- A HoodieKeyException is produced: Unable to find field names for partition path in proper format
- Change the config to hoodie.datasource.write.partitionpath.field=inserted_at:TIMESTAMP
- The job runs as expected
- Run the job again with the same config (:TIMESTAMP)
- A HoodieException (Config conflict) is produced
- Remove :TIMESTAMP from the partitionpath config
- The job runs as expected
Expected behavior
Deltastreamer should recognize the PartitionPath column on the first run, and the configs should be the same for all future runs. If the datatype needs to be specified, it should be written into the .hoodie/hoodie.properties file as inserted_at:TIMESTAMP so there is no config conflict later.
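For illustration, here is roughly what we believe the table config records today versus what we are proposing (a sketch assuming the hoodie.table.partition.fields key used in recent hoodie.properties files; the exact key name may vary by version):
# What Hudi writes today (the :TIMESTAMP suffix stripped):
hoodie.table.partition.fields=inserted_at
# What we are proposing, so later runs passing inserted_at:TIMESTAMP don't conflict:
hoodie.table.partition.fields=inserted_at:TIMESTAMP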
Environment Description
- Hudi version : 0.14.0-SNAPSHOT
- Spark version : 3.1
- Hive version : N/A
- Hadoop version : N/A
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : both
Stacktrace
For first run without specifying :TIMESTAMP:
Caused by: org.apache.hudi.exception.HoodieKeyException: Unable to find field names for partition path in proper format
at org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:78)
at org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:83)
at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:69)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$ddc224fb$1(DeltaSync.java:618)
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at scala.collection.AbstractIterator.to(Iterator.scala:1431)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
... 3 more
For subsequent runs keeping :TIMESTAMP in the config:
Exception in thread "main" org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value):
KeyGenerator: org.apache.hudi.keygen.CustomKeyGenerator org.apache.hudi.keygen.TimestampBasedKeyGenerator
PartitionPath: inserted_at:TIMESTAMP inserted_at
at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:181)
at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:131)
at org.apache.hudi.HoodieWriterUtils.validateTableConfig(HoodieWriterUtils.scala)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:647)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:156)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:129)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:559)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
It seems to be related to splitting the string in SparkKeyGenUtils.scala#L47, especially since it only happens with CustomKeyGenerator and not the timestamp key generator.
And when DeltaSync.java calls getPartitionFields to write to hoodie.properties (line 366/373), it strips out the :TIMESTAMP part, so future runs get a config conflict... Is this expected behavior, and is there any way to avoid having to pass the datatype?
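To make the suspected mismatch concrete, here is a minimal sketch in Scala (not actual Hudi source; stripKeyGenType is a hypothetical helper) of how stripping the key generator type before persisting, but not before validating, trips a plain string comparison:
// Minimal sketch, not Hudi source: mimics the suffix-stripping that appears
// to happen before the partition field is written to hoodie.properties.
object PartitionFieldMismatch {
  def stripKeyGenType(field: String): String = field.split(":").head

  def main(args: Array[String]): Unit = {
    val configured = "inserted_at:TIMESTAMP"     // what we pass on every run
    val persisted  = stripKeyGenType(configured) // what lands in hoodie.properties
    // A plain "!=" comparison flags a conflict even though both name the same field.
    if (configured != persisted)
      println(s"Config conflict - PartitionPath: $configured $persisted")
  }
}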
Thanks in advance!
Oh, good (bad?) timing. I just ran into this today.
Any workarounds?
@berniedurfee-renaissance this was what I changed to make it work in my fork https://github.com/sydneyhoran/hudi/commit/b1692c6ba3901d40b0523fe5226b5c5bff51ac7f, but I'm sure it's not the most ideal path forward :)
I also faced this yesterday. In order to overcome this without modifying the Hudi source code, I used the SqlQueryBasedTransformer and the ComplexKeyGenerator to extract the year, month, and day values from the inserted_at column, like this:
--hoodie-conf hoodie.datasource.write.recordkey.field=id \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.write.partitionpath.field=year,month,day \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--hoodie-conf hoodie.deltastreamer.transformer.sql="SELECT * , extract(year from inserted_at) as year, extract(month from inserted_at) as month, extract(day from inserted_at) as day FROM <SRC> a;"
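Note that with hive_style_partitioning enabled, this writes partition paths like year=2023/month=6/day=5; the extracted values are integers, so they are not zero-padded unless you format them in the transformer SQL.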
Hudi version: 0.13.0, Storage: S3, Deployment: Kubernetes, timestamp.type: SCALAR
Able to reproduce the error with Hudi master in exactly the same way as provided by @sydneyhoran.
Details to Reproduce - https://gist.github.com/ad1happy2go/49b81f015c1a2964fee489214658cf44
JIRA created to fix the same - https://issues.apache.org/jira/browse/HUDI-6282
Hi Team. I also faced this just now. The relevant configurations are as follows:
"hoodie.datasource.write.partitionpath.field" = "region:SIMPLE" "hoodie.datasource.write.keygenerator.class" = "org.apache.hudi.keygen.CustomKeyGenerator"
When we used 0.14.0 for the first write, there was no problem. But whenever it is not the first write, meaning there is already data in the output path, an error is reported:
Exception in thread "main" org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value):
PartitionPath: region:SIMPLE region
at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:211)
at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:177)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
Can you provide an RCA for this issue? The same configuration works fine in 0.11.1. It seems someone raised a similar question a long time ago.
Went through the source code and found that:
In 0.11.1, there is no check on the partition path.
But in 0.14.0, there is a check on partitionPath here.
In my understanding, if you use a CustomKeyGenerator, then simply checking with "!=" should not be done here, as it is extremely imprecise.
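To illustrate, here is a hedged sketch in Scala (illustrative names, not Hudi API) of a more tolerant comparison that normalizes away the optional :<KeyGenType> suffix on both sides before flagging a conflict:
// Hypothetical helpers, not Hudi source: normalize "region:SIMPLE" -> "region"
// (comma-separated field lists likewise) before comparing current vs existing config.
object PartitionFieldValidation {
  def normalizePartitionFields(fields: String): String =
    fields.split(",").map(_.split(":").head.trim).mkString(",")

  def partitionFieldsConflict(current: String, existing: String): Boolean =
    normalizePartitionFields(current) != normalizePartitionFields(existing)

  // partitionFieldsConflict("region:SIMPLE", "region")              // false: same field
  // partitionFieldsConflict("inserted_at:TIMESTAMP", "inserted_at") // false: same field
  // partitionFieldsConflict("region:SIMPLE", "city")                // true: real conflict
}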
BTW, shouldn't regression testing be done before release? Thank you, team.
CC: @nsivabalan @xushiyan @codope
@CaesarWangX There are a few table property validations which were added after 0.12. I tried a dirty fix here to bypass the validation - https://github.com/apache/hudi/pull/8869 . Will try to follow up on whether we can merge it or write a better one here.
Thank you @ad1happy2go
Hello all. Is it possible that this same behavior is not limited to Deltastreamer? I have been able to reproduce it by writing to a Hudi table from a Spark context. I can provide more details if this is of interest.
@andipiet-te Can you try this PR - https://github.com/apache/hudi/pull/8869 ? Do you think this PR will get past the validation stage and work in your use case?
@ad1happy2go I think the issue is fixed by https://github.com/apache/hudi/pull/10615 in Hudi 0.15.0
yes. Closing this. Thanks.