[SUPPORT] Hudi Deltastreamer CSV ingestion issue
Tips before filing an issue
Have you gone through our FAQs? Yes
Describe the problem you faced
Below is a sample chunk from a CSV that is being ingested through Hudi Deltastreamer 0.9:

00037BAC6|00037BAF9|100|91.8886010561736|66.1986127558789|4|99.9443005280868|2022-06-21
00037BAC6|00077TAA2|32.3719958216579|67.2034832818589|0|0|49.7877395517584|2022-06-21
00037BAC6|00080QAF2|63.7767687043239|96.1682614803625|38.2990550305725|2|81.9725150923432|2022-06-21
00037BAC6|00081TAK4|54.0624638691505|71.8352439553422|8.21984435797665|1|63.9488539122463|2022-06-21
00037BAC6|00084DAL4|64.8087299031953|91.2979645415028|56.1237724661849|4|82.053347222349|2022-06-21
The CSV file does not have headers in it, but I am providing the schema separately through an Avro schema. The headers are provided in the following format in the source and target schema files:
{ "type":"record", "name":"data", "fields":[ { "name": "field1", "type": ["string","null"] }, { "name": "field2", "type": ["string","null"] },{ "name": "field3", "type":["string","null"] }, { "name": "field4", "type": ["string","null"] }, { "name": "field5", "type": ["string","null"] }, { "name": "field6", "type": ["string","null"] }, { "name": "field7", "type": ["string","null"] },{ "name": "date", "type": ["string","null"] } ]}
Below are the properties for Hudi Deltastreamer:
hoodie.datasource.write.recordkey.field=field1,field2
hoodie.datasource.hive_sync.partition_fields=date
hoodie.datasource.write.partitionpath.field=date
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.hive_sync.table=TABLE1
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.parquet.small.file.limit=134217728
hoodie.parquet.max.file.size=268435456
hoodie.cleaner.commits.retained=10
hoodie.deltastreamer.transformer.sql=select 1==2 AS _hoodie_is_deleted, 'I' as Op, * from <SRC>
hoodie.datasource.hive_sync.support_timestamp=false
hoodie.bloom.index.filter.type=DYNAMIC_V0
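For context, a minimal Deltastreamer invocation for this kind of headerless, pipe-delimited CSV ingestion might look like the sketch below. The jar name, bucket paths, table name, and ordering field are placeholders and assumptions, not the poster's actual setup; the source class, schema provider class, and hoodie.deltastreamer.csv.* options are the standard Hudi 0.9 utilities configs for a CSV source.

# Hypothetical invocation; jar name, <bucket> paths, and ordering field are placeholders
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.11-0.9.0.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.CsvDFSSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
  --source-ordering-field date \
  --target-base-path s3://<bucket>/hudi/TABLE1 \
  --target-table TABLE1 \
  --props s3://<bucket>/configs/table1.properties \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://<bucket>/input/csv \
  --hoodie-conf hoodie.deltastreamer.csv.header=false \
  --hoodie-conf hoodie.deltastreamer.csv.sep='|'

With hoodie.deltastreamer.csv.header=false and a pipe separator, the columns are mapped to the provided schema by position, so the field order in the Avro schema has to match the column order in the file.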
When I try ingesting the CSV (without headers) using --hoodie-conf hoodie.deltastreamer.csv.header=false, I receive the error below in the stacktrace. But if csv.header = true and I add the headers manually at the top of the CSV file, then the ingestion works successfully.
Stacktrace:
22/08/15 18:27:06 INFO Client:
client token: N/A
diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, ip-10-72-3-64.ec2.internal, executor 1): org.apache.hudi.exception.HoodieKeyException: recordKey values: "field1:__null__,field2:__null__" for fields: [field1, field2] cannot be entirely null or empty.
at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:448)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2151)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2151)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2171)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2159)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2158)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2158)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1011)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1011)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1011)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2419)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2368)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2357)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:822)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2111)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2132)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2151)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1409)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1517)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1516)
at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:472)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:186)
at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:184)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:513)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
Expected behavior
CSV file to be ingested using Deltastreamer when the Avro schema is provided separately.
Environment Description
Hudi version : 0.9
Spark version : Spark 2.4.8
Hive version : Hive 2.3.9
Hadoop version : AMZ 2.10.1
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : no
Thanks for the help in advance!
@ankitchandnani I was able to reproduce the error. It seems to be due to an older version of spark-avro (3.0.1, which is used with Hudi 0.9) that doesn't seem to support unions well. Adding the following config helps resolve that: with it, Hudi internally converts the given Avro schema to a Spark schema and back to an Avro schema, which seems to fix the incompatibility. Could you let me know if it works for you?
--hoodie-conf hoodie.deltastreamer.schemaprovider.schema_post_processor=org.apache.hudi.utilities.schema.SparkAvroPostProcessor
@ankitchandnani : do you have any updates in this regard?
@nsivabalan I will try it out soon in the next week and provide an update
@ankitchandnani : sure. thanks.
@ankitchandnani : any update, please?
Testing it out today, will provide an update by EOD
sure, thanks.
Hi @nsivabalan, could you please share the exact properties, configs, and test data you tried? I am having difficulty making this work for my use case.
@nsivabalan please let me know when you can
@rmahindra123 waiting for some assistance here
Along with your existing command to run Deltastreamer, you can add this additional config: --hoodie-conf hoodie.deltastreamer.schemaprovider.schema_post_processor=org.apache.hudi.utilities.schema.SparkAvroPostProcessor
No other configs are needed. Or you can add the below entry to your properties file:
hoodie.deltastreamer.schemaprovider.schema_post_processor=org.apache.hudi.utilities.schema.SparkAvroPostProcessor
Or do you mean to say that, after trying the above config, you are still facing the issue?
Yes, after adding that line to my spark-submit, I am still facing the issue. I haven't tried adding it to the properties file; I could try that now.
Still not working even after adding it to the property file.
Sorry, I just now started looking into the issue. We have tests covering CSV with and without the header config set. A lot of combinations have been tested: with and without a schema provider, with and without a transformer.
So this may be something to do with your dataset.
From the stacktrace, I see it shows:
[field1, field2] cannot be entirely null or empty.
Do you know if there are rows where your record key field values are null? The record key is not supposed to be null, as per Hudi.
But since there are no headers, I would recommend setting a schema provider; maybe we are inferring the wrong fields as the record key fields.
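As a quick sanity check outside of Hudi, the raw pipe-delimited files can be scanned for rows where either key column is empty; the file name below is a placeholder:

# Hypothetical check: print line numbers of rows whose first or second
# pipe-delimited column (field1/field2 per the schema) is empty.
awk -F'|' '$1 == "" || $2 == "" { print FNR ": " $0 }' sample.csv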
@ankitchandnani did you get a chance to try out the above suggestion of setting schema provider? Please let us know if it's still an issue for you.
Hey @ankitchandnani : can you let us know how we can help here? There has been no activity here for quite some time.
Hey @ankitchandnani Gentle ping.
@ankitchandnani Closing out this issue. Please reopen in case of any concerns.