
[SUPPORT] Hudi Deltastreamer CSV ingestion issue

Open ankitchandnani opened this issue 2 years ago • 14 comments

Tips before filing an issue

Have you gone through our FAQs? Yes

Describe the problem you faced

Below is a sample chunk from a CSV that is being ingested through Hudi DeltaStreamer 0.9:

00037BAC6|00037BAF9|100|91.8886010561736|66.1986127558789|4|99.9443005280868|2022-06-21
00037BAC6|00077TAA2|32.3719958216579|67.2034832818589|0|0|49.7877395517584|2022-06-21
00037BAC6|00080QAF2|63.7767687043239|96.1682614803625|38.2990550305725|2|81.9725150923432|2022-06-21
00037BAC6|00081TAK4|54.0624638691505|71.8352439553422|8.21984435797665|1|63.9488539122463|2022-06-21
00037BAC6|00084DAL4|64.8087299031953|91.2979645415028|56.1237724661849|4|82.053347222349|2022-06-21

The CSV file does not have headers in it, but I am providing the schema separately through an Avro schema. The headers are provided in the following format in the source and target schema files:

{ "type":"record", "name":"data", "fields":[ { "name": "field1", "type": ["string","null"] }, { "name": "field2", "type": ["string","null"] },{ "name": "field3", "type":["string","null"] }, { "name": "field4", "type": ["string","null"] }, { "name": "field5", "type": ["string","null"] }, { "name": "field6", "type": ["string","null"] }, { "name": "field7", "type": ["string","null"] },{ "name": "date", "type": ["string","null"] } ]}

Below are the properties for Hudi DeltaStreamer:

hoodie.datasource.write.recordkey.field=field1,field2
hoodie.datasource.hive_sync.partition_fields=date
hoodie.datasource.write.partitionpath.field=date
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.hive_sync.table=TABLE1
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.parquet.small.file.limit=134217728
hoodie.parquet.max.file.size=268435456
hoodie.cleaner.commits.retained=10
hoodie.deltastreamer.transformer.sql=select 1==2 AS _hoodie_is_deleted, 'I' as Op, * from <SRC>
hoodie.datasource.hive_sync.support_timestamp=false
hoodie.bloom.index.filter.type=DYNAMIC_V0

When I try ingesting the CSV (without headers) using --hoodie-conf hoodie.deltastreamer.csv.header=false, I receive the error in the stacktrace below. But if I set hoodie.deltastreamer.csv.header=true and add the headers manually at the top of the CSV file, then the ingestion works successfully.
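
For context, a rough sketch of the kind of spark-submit invocation this corresponds to (the jar path, S3 paths, ordering field, and table type below are illustrative assumptions, not the exact command used; the pipe separator is assumed from the sample above):

spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hudi-utilities-bundle_2.11-0.9.0.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.CsvDFSSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
  --source-ordering-field date \
  --target-base-path s3://<bucket>/<table-path> \
  --target-table TABLE1 \
  --props s3://<bucket>/<path>/deltastreamer.properties \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://<bucket>/<csv-input-path> \
  --hoodie-conf 'hoodie.deltastreamer.csv.sep=|' \
  --hoodie-conf hoodie.deltastreamer.csv.header=false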

Stacktrace:

22/08/15 18:27:06 INFO Client: 
	 client token: N/A
	 diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, ip-10-72-3-64.ec2.internal, executor 1): org.apache.hudi.exception.HoodieKeyException: recordKey values: "field1:__null__,field2:__null__" for fields: [field1, field2] cannot be entirely null or empty.
	at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
	at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
	at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:448)
	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2151)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2151)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2171)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2159)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2158)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2158)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1011)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1011)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1011)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2419)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2368)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2357)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:822)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2111)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2132)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2151)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1409)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1517)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1516)
	at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
	at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:472)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:186)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:184)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:513)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)

Expected behavior

The CSV file should be ingested by DeltaStreamer when the Avro schema is provided separately.

Environment Description

Hudi version : 0.9
Spark version : Spark 2.4.8
Hive version : Hive 2.3.9
Hadoop version : AMZ 2.10.1
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : no

Thanks for the help in advance!

ankitchandnani avatar Aug 15 '22 19:08 ankitchandnani

@ankitchandnani I was able to reproduce the error. It seems to be due to the older version of spark-avro (3.0.1, which is used with Hudi 0.9) not supporting unions well. Adding the following config helps resolve it: internally, Hudi then converts the given Avro schema to a Spark schema and back to an Avro schema, which seems to solve the incompatibility issue. Could you let me know if it works for you?

--hoodie-conf hoodie.deltastreamer.schemaprovider.schema_post_processor=org.apache.hudi.utilities.schema.SparkAvroPostProcessor
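
For reference, a minimal Scala sketch of the Avro -> Spark -> Avro round trip that this post processor performs, assuming spark-avro's SchemaConverters (the schema below is a trimmed two-field version of the one in the issue):

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

object SchemaRoundTrip {
  def main(args: Array[String]): Unit = {
    // Avro schema with ["string","null"] unions, as in the issue (trimmed to two fields).
    val avroJson =
      """{"type":"record","name":"data","fields":[
        |{"name":"field1","type":["string","null"]},
        |{"name":"field2","type":["string","null"]}]}""".stripMargin
    val avroSchema = new Schema.Parser().parse(avroJson)

    // Avro -> Spark StructType: each union with null becomes a nullable string column.
    val sparkType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

    // Spark StructType -> Avro again: yields the normalized schema that, per the
    // comment above, avoids the union incompatibility in older spark-avro.
    val normalized = SchemaConverters.toAvroType(sparkType)
    println(normalized.toString(true))
  }
}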

rmahindra123 avatar Aug 23 '22 08:08 rmahindra123

@ankitchandnani : do you have any updates in this regard?

nsivabalan avatar Aug 27 '22 20:08 nsivabalan

@nsivabalan I will try it out soon in the next week and provide an update

ankitchandnani avatar Aug 31 '22 19:08 ankitchandnani

@ankitchandnani : sure. thanks.

nsivabalan avatar Sep 06 '22 04:09 nsivabalan

@ankitchandnani : any update please.

nsivabalan avatar Sep 19 '22 14:09 nsivabalan

Testing it out today, will provide an update by EOD

ankitchandnani avatar Sep 19 '22 14:09 ankitchandnani

sure, thanks.

nsivabalan avatar Sep 19 '22 23:09 nsivabalan

Hi @nsivabalan, could you please share the exact properties, config, and test data you tried out. I am having difficulty making this work for my use case.

ankitchandnani avatar Sep 20 '22 02:09 ankitchandnani

@nsivabalan please let me know when you can

ankitchandnani avatar Sep 20 '22 15:09 ankitchandnani

@rmahindra123 waiting for some assistance here

ankitchandnani avatar Sep 20 '22 21:09 ankitchandnani

Along with your existing command to run DeltaStreamer, you can add this additional config: --hoodie-conf hoodie.deltastreamer.schemaprovider.schema_post_processor=org.apache.hudi.utilities.schema.SparkAvroPostProcessor. No additional configs are needed. Or you can add the below entry to your property file:

hoodie.deltastreamer.schemaprovider.schema_post_processor=org.apache.hudi.utilities.schema.SparkAvroPostProcessor

nsivabalan avatar Sep 21 '22 01:09 nsivabalan

Or do you mean that, after trying the above config, you are still facing the issue?

nsivabalan avatar Sep 21 '22 01:09 nsivabalan

Yes, after adding that line to my spark submit, I am still facing the issue. I haven’t tried adding it to the property file. I could try that now

ankitchandnani avatar Sep 21 '22 01:09 ankitchandnani

Still not working even after adding it to the property file.

ankitchandnani avatar Sep 21 '22 02:09 ankitchandnani

Sorry, I just now started looking into the issue. We have tests covering CSV with and without the header config set. A lot of combinations have been tested: with and without a schema provider, with and without a transformer.

So this may be something to do with your dataset.

From the stacktrace, I see it shows

[field1, field2] cannot be entirely null or empty.

Do you know if there are rows where your record key field values are null? The record key is not supposed to be null as per Hudi.
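
For a quick check, a minimal Spark (Scala) sketch that counts rows whose key fields are both null or empty, assuming the pipe-delimited, headerless layout from the sample (the input path is a placeholder):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object NullKeyCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("null-key-check").getOrCreate()

    // Same field names as the Avro schema in the issue, all read as nullable strings.
    val schema = StructType(
      Seq("field1", "field2", "field3", "field4", "field5", "field6", "field7", "date")
        .map(name => StructField(name, StringType, nullable = true)))

    // Read the headerless, pipe-delimited CSV (input path is a placeholder).
    val df = spark.read
      .option("header", "false")
      .option("sep", "|")
      .schema(schema)
      .csv("s3://<bucket>/<csv-input-path>/")

    // Rows where both record key fields are null or empty would trigger the
    // HoodieKeyException seen in the stacktrace.
    val badKeys = df.filter(
      (df("field1").isNull || df("field1") === "") &&
      (df("field2").isNull || df("field2") === ""))

    println(s"rows with empty record keys: ${badKeys.count()}")
  }
}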

nsivabalan avatar Sep 29 '22 01:09 nsivabalan

But since there are no headers, I would recommend setting the schema provider. Maybe we are inferring a wrong field as the record key fields.

nsivabalan avatar Sep 29 '22 01:09 nsivabalan

@ankitchandnani did you get a chance to try out the above suggestion of setting schema provider? Please let us know if it's still an issue for you.

codope avatar Feb 01 '23 14:02 codope

Hey @ankitchandnani : can you let us know how we can help here? There has been no activity here for quite some time.

nsivabalan avatar Feb 06 '23 23:02 nsivabalan

Hey @ankitchandnani Gentle ping.

ad1happy2go avatar May 24 '23 16:05 ad1happy2go

@ankitchandnani Closing out this issue. Please reopen in case of any concerns.

ad1happy2go avatar Oct 05 '23 09:10 ad1happy2go