[HUDI-7522] Support finding the conflicting instants in a bucket partition when bucket ids are duplicated
Change Logs
Related to issue https://github.com/apache/hudi/issues/7216: when duplicate bucket ids occur, the next write can only succeed after the affected partition files are deleted, but there is currently no way to delete them, so this change adds a config to implement it.
Impact
low
Risk level (write none, low, medium or high below)
low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
- The config description must be updated if new configs are added or the default value of a config is changed
- Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here, and follow the instructions to make changes to the website.
Contributor's checklist
- [ ] Read through contributor's guide
- [ ] Change Logs and Impact were stated clearly
- [ ] Adequate tests were added if applicable
- [ ] CI passed
@beyond1920 Can you help with the review?
process:
- bucket id conflict, the partition contains dirty files
- an unfinished instant causes the bucket id conflict
- call fix_bucket_path
- the dirty files are cleared after the call
- the next write job succeeds and writes are no longer blocked
With this PR, the user can call fix_bucket_path(table => 'bdsp_test.hudi_test61', bucket_id => '8', partition_path => 'xxx') to clean up the wrong bucket files; the next job run can then succeed.
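For illustration, a minimal sketch of how such a call might be issued from a Spark session, assuming Hudi's SQL extensions are enabled; the table name, bucket id and partition path are the placeholders from the example above, and the instant time argument discussed later in this thread is omitted here:

```java
import org.apache.spark.sql.SparkSession;

public class FixBucketPathExample {
  public static void main(String[] args) {
    // assumes spark.sql.extensions includes HoodieSparkSessionExtension,
    // so that Hudi CALL procedures are available
    SparkSession spark = SparkSession.builder()
        .appName("fix-bucket-path-example")
        .getOrCreate();

    // hypothetical invocation of the procedure added by this PR;
    // arguments are the placeholders from the comment above
    spark.sql("CALL fix_bucket_path("
        + "table => 'bdsp_test.hudi_test61', "
        + "bucket_id => '8', "
        + "partition_path => 'xxx')");

    spark.stop();
  }
}
```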
It's not safe to delete all files of this bucket that belong to pending commits. Those files might be generated by ongoing writers. We need to consider the solution more carefully. Could we delete invalid files more strictly, for example by specifying the partition path, bucket id, and instant time?
Agree, I will add the instant time to the procedure. Thanks~ @beyond1920
@xuzifu666 There is an existing RollbackToInstantTimeProcedure, could you use it?
OK, I will rewrite the method to dig out the error instant. @beyond1920
@beyond1920 After the change, the user can get the error instant to roll back from the exception message.
@xuzifu666 @danny0405 @beyond1920 I think we should solve the root cause of the bucket duplication. There are currently three situations where bucket file duplication occurs:
- Spark speculative execution. Turning off speculative execution solves this problem.
- The Hoodie archiver deleting the completed timeline in parallel. Hudi 1.0 has solved this problem.
- Concurrent insert overwrite from multiple Spark writers. This is a bug that needs to be fixed.
Now focus on scenario 3, concurrent insert overwrite from multiple Spark writers. When Hudi builds a file slice, it calls isFileSliceCommitted to determine whether the current file is committed:
/**
 * A FileSlice is considered committed, if one of the following is true:
 *  - There is a committed data file
 *  - There are some log files, that are based off a commit or delta commit.
 */
private boolean isFileSliceCommitted(FileSlice slice) {
  if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) {
    return false;
  }
  return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime());
}
This is OK for a single-writer scenario, but with multiple writers the logic of isFileSliceCommitted has a bug: if a file has a smaller commit time than the smallest completed commit, Hudi will directly determine that the file is committed, even if it is a garbage file (a file generated by a failed write).
E.g.: two Spark apps insert overwrite the same partition of a Hudi BUCKET table. app1 starts its commit at 00000001 and writes file 000000000--uuid1.parquet; app2 starts its commit at 00000002 and writes file 000000000--uuid2.parquet. app1 may fail to commit due to an OCC conflict, a cancellation, or an OOM, but 000000000--uuid1.parquet has already been written. When Hudi builds file slices, 000000000--uuid1.parquet is considered committed, since its commit time 00000001 < the smallest completed commit 00000002. This is wrong: commit 00000001 is not committed. Maybe we can modify isFileSliceCommitted like this:
private boolean isFileSliceCommitted(FileSlice slice) {
  if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) {
    return false;
  }
  // additionally require that no incomplete (requested/inflight) instant owns this base instant time,
  // so that files left behind by a failed or still-running writer are not treated as committed
  return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime())
      && uncompleteTimelineNotContains(slice.getBaseInstantTime());
}
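For clarity, a rough sketch (not the actual patch) of how the proposed helper might look, sitting next to isFileSliceCommitted and reusing its timeline field; it assumes HoodieTimeline's filterInflightsAndRequested and containsInstant(String) methods, and the helper name follows the commenter's pseudocode rather than any existing Hudi method:

```java
// hypothetical helper following the pseudocode above:
// the base instant time is only trusted if no pending (requested or inflight)
// instant on the active timeline still owns it, so files written by a failed
// or still-running writer are not treated as committed
private boolean uncompleteTimelineNotContains(String baseInstantTime) {
  return !timeline.filterInflightsAndRequested().containsInstant(baseInstantTime);
}
```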
Finally, I think Hudi's file slices should be managed uniformly, just like Iceberg/Delta Lake, rather than being obtained through list operations.
We also hit this issue in our tests. The case: a simple bucket index on a MOR table without partitioning. When we restart the job, the first write succeeds, but then the bucket id conflict appears:
00000074-3413-4e9e-b4dd-4676e4eeccb4-0_74-8-3151_20240426173957249.parquet (generated by bulk insert)
00000074-50bc-4b34-82c5-08c210d82d33-0_74-26-37004_20240428145515030.parquet (generated by a deltacommit after a restart)
According to the driver log, the restarted job read a rollback commit, and it seems the timeline did not load all the buckets completely:
24/04/28 14:57:32 INFO HoodieBucketIndex: Get BucketIndexLocationMapper for partitions: []
24/04/28 14:57:32 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20240428145515193__rollback__COMPLETED__20240428145515760]}
Yes, it is a serious problem that can block users' business. This PR raises the error to surface the bad instant first, which helps users continue their business. @danny0405
Actually in my use case there is only a single Spark job, and we have already turned off Spark speculation, so there should be no multi-writer. We just restarted the job, which triggered a rollback, and then met this issue. There should be some bug in loading the complete timeline history. I have the logs; is there any log message I could search for to help identify the root cause?
Actually in my use case there is no multi-writer; just a restart triggered a rollback, and then we met this issue.
At least getting the error instant can help you continue your job.
My use case is CDC, so the data has to stay complete with the upstream DB table, and the deltacommit that introduced the conflicting bucket file is completed, so we couldn't just delete the old file and let the job continue. The ideal way is to find the root cause and avoid creating the conflicting bucket in the first place.

From my rough analysis, the cause is that the timeline reload has some issue. We used our internal consistent hashing index for a long time without this issue, but with the bucket index we hit it. Both consistent hashing and the bucket index invoke getLatestFileSlicesForPartition, so maybe this is caused by reloadActiveTimeline on restart. One detail: when we restart, the first deltacommit does not load all the bucket file groups and introduces a conflicting bucket file, while the second deltacommit loads all the bucket file groups and then throws the "Find multiple files at partition path=" exception.
Can you squash the commits into one?
OK, I have squashed them into one commit. @danny0405
Thanks for the work, I have reviewed and applied a patch here: 7522.patch.zip
Please supplement the tests in your spare time.
CI report:
- e9fc630d3a8999c7ef0db7bd94da910b1f77df7d UNKNOWN
- b7011691a07deb288ce0341dcd55bb6feeb4101d UNKNOWN
- 1b9d39facd9186697fb56e1e2e79ab3b3ded4ce5 Azure: SUCCESS
Bot commands
@hudi-bot supports the following commands:
- @hudi-bot run azure: re-run the last Azure build
Merge it first, and let's supplement the test in a separate PR.