
[SUPPORT] Config conflict with Deltastreamer CustomKeyGenerator - PartitionPath

Open sydneyhoran opened this issue 1 year ago • 11 comments

Describe the problem you faced

Using Deltastreamer to ingest Avro messages from Kafka topics (PostgresDebeziumAvroPayload) into Hudi tables on S3. When using CustomKeyGenerator with multiple record key fields and a time-based partition path, Deltastreamer throws an error on the first run if the partition path field doesn't include the data type suffix.

Configs:

hoodie.datasource.write.recordkey.field=id,_changed_at,_change_type
hoodie.datasource.write.partitionpath.field=inserted_at
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.write.precombine.field=updated_at

Error:

Caused by: org.apache.hudi.exception.HoodieKeyException: Unable to find field names for partition path in proper format

We found that the first run must specify the data type (i.e. hoodie.datasource.write.partitionpath.field=inserted_at:TIMESTAMP), after which Hudi persists the extracted partition path as inserted_at (without the :TIMESTAMP suffix). On the following run, when we pass inserted_at:TIMESTAMP again, the job fails due to a config conflict.

Exception in thread "main" org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value):
PartitionPath: inserted_at:TIMESTAMP inserted_at

The :TIMESTAMP suffix must be removed for subsequent runs, but appears to be required for the first run. The expected behavior is that the same configs work for the first run and all future runs.
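
For context, a minimal sketch of the relevant entries Hudi persists to .hoodie/hoodie.properties after the first run, as we understand it (illustrative, not a full dump; note the type suffix is stripped):

hoodie.table.recordkey.fields=id,_changed_at,_change_type
hoodie.table.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.table.partition.fields=inserted_at

It is this stored inserted_at (without :TIMESTAMP) that the validation later compares against the write config.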

Is there another config we can set that would be compatible for first run as well as future runs?

To Reproduce

Steps to reproduce the behavior:

  1. Run Deltastreamer on Debezium Avro payload messages from a Kafka topic
  2. Set configs to use CustomKeyGenerator, multiple record keys, and an EPOCHMILLISECONDS time-based partition path (without :TIMESTAMP) for the first run
  3. HoodieKeyException is produced: Unable to find field names for partition path in proper format
  4. Change the config to hoodie.datasource.write.partitionpath.field=inserted_at:TIMESTAMP
  5. Job runs as expected
  6. Run the job again with the same config as in step 4 (:TIMESTAMP included)
  7. HoodieException Config conflict is produced
  8. Remove :TIMESTAMP from the partitionpath config
  9. Job runs as expected (see the config summary below)
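
In short, the partition path config we must pass flips between runs (an illustrative summary of the steps above):

# First run against an empty base path -- the type suffix is required:
hoodie.datasource.write.partitionpath.field=inserted_at:TIMESTAMP

# All subsequent runs -- the suffix must be removed to match hoodie.properties:
hoodie.datasource.write.partitionpath.field=inserted_at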

Expected behavior

Deltastreamer should recognize the partition path column on the first run, and configs should stay the same across all future runs. If the data type needs to be specified, it should be written to the .hoodie/hoodie.properties file as inserted_at:TIMESTAMP so there is no config conflict later.

Environment Description

  • Hudi version : 0.14.0-SNAPSHOT

  • Spark version : 3.1

  • Hive version : N/A

  • Hadoop version : N/A

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : both


Stacktrace

For first run without specifying :TIMESTAMP:

Caused by: org.apache.hudi.exception.HoodieKeyException: Unable to find field names for partition path in proper format
	at org.apache.hudi.keygen.CustomAvroKeyGenerator.getPartitionPath(CustomAvroKeyGenerator.java:78)
	at org.apache.hudi.keygen.CustomKeyGenerator.getPartitionPath(CustomKeyGenerator.java:83)
	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:69)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$ddc224fb$1(DeltaSync.java:618)
	at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2244)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
	... 3 more

For subsequent runs keeping :TIMESTAMP in the config:

Exception in thread "main" org.apache.hudi.exception.HoodieException: Config conflict(key	current value	existing value):
KeyGenerator:	org.apache.hudi.keygen.CustomKeyGenerator	org.apache.hudi.keygen.TimestampBasedKeyGenerator
PartitionPath:	inserted_at:TIMESTAMP	inserted_at
	at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:181)
	at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:131)
	at org.apache.hudi.HoodieWriterUtils.validateTableConfig(HoodieWriterUtils.scala)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:647)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:156)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:129)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:559)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

sydneyhoran avatar Apr 03 '23 15:04 sydneyhoran

It seems to be related to splitting the string in SparkKeyGenUtils.scala#L47, especially since it only happens with CustomKeyGenerator and not TimestampBasedKeyGenerator.

And when DeltaSync.java calls getPartitionFields to write hoodie.properties (around lines 366/373), it strips out the :TIMESTAMP part, so future runs get a config conflict... Is this expected behavior, and is there any way to avoid passing the data type? A sketch of my understanding is below.
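
A minimal sketch of the mismatch as I understand it (hypothetical helper names, not the actual Hudi source):

object PartitionFieldSketch {
  // CustomAvroKeyGenerator.getPartitionPath expects every partition field as
  // "fieldName:TYPE" and fails with the HoodieKeyException above otherwise:
  def parsePartitionField(field: String): (String, String) = {
    val parts = field.split(":")
    if (parts.length != 2)
      throw new IllegalArgumentException(
        "Unable to find field names for partition path in proper format")
    (parts(0), parts(1)) // e.g. ("inserted_at", "TIMESTAMP")
  }

  // ...while the path that writes hoodie.properties keeps only the field
  // name, dropping the ":TYPE" suffix:
  def persistedPartitionField(field: String): String =
    field.split(":")(0) // "inserted_at:TIMESTAMP" -> "inserted_at"
}

So the write config carries inserted_at:TIMESTAMP while the table config stores inserted_at, and the equality check in validateTableConfig flags a conflict.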

Thanks in advance!

sydneyhoran avatar Apr 04 '23 15:04 sydneyhoran

Oh, good (bad?) timing. I just ran into this today.

Any workarounds?

@berniedurfee-renaissance this is what I changed to make it work in my fork: https://github.com/sydneyhoran/hudi/commit/b1692c6ba3901d40b0523fe5226b5c5bff51ac7f — but I'm sure it's not the most ideal path forward :)

sydneyhoran avatar Apr 05 '23 01:04 sydneyhoran

I also faced this yesterday. To work around it without modifying the Hudi source code, I used the SqlQueryBasedTransformer and the ComplexKeyGenerator to extract the year, month, and day values from the inserted_at column, like this:

--hoodie-conf hoodie.datasource.write.recordkey.field=id \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.write.partitionpath.field=year,month,day \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--hoodie-conf hoodie.deltastreamer.transformer.sql="SELECT * , extract(year from inserted_at) as year, extract(month from inserted_at) as month, extract(day from inserted_at) as day FROM <SRC> a;"
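
With hive_style_partitioning=true, this produces a layout roughly like the following (illustrative path; extract() returns unpadded integers):

s3://<bucket>/<table>/year=2023/month=4/day=5/<data files>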

Hudi version: 0.13.0, Storage: S3, Deployment: Kubernetes, timestamp.type: SCALAR

nikoshet avatar Apr 05 '23 09:04 nikoshet

Able to reproduce the error with Hudi master in exactly the same way as described by @sydneyhoran.

Details to Reproduce - https://gist.github.com/ad1happy2go/49b81f015c1a2964fee489214658cf44

JIRA created to fix the same - https://issues.apache.org/jira/browse/HUDI-6282

ad1happy2go avatar May 29 '23 14:05 ad1happy2go

Hi Team. I also faced this now. The relevant configurations are as follows:

"hoodie.datasource.write.partitionpath.field" = "region:SIMPLE" "hoodie.datasource.write.keygenerator.class" = "org.apache.hudi.keygen.CustomKeyGenerator"

When we used 0.14.0 for the first write, there was no problem. But on any run after the first — that is, once there is already data in the output path — an error is reported:

Exception in thread "main" org.apache.hudi.exception.HoodieException: Config conflict(key	current value	existing value):
PartitionPath:	region:SIMPLE	region
	at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:211)
	at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:177)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)

Can you provide an RCA for this issue? The same configuration works fine in 0.11.1. It seems someone raised a similar question a long time ago.

Going through the source code, I found that in 0.11.1 there is no check on the partition path, but in 0.14.0 there is a check on partitionPath (in HoodieWriterUtils.validateTableConfig).

In my understanding, if you use a CustomKeyGenerator, a simple "!=" string comparison should not be used here, as it is extremely imprecise.
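
To illustrate (a minimal sketch with invented names, not the actual Hudi validation code):

object ValidationSketch {
  // The naive check: raw string inequality flags the write config
  // "region:SIMPLE" against the persisted "region" as a conflict.
  def naiveConflict(writeValue: String, tableValue: String): Boolean =
    writeValue != tableValue

  // A CustomKeyGenerator-aware check would strip the ":TYPE" suffix from each
  // field before comparing (an assumption about a possible fix, not Hudi code):
  def typeAwareConflict(writeValue: String, tableValue: String): Boolean = {
    def strip(s: String): String =
      s.split(",").map(_.split(":")(0)).mkString(",")
    strip(writeValue) != strip(tableValue)
  }
}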

BTW, shouldn't regression testing be done before a release? Thank you, team.

CaesarWangX avatar Feb 28 '24 05:02 CaesarWangX

CC: @nsivabalan @xushiyan @codope

CaesarWangX avatar Feb 28 '24 05:02 CaesarWangX

@CaesarWangX There are a few table property validations that were added after 0.12. I tried a dirty fix here to bypass the validation - https://github.com/apache/hudi/pull/8869 . Will follow up on whether we can merge this or write a better fix.

ad1happy2go avatar Mar 06 '24 13:03 ad1happy2go

Thank you @ad1happy2go

CaesarWangX avatar Mar 07 '24 05:03 CaesarWangX

Hello all. Is it possible that this same behavior is not limited to Deltastreamer? I have been able to reproduce it by writing to a Hudi table from a Spark context. I can provide more details if that is of interest.

andipiet-te avatar Apr 02 '24 14:04 andipiet-te

@andipiet-te Can you try this PR - https://github.com/apache/hudi/pull/8869 ? Do you think it will pass the validation stage and work in your use case?

ad1happy2go avatar Apr 11 '24 16:04 ad1happy2go

@ad1happy2go I think the issue is fixed by https://github.com/apache/hudi/pull/10615 in Hudi 0.15.0

mzheng-plaid avatar Jul 15 '24 21:07 mzheng-plaid

Yes. Closing this. Thanks.

ad1happy2go avatar Jul 18 '24 15:07 ad1happy2go