
[HUDI-7522] Support find out the conflict instants in bucket partition when bucket id multiple

Open xuzifu666 opened this issue 11 months ago • 16 comments

Change Logs

Related to issue https://github.com/apache/hudi/issues/7216: when a bucket id maps to multiple files, deleting the partition is enough for the next write to succeed, but the partition is not deleted currently. This PR adds a config to implement it.

Impact

low

Risk level (write none, low medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the instruction to make changes to the website.

Contributor's checklist

  • [ ] Read through contributor's guide
  • [ ] Change Logs and Impact were stated clearly
  • [ ] Adequate tests were added if applicable
  • [ ] CI passed

xuzifu666 avatar Mar 21 '24 03:03 xuzifu666

@beyond1920 Can you help with the review?

danny0405 avatar Mar 21 '24 23:03 danny0405

process:

  1. The bucket id conflicts and the partition contains dirty files (screenshot: 1711443671347.png)

  2. An unfinished instant causes the bucket id conflict (screenshot: 1711443667854.png)

  3. Call fix_bucket_path (screenshot: 1711443673147.png)

  4. The dirty files are cleared after the call (screenshot: 1711445546815.png)

  5. The next write job succeeds and writes are no longer blocked (screenshot: 1711445552379.png)

With this PR, a user can call fix_bucket_path(table => 'bdsp_test.hudi_test61', bucket_id => '8', partition_path => 'xxx') to clean up the wrong bucket files, after which the job can succeed on the next attempt.

xuzifu666 avatar Mar 26 '24 09:03 xuzifu666

It's not safe to delete all files of this bucket that belong to pending commits. Those files might have been generated by ongoing writers. We need to consider the solution more carefully. Could we delete invalid files more strictly, for example by specifying the partition path, bucket id and instant time?

Agreed, I will add the instant time to the procedure. Thanks~ @beyond1920

xuzifu666 avatar Mar 27 '24 15:03 xuzifu666

@xuzifu666 There is an existing RollbackToInstantTimeProcedure, could you use it?

beyond1920 avatar Mar 27 '24 15:03 beyond1920

OK, I will rewrite the method to dig out the error instant. @beyond1920

xuzifu666 avatar Mar 27 '24 15:03 xuzifu666

@beyond1920 After the change, users can get the error instant to roll back from the exception message, as follows: 1711606570263.jpg

xuzifu666 avatar Mar 28 '24 06:03 xuzifu666

@xuzifu666 @danny0405 @beyond1920 I think we should solve the root cause of bucket duplication. There are currently three situations in which duplicate bucket files occur:

  1. Spark speculative execution. Turning off speculative execution solves this problem.
  2. The Hoodie archiver deleting the completed timeline in parallel. 1.0 has solved this problem.
  3. Concurrent insert overwrite by multiple Spark writers. This is a bug that needs to be fixed.

Now focus on scenario 3, concurrent insert overwrite by multiple Spark writers. When Hudi builds file slices, it calls isFileSliceCommitted to determine whether the current file is committed.

  /**
   * A FileSlice is considered committed, if one of the following is true - There is a committed data file - There are
   * some log files, that are based off a commit or delta commit.
   */
  private boolean isFileSliceCommitted(FileSlice slice) {
    if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) {
      return false;
    }

    return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime());
  }

This is fine for a single-writer scenario, but with multiple writers the logic of isFileSliceCommitted has a bug. If a file has a commit time smaller than the smallest completed commit, Hudi directly treats the file as committed, even if it is a garbage file (for example, a file left behind by a failed write).

E.g., two Spark apps insert overwrite into the same partition of a Hudi BUCKET table:

  • app1: starts its write commit at 00000001 and writes file 000000000--uuid1.parquet
  • app2: starts its write commit at 00000002 and writes file 000000000--uuid2.parquet

app1 may fail to write due to OCC, cancellation, or OOM, but 000000000--uuid1.parquet has already been written. When Hudi builds the file slice, 000000000--uuid1.parquet is considered committed, since its commit time 00000001 is smaller than the smallest completed commit 00000002. This is wrong: commit time 00000001 is not committed. Maybe we can modify isFileSliceCommitted like this:

  private boolean isFileSliceCommitted(FileSlice slice) {
    if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) {
      return false;
    }

    return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime()) && UncompleteTimelineNotContains(slice.getBaseInstantTime());
  }

Finally, I think Hudi's file slices should be managed uniformly, as in Iceberg or Delta Lake, rather than being obtained through list operations.
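To make the scenario-3 argument concrete, here is a minimal, self-contained sketch of the two checks. It is not Hudi code: ToyTimeline, currentCheck and proposedCheck are hypothetical names, instants are plain strings compared lexicographically, and the sketch assumes that the failed app1 instant 00000001 is still listed as pending (not yet rolled back).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class FileSliceCommitCheck {

    // Hypothetical toy stand-in for a timeline: completed instants plus pending ones.
    static final class ToyTimeline {
        final TreeSet<String> completed;
        final Set<String> pending;

        ToyTimeline(List<String> completed, List<String> pending) {
            this.completed = new TreeSet<>(completed);
            this.pending = new HashSet<>(pending);
        }

        // Assumed behaviour: true if the instant is completed, or sorts before the first completed instant.
        boolean containsOrBeforeTimelineStarts(String instant) {
            return completed.contains(instant)
                || (!completed.isEmpty() && instant.compareTo(completed.first()) < 0);
        }

        String lastInstant() {
            return completed.last();
        }
    }

    // Models the existing check quoted in the comment above.
    static boolean currentCheck(ToyTimeline timeline, String baseInstant) {
        if (baseInstant.compareTo(timeline.lastInstant()) > 0) {
            return false;
        }
        return timeline.containsOrBeforeTimelineStarts(baseInstant);
    }

    // Models the proposed change: additionally reject instants that are still pending.
    static boolean proposedCheck(ToyTimeline timeline, String baseInstant) {
        return currentCheck(timeline, baseInstant) && !timeline.pending.contains(baseInstant);
    }

    public static void main(String[] args) {
        // app2 completed at 00000002; app1 failed and left 00000001 pending.
        ToyTimeline timeline = new ToyTimeline(List.of("00000002"), List.of("00000001"));

        System.out.println("current check, app1 file:  " + currentCheck(timeline, "00000001"));
        System.out.println("proposed check, app1 file: " + proposedCheck(timeline, "00000001"));
        System.out.println("proposed check, app2 file: " + proposedCheck(timeline, "00000002"));
    }
}
```

In this toy model the existing check accepts the app1 file because 00000001 sorts before the first completed instant, while the extra pending-instant lookup rejects it and still accepts the app2 file. Whether the real timeline still exposes the failed instant as pending at that point is an assumption that would need checking against Hudi itself.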

xiarixiaoyao avatar Apr 01 '24 10:04 xiarixiaoyao

We also hit this issue in our tests. We use the simple bucket index with a MOR table without partitions. When we restart the job, the first write succeeds, but after that the bucket id conflicts:

  00000074-3413-4e9e-b4dd-4676e4eeccb4-0_74-8-3151_20240426173957249.parquet (generated by bulk insert)
  00000074-50bc-4b34-82c5-08c210d82d33-0_74-26-37004_20240428145515030.parquet (generated by a deltacommit after a restart)

According to the driver log, the restarted job read a rollback commit, and it seems the timeline did not load all the buckets completely:

  24/04/28 14:57:32 INFO HoodieBucketIndex: Get BucketIndexLocationMapper for partitions: []
  24/04/28 14:57:32 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20240428145515193__rollback__COMPLETED__20240428145515760]}
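The two file names above share the bucket prefix 00000074 but belong to different file groups, which is exactly the conflict this PR tries to surface. As a rough sketch of how such conflicts and their instants could be found from a file listing, assuming the layout <fileId>_<writeToken>_<instantTime>.parquet with the first 8 characters of the file id being the bucket id (inferred from the names above), something like the following would work. BucketConflictFinder and its methods are hypothetical names, not part of Hudi.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BucketConflictFinder {

    // Assumption: the first 8 characters of the file name encode the bucket id.
    static int bucketId(String fileName) {
        return Integer.parseInt(fileName.substring(0, 8));
    }

    // Assumption: the file id is everything before the first underscore.
    static String fileId(String fileName) {
        return fileName.substring(0, fileName.indexOf('_'));
    }

    // Assumption: the instant time sits between the last underscore and the extension.
    static String instantTime(String fileName) {
        return fileName.substring(fileName.lastIndexOf('_') + 1, fileName.lastIndexOf('.'));
    }

    // Returns bucket id -> (file id -> earliest instant), keeping only buckets
    // that are claimed by more than one file group.
    static Map<Integer, Map<String, String>> findConflicts(List<String> fileNames) {
        Map<Integer, Map<String, String>> byBucket = new TreeMap<>();
        for (String name : fileNames) {
            byBucket.computeIfAbsent(bucketId(name), k -> new TreeMap<>())
                    .merge(fileId(name), instantTime(name),
                           (a, b) -> a.compareTo(b) <= 0 ? a : b);
        }
        byBucket.values().removeIf(groups -> groups.size() < 2);
        return byBucket;
    }

    public static void main(String[] args) {
        List<String> files = List.of(
            "00000074-3413-4e9e-b4dd-4676e4eeccb4-0_74-8-3151_20240426173957249.parquet",
            "00000074-50bc-4b34-82c5-08c210d82d33-0_74-26-37004_20240428145515030.parquet",
            // Hypothetical non-conflicting file, added only for illustration.
            "00000075-0000-0000-0000-000000000000-0_75-8-3152_20240426173957249.parquet");

        findConflicts(files).forEach((bucket, groups) ->
            groups.forEach((id, instant) ->
                System.out.println("bucket " + bucket + ": file group " + id + " from instant " + instant)));
    }
}
```

The instants reported for a conflicting bucket are the candidates to inspect. Which of them is actually invalid still has to be decided against the timeline, as discussed earlier in the thread.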

whocanhu avatar Apr 28 '24 10:04 whocanhu

Yes, it is a serious problem that would block users' business. I would push this PR forward to find the error instant first, which can help users continue their business. @danny0405

xuzifu666 avatar Apr 28 '24 10:04 xuzifu666

Actually, in my use case there is only a single Spark job, and we have already turned off Spark speculation, so there should be no multi-writer. We just restarted the job, which triggered a rollback, and then hit this issue. There should be some bug in loading the complete history timeline. I have the logs; is there any log entry I could search for to help identify the root cause?

whocanhu avatar Apr 28 '24 10:04 whocanhu

Actually, in my use case there is no multi-writer; just a restart triggered a rollback, and then we hit this issue.

At least getting the error instant can help you continue your job.

xuzifu666 avatar Apr 28 '24 11:04 xuzifu666

My use case is CDC, so the data must stay complete with respect to the upstream DB table, and the deltacommit that introduced the conflicting bucket is already completed, so we can't just delete the old file and let the job continue. The ideal way is to find the root cause and avoid creating the conflicting bucket.

According to my simple analysis, the cause is an issue with the timeline reload: we used our internal consistent hashing for a long time without this issue, but hit it with the bucket index, and both consistent hashing and the bucket index invoke getLatestFileSlicesForPartition. So I think this may be caused by reloadActiveTimeline on restart.

One detail: after the restart, the first deltacommit did not load all the bucket file groups and introduced a conflicting bucket file; the second deltacommit loaded all the bucket file groups and then threw the "Find multiple files at partition path=" exception. (two screenshots attached: image, image)

whocanhu avatar Apr 28 '24 11:04 whocanhu

Can you squash the commits into one?

danny0405 avatar May 06 '24 07:05 danny0405

OK, squashed into one commit. @danny0405

xuzifu666 avatar May 06 '24 07:05 xuzifu666

Thanks for the work, I have reviewed and applied a patch here: 7522.patch.zip

Please supplement the tests in your spare time.

danny0405 avatar May 07 '24 01:05 danny0405

CI report:

  • e9fc630d3a8999c7ef0db7bd94da910b1f77df7d UNKNOWN
  • b7011691a07deb288ce0341dcd55bb6feeb4101d UNKNOWN
  • 1b9d39facd9186697fb56e1e2e79ab3b3ded4ce5 Azure: SUCCESS

Bot commands

@hudi-bot supports the following commands:

  • @hudi-bot run azure: re-run the last Azure build

hudi-bot avatar May 07 '24 07:05 hudi-bot

Merge it first and let's supplement the tests in a separate PR.

danny0405 avatar May 08 '24 04:05 danny0405