
How to read from a Hudi input?

tooptoop4 opened this issue 4 years ago · 5 comments

I have:

inputs:
  mydf:
    file:
      path: s3a://xx/a/b/c/

There are partition folders under the s3a://xx/a/b/c/ path, and there are Hudi parquet files under them.

I want mydf to include the partition columns in the DataFrame as well.

I get:

2020-03-25 16:20:27,070 [main] INFO  org.apache.spark.scheduler.DAGScheduler - Job 1 finished: load at FilesInput.scala:29, took 0.073979 s
Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:208)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:208)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:393)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
        at com.yotpo.metorikku.input.readers.file.FilesInput.read(FilesInput.scala:29)
        at com.yotpo.metorikku.input.readers.file.FileInput.read(FileInput.scala:15)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:68)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:66)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at com.yotpo.metorikku.Job.registerDataframes(Job.scala:66)
        at com.yotpo.metorikku.Job.<init>(Job.scala:48)
        at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:10)
        at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:7)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:7)
        at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:890)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2020-03-25 16:20:27,077 [pool-1-thread-1] INFO  org.apache.spark.SparkContext - Invoking stop() from shutdown hook

I also tried format: com.uber.hoodie and got:

2020-03-25 16:36:07,067 [main] INFO  com.yotpo.metorikku.Job - Registering mydf table
Exception in thread "main" com.uber.hoodie.exception.HoodieException: 'path' must be specified.
        at com.uber.hoodie.DefaultSource.createRelation(DefaultSource.scala:57)
        at com.uber.hoodie.DefaultSource.createRelation(DefaultSource.scala:46)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
        at com.yotpo.metorikku.input.readers.file.FilesInput.read(FilesInput.scala:29)
        at com.yotpo.metorikku.input.readers.file.FileInput.read(FileInput.scala:15)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:68)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:66)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at com.yotpo.metorikku.Job.registerDataframes(Job.scala:66)
        at com.yotpo.metorikku.Job.<init>(Job.scala:48)
        at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:10)
        at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:7)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:7)
        at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:890)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I also tried --conf spark.sql.hive.convertMetastoreParquet=false, with the same error.

I'm passing --jars "/home/ec2-user/hoodie-spark-bundle-0.4.6.jar".

These are COW tables. https://hudi.incubator.apache.org/docs/querying_data.html mentions: spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]);

I'm not sure how to set that in metorikku.
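For reference, a minimal sketch of what that docs line does in plain Spark (Scala) is below. The com.uber.hoodie class name matches the 0.4.6 bundle mentioned above (newer releases use org.apache.hudi instead), and the glob depth and basePath option are only assumptions about the partition layout:

import org.apache.hadoop.fs.PathFilter
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("read-hudi-cow").getOrCreate()

// Make Spark's parquet reader skip the older file versions a COW table keeps around
spark.sparkContext.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[com.uber.hoodie.hadoop.HoodieROTablePathFilter],
  classOf[PathFilter])

// "basePath" lets Spark derive partition columns from the directory names,
// assuming the partition folders are Hive-style (e.g. dt=2020-03-25)
val mydf = spark.read
  .option("basePath", "s3a://xx/a/b/c/")
  .parquet("s3a://xx/a/b/c/*/*/*.parquet")

Since metorikku jobs are launched through spark-submit, the spark.hadoop.* prefix on --conf should be the equivalent way to push that property into the Hadoop configuration.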

If I use s3a://xx/a/b/c/*/*/*.parquet it seems to get further, but I'm not sure whether a) that's the right approach, b) the partition columns will be there, or c) the data will contain duplicates.

tooptoop4 avatar Mar 25 '20 17:03 tooptoop4

Hi @tooptoop4, sorry for the late response. I actually have no idea how to read a Hudi table in Spark without Hive... I think that's the only way for now; Hive actually holds the partition information. In any case you need to read this input as hoodie, so in the input:

inputs:
  mydf:
    file:
      path: s3a://xx/a/b/c/
    options:
      format: com.uber.hoodie
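For reference, a rough plain-Spark equivalent of that read would be something like the sketch below (not metorikku's exact internals; it assumes the Hudi bundle is on the classpath, and the trailing glob is a guess at the partition depth that older Hudi versions expect on the load path):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("read-hudi-datasource").getOrCreate()

// Read through Hudi's datasource instead of plain parquet so that it can
// understand the table's file layout
val mydf = spark.read
  .format("com.uber.hoodie")
  .load("s3a://xx/a/b/c/*/*/*")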

lyogev avatar Mar 30 '20 12:03 lyogev

That didn't work for me.

tooptoop4 avatar Mar 30 '20 17:03 tooptoop4

What's the error you're getting? You may need to do something like **/*.parquet in the path.

lyogev avatar Apr 07 '20 17:04 lyogev

**/*.parquet only works when the Hudi table has been written to just once; once it has been updated, more parquet files are produced, and that leads to duplicate rows being read. https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-Whydowehavetoset2differentwaysofconfiguringSparktoworkwithHudi? says: "Without understanding of Hudi's file layout, engines would just plainly reading all the parquet files and displaying the data within them, with massive amounts of duplicates in the result."
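If a raw parquet glob read is unavoidable, one possible workaround (a sketch, not something from this thread) is to collapse the duplicates using Hudi's metadata columns, keeping only the newest copy of each record key; note this does not account for deleted records, which the path filter approach handles:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder().appName("dedupe-hudi-glob").getOrCreate()

// Every Hudi-written parquet row carries these metadata columns
val raw = spark.read.parquet("s3a://xx/a/b/c/*/*/*.parquet")
val latestFirst = Window
  .partitionBy("_hoodie_record_key")
  .orderBy(col("_hoodie_commit_time").desc)

// Keep only the most recent version of each record key
val deduped = raw
  .withColumn("_rank", row_number().over(latestFirst))
  .filter(col("_rank") === 1)
  .drop("_rank")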

tooptoop4 avatar Apr 29 '20 17:04 tooptoop4

--conf spark.hadoop.mapreduce.input.pathFilter.class=com.uber.hoodie.hadoop.HoodieROTablePathFilter works, but it means all inputs (even non-Hudi ones) get the path filter class. Do you think metorikku needs a new option, such as a pathFilterClass per input? Hudi support seems to be a key feature of metorikku.
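Putting the pieces from this thread together, the full spark-submit invocation would look roughly like the sketch below (the metorikku jar name, the job config file name, and the -c flag are placeholders/assumptions about metorikku's usual CLI):

spark-submit \
  --class com.yotpo.metorikku.Metorikku \
  --jars /home/ec2-user/hoodie-spark-bundle-0.4.6.jar \
  --conf spark.hadoop.mapreduce.input.pathFilter.class=com.uber.hoodie.hadoop.HoodieROTablePathFilter \
  metorikku.jar -c job_config.yaml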

tooptoop4 avatar May 05 '20 23:05 tooptoop4