[SUPPORT] Incremental and snapshot reads show different results

Open · eshu opened this issue 3 years ago · 7 comments

Hudi version: 0.11.1 Spark version: 3.1.1 Storage: S3 AWS Glue: 3

Function

import org.apache.spark.sql.{functions => fn}

    def readAndShow(path: String): Unit = {
      // Snapshot read: the latest version of every record in the table.
      val df = spark.read.format("hudi").load(path)
      df.select(fn.min(fn.col("updated_at")), fn.min(fn.col("_hoodie_commit_time"))).show(false)

      // Incremental read from instant "0", i.e. from the beginning of the timeline.
      val idf = spark.read.format("hudi")
        .option("hoodie.datasource.query.type", "incremental")
        .option("hoodie.datasource.read.begin.instanttime", "0")
        .option("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "true")
        .load(path)
      idf.select(fn.min(fn.col("updated_at")), fn.min(fn.col("_hoodie_commit_time"))).show(false)
    }

The two queries show different results, but because hoodie.datasource.read.begin.instanttime is 0, they should return the same rows.
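For context, a minimal invocation of the function above (the bucket and path here are hypothetical):

    readAndShow("s3://my-bucket/path/to/hudi-table") // hypothetical table path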

Examples of output

+-------------------+------------------------+
|min(updated_at)    |min(_hoodie_commit_time)|
+-------------------+------------------------+
|2018-09-05 13:29:40|20220608041038459       |
+-------------------+------------------------+

+-----------------------+------------------------+
|min(updated_at)        |min(_hoodie_commit_time)|
+-----------------------+------------------------+
|2022-07-06 12:02:04.736|20220706130300379       |
+-----------------------+------------------------+

+-------------------+------------------------+
|min(updated_at)    |min(_hoodie_commit_time)|
+-------------------+------------------------+
|2019-08-28 08:47:12|20220606081029941       |
+-------------------+------------------------+

+-------------------+------------------------+
|min(updated_at)    |min(_hoodie_commit_time)|
+-------------------+------------------------+
|2022-07-06 00:02:04|20220706010239483       |
+-------------------+------------------------+

These results are reproducible, but unfortunately I can't share the dataset. I tried with and without the hoodie.datasource.read.incr.fallback.fulltablescan.enable option, and the result is the same every time. Could you please help me read the data with an incremental query from the beginning?

When I replaced 0 with the commit time from the first query in the function, I received the same result.

eshu · Jul 07 '22 06:07

It looks like the answer is here: https://github.com/apache/hudi/issues/2841

So if I have a job that commits data once every 15 minutes, do I need to set hoodie.cleaner.commits.retained to 4 * 24 * 7 = 672 to keep a week of data?
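For concreteness, a sketch of how that retention figure could be applied on write; the table name and path are hypothetical, and the archival settings follow the Hudi rule that hoodie.keep.min.commits must exceed hoodie.cleaner.commits.retained:

    // 4 commits/hour * 24 hours * 7 days = 672 commits for one week of history.
    df.write.format("hudi")
      .option("hoodie.table.name", "my_table")            // hypothetical table name
      .option("hoodie.cleaner.commits.retained", "672")   // cleaner keeps the last 672 commits
      .option("hoodie.keep.min.commits", "673")           // archival must retain more than the cleaner
      .option("hoodie.keep.max.commits", "700")
      .mode("append")
      .save("s3://my-bucket/path/to/hudi-table")          // hypothetical path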

eshu · Jul 08 '22 02:07

No, Hudi always keeps the latest version of each record in DFS.

It looks like the answer is here: #2841

So if I have a job that commits data once every 15 minutes, do I need to set hoodie.cleaner.commits.retained to 4 * 24 * 7 = 672 to keep a week of data?

For the different-results issue, could you upload the files under /.hoodie here? Or a screenshot would also be fine to start with.

fengjian428 · Jul 08 '22 16:07

@fengjian428 I added the beginning and the end of the .hoodie listing: [screenshots: "Screen Shot 2022-07-11 at 7 51 27" and "Screen Shot 2022-07-11 at 7 52 05"]

eshu · Jul 10 '22 22:07

Current situation with incremental reads. Case 1: the previous commit time from the incremental read was 20220706130300379:

+-----------------------+------------------------+
|min(updated_at)        |min(_hoodie_commit_time)|
+-----------------------+------------------------+
|2022-07-10 00:15:00.727|20220710011447293       |
+-----------------------+------------------------+

Case 2: the previous commit time was 20220706010239483:

+-------------------+------------------------+
|min(updated_at)    |min(_hoodie_commit_time)|
+-------------------+------------------------+
|2022-07-09 18:05:41|20220709185647477       |
+-------------------+------------------------+

It looks like a cleaner process is involved here; we lost even more records from the incremental read. Snapshot reads still show the same commit times (20220606082430597 and 20220608041038459 respectively).

eshu · Jul 10 '22 23:07

[screenshot of the relevant code] When using an incremental query, Hudi will only construct a timeline from commits that have not been cleaned or archived, and it will filter data out by the head commit of that timeline. I can create a PR to fix this, but I'm not very sure whether this is by-design behavior or not.

fengjian428 · Jul 12 '22 17:07

Created a Jira ticket to track this: https://issues.apache.org/jira/browse/HUDI-4402

fengjian428 · Jul 15 '22 03:07

Let me try to explain how the incremental query works.

Let's say you have made 10 commits to a Hudi table, of which the first 5 are already archived. So the active timeline only has 5 commits, i.e. C6 to C10.

When you do a snapshot query now, you will be able to read all records ingested from C1 until C10. But when you trigger an incremental query, it fetches records ingested within certain commit times. So, starting from C6, let's say you have set the end commit to C7: the incremental query serves only those records which were ingested in C6 and C7. It starts at C6 because that's the first commit in the active timeline.

Hope this gives you a picture of how the incremental query works. I'm not sure we can do any fix around this; let me know if you have any further questions.
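A minimal sketch of the bounded incremental query described above; the instant values below are hypothetical stand-ins for the commits in the example (records committed after the begin instant and up to the end instant are returned):

    // Hypothetical instants standing in for the example commits above.
    val incDf = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "20220706130300379") // exclusive: read after this instant
      .option("hoodie.datasource.read.end.instanttime", "20220707130300379")   // inclusive: read up to this instant
      .load(path)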

nsivabalan · Aug 10 '22 03:08

@eshu: did you check out my previous message? Does that make sense? Feel free to close out the issue if you don't have further questions.

nsivabalan · Aug 16 '22 07:08

@nsivabalan If there is no way to read all records since a specified time, incremental reads make no sense at all. Is it possible to implement incremental reading over the whole dataset?

My goal was to get all changed records since a bookmark point. I don't see the point in reading only the most recently committed records...
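For what it's worth, a common workaround here is a snapshot read filtered on _hoodie_commit_time. This is only a sketch, not an official API: it returns the latest version of each record changed after the bookmark, not every intermediate version, and the bookmark instant below is hypothetical:

    // Snapshot read filtered by commit time; _hoodie_commit_time is a
    // yyyyMMddHHmmssSSS string, so lexicographic comparison matches time order.
    val bookmark = "20220706130300379" // hypothetical bookmark instant
    val changed = spark.read.format("hudi")
      .load(path)
      .filter(fn.col("_hoodie_commit_time") > bookmark)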

eshu · Aug 28 '22 02:08