Nessie GC implementation
- Changes in `gc-base` module:
  - Remove the commitProtection time configuration and use the cutoff time itself to protect the commits.
  - Modify `IdentifiedResultsRepo` contents per the requirements of the GC consumer algorithm (add commit hash, a metadata location column, `isExpired`).
  - Replace `ContentBloomFilter` with a `BloomFilter` of `ContentFunnel`.
  - The bloom filter is now per task instead of per table in a task.
  - Make `deadReferenceCutOffTimeStamp` a mandatory config (to avoid falling back to the live config).
  - Split `IdentifyContentsPerExecutor` into separate classes for live and expired computations.
  - Refactor some code for better maintainability.
- Changes in `gc-iceberg` module:
  - Introduce the `gc-iceberg` module.
  - Support the stored procedure framework by introducing `NessieIcebergGcSparkCatalog` and `NessieIcebergGcSparkSessionCatalog`.
  - Introduce `IdentifyExpiredContentsProcedure` to call the `gc-base` module's identify step and write the output to a table.
  - Introduce `ExpireContentsProcedure` for the GC consumer algorithm, which implements its own expire snapshots.
I doubt the proposed approach will work; I added a bunch of comments.
There are also serious concerns about time and space complexity, which have been standing for quite a while (see the original gc-base PR). GC as implemented consumes way too much heap, can only let Iceberg expire one table at a time (too slow), and I'm quite unsure whether the proposed checkpoint mechanism works.
@snazy : Thanks for the review. I have replied to most of it.
A few points I will rework based on your suggestions (like keeping one output table, no need for an expiry branch and using another catalog, making the expire snapshots call distributed or multi-threaded).
The checkpoint mechanism works; I have shared the document, and the PR has test cases.
Regarding overall time complexity, I tested 1000 commits in one reference on a Spark cluster. For one reference, it took around 4 seconds to identify the expired snapshots (this is with an in-memory Nessie backend store and a local Quarkus server).
I don't see other ways of doing GC. If you have a better solution, I am really happy to discuss.
Also, I think it is hard to review overall PR. Hence, I am splitting up these into sub-PRs this week. Each topic we can discuss in depth on the individual PRs.
Codecov Report
Merging #3885 (f9781d9) into main (da420cf) will decrease coverage by 1.86%. The diff coverage is 86.92%.
```diff
@@              Coverage Diff               @@
##               main    #3885      +/-   ##
============================================
- Coverage     86.54%   84.68%    -1.87%
+ Complexity     3652     3581      -71
============================================
  Files           416      428      +12
  Lines         16997    17250     +253
  Branches       1456     1475      +19
============================================
- Hits          14710    14608     -102
- Misses         1785     2140     +355
  Partials        502      502
```
| Flag | Coverage Δ |
|---|---|
| java | 84.90% <86.92%> (-2.08%) :arrow_down: |
| javascript | 81.98% <ø> (ø) |
| python | 82.86% <ø> (ø) |
Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ |
|---|---|
| ...ectnessie/gc/base/IdentifyContentsPerExecutor.java | 91.66% <ø> (+1.66%) :arrow_up: |
| ...gc/iceberg/NessieIcebergGcSparkSessionCatalog.java | 0.00% <0.00%> (ø) |
| .../main/java/org/projectnessie/gc/base/GCParams.java | 43.75% <37.50%> (-2.92%) :arrow_down: |
| ...g/projectnessie/gc/iceberg/ComputeAllFilesUDF.java | 66.66% <66.66%> (ø) |
| .../org/projectnessie/gc/iceberg/GCProcedureUtil.java | 66.66% <66.66%> (ø) |
| ...c/main/java/org/apache/iceberg/GCMetadataUtil.java | 77.41% <77.41%> (ø) |
| ...ectnessie/gc/base/DistributedIdentifyContents.java | 86.48% <81.25%> (-0.48%) :arrow_down: |
| ...rc/main/java/org/projectnessie/gc/base/GCUtil.java | 56.52% <84.21%> (+1.75%) :arrow_up: |
| ...ie/gc/base/IdentifyExpiredContentsPerExecutor.java | 85.07% <85.07%> (ø) |
| ...e/gc/iceberg/IdentifyExpiredContentsProcedure.java | 86.53% <86.53%> (ø) |
| ... and 52 more | |
Continue to review the full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

Powered by Codecov. Last update da420cf...f9781d9.
Tested expire_snapshots with a dummy metadata that doesn't have a partition spec in its metadata for the spec id mentioned in a manifest list. It runs into an NPE because partition summaries are collected for the ALL_MANIFESTS table (which uses the schema and spec); collecting them isn't required here, but Iceberg uses some common code :(
Also, if two commits allocated the same snapshot id (a very, very rare scenario): in Iceberg (without branching), I raised a PR to fix it. But with Nessie we cannot know, as an ALTER TABLE operation produces the same snapshot id but different metadata. On the Nessie side, I cannot tell whether it is a duplicate snapshot id or an ALTER TABLE operation.
@snazy, @keithgchapman : I cannot go ahead without a global state or pseudo global state.
I have reworked this PR.

a) Implemented our own distributed (Spark job) expire contents logic [`ExpireContentsProcedure.java`], which invalidates a lot of the previous comments (like the temporary branch, duplicate snapshot ids, and slow sequential table deletes).
b) A single bloom filter is now per reference instead of per content per reference.
c) Changed the bloom filter key to use snapshot id + metadata file location, to handle backward compatibility for global state contents.
d) Moved the checkpoint info into the `IdentifiedResultsRepo` table instead of one more separate table.

I will polish up the PR a bit, but the overall logic is ready for review.
cc: @snazy, @keithgchapman
@snazy : Assume this is the identify GC result:

```
cid1, metadata1, expired, branch1
cid1, metadata1, live,    branch2
cid1, metadata2, live,    branch1
cid1, metadata3, expired, branch3
```
Now, for each content, I will read the table metadata -> read the manifest lists -> collect all the manifest paths -> explode -> separate live and expired rows -> expired.except(live) -> get the expired manifests.
```
cid1, metadata1, {m1,m2,m3},    expired, branch1
cid1, metadata1, {m1,m2,m3},    live,    branch2
cid1, metadata2, {m1,m2,m3,m4}, live,    branch3
cid1, metadata3, {m1,m2},       expired, branch3
```
Now, for each EXPIRED content, I will read the table metadata -> read the manifest lists -> collect all the manifest paths -> PRUNE with the expired-manifests filter -> get `GenericManifestFile` -> read the data files -> return the result.
In Iceberg's expire_snapshots, one manifest file is read once, as it works on only one table metadata. Expire contents has to work with N table metadata (the total number of commits/contents). Hence, I don't want to read the data files of the manifests before finding the globally expired manifests.
Hence the two-time read.
The other problem with the Iceberg interface is that the manifest path is not enough to read data files; it needs a `GenericManifestFile` object. On the previous read, I can't store the `GenericManifestFile` object (it is a bulky object, and serialization fails due to the Avro schema in that object with a null type). So, for expired contents, I am reading the table metadata and the manifest list twice.
I know it is slow, but I don't have another solution.
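The expired.except(live) step can be sketched with plain Java collections (in the PR this runs as a distributed Spark job; the class and method names here are illustrative only, and an extra manifest `m5` is added to the example data so the result is non-empty):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ExpiredManifestsSketch {
    // A manifest is only globally expired if no live content, on any
    // reference, still points to it: expired.except(live).
    static Set<String> globallyExpired(List<Set<String>> expiredPerContent,
                                       List<Set<String>> livePerContent) {
        Set<String> live = new HashSet<>();
        livePerContent.forEach(live::addAll);
        Set<String> expired = new HashSet<>();
        expiredPerContent.forEach(expired::addAll);
        expired.removeAll(live); // the "except" step
        return expired;
    }

    public static void main(String[] args) {
        // Expired contents (metadata1 on branch1, metadata3 on branch3):
        List<Set<String>> expired = List.of(Set.of("m1", "m2", "m3"), Set.of("m1", "m2", "m5"));
        // Live contents on branch2/branch3 still reference m1..m4:
        List<Set<String>> live = List.of(Set.of("m1", "m2", "m3"), Set.of("m1", "m2", "m3", "m4"));
        // Only m5 is referenced exclusively by expired contents:
        System.out.println(globallyExpired(expired, live)); // prints [m5]
    }
}
```

The same set difference is what `Dataset.except` computes when the live and expired manifest paths are exploded into Spark rows.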
I will write a detailed document on expireContents implementation and use it for discussion.
@snazy : I have reworked the Nessie expire snapshots now.
New logic (also handles data files being reused across manifests):
- For each live content, read the table metadata and, for its current snapshot, get the live manifest list, live manifests, and live data files. [Yes, the GC identify output should contain live contents as well.]
- For each expired content, read the table metadata and, for its current snapshot, get the expired manifest list, expired manifests, and expired data files.
- `expired.except(live)` gives the final expired files.
Can you please rebase this PR?
@snazy : I have addressed the comments. Please have a look at it again. Thanks.
- `gc-base` module:
  a. [PR ready] Remove the commitProtection time configuration and use the cutoff time itself to protect the commits.
  b. Introduce checkpoint information in the `IdentifiedResultsRepo` to solve the GC overtime issue; marker rows are written by the gc-iceberg module's expiry step.
  c. Modify the base GC algorithm to consider the checkpoints and the spliterator based on the checkpoint.
  d. Modify `IdentifiedResultsRepo` contents per the requirements of the GC consumer algorithm (add commit hash, a metadata location column, `isExpired`).
  e. Move some common code to `GCUtil`.
  f. [Merged] Fix some bugs in the base algorithm, like filtering out the -1 snapshot id and handling one missed scenario for a dropped table's single key.
- `gc-iceberg` module:
  a. Introduce the gc-iceberg module.
  b. Support the stored procedure framework by introducing `NessieIcebergGcSparkCatalog` and `NessieIcebergGcSparkSessionCatalog`.
  c. Introduce `IdentifyExpiredContentsProcedure` to call the gc-base module identify step and write the output to a table.
  d. Introduce `ExpireContentsProcedure` for the GC consumer algorithm, which implements its own expire snapshots and also writes checkpoint marker rows for `IdentifiedResultsRepo`.
PR title and probably description should be adapted.
If #4626 causes too much trouble now, I'm okay to do it as an immediate follow-up.
Three comments are pending for this PR:
- Understand and adapt to #4303 instead of using the reflog
- Improve the checkpoint logic (probably need to introduce a reference ID; not sure about handling the compatibility yet)
- #4626 (`gc-base` module should not depend on Iceberg jars)

I will explore these in the same order mentioned above.
> PR title and probably description should be adapted.

Updated.
@snazy: Out of the 3 open points, I have handled reverting the checkpoint logic.
I believe the other two [#4626, #4690] can be handled in a follow-up.
@snazy : Addressed all the comments except JSON one.
> The order of the fields in the JSON string representation isn't guaranteed AFAIK, which can lead to wrong GC behavior.
I am not clear about the "wrong GC behaviour". Do you mean deserialization will fail? I tested this once in the cluster when I introduced the JSON serialization, and it was working.
Also tried the below properties for `ObjectMapper`, and they didn't make any difference in the JSON string output order:

```java
objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
objectMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
```
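The underlying concern (comparing serialized strings whose field order may vary) can be illustrated without Jackson, using plain stdlib maps; the class, method, and keys below are illustrative only:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class JsonOrderSketch {
    // Naive JSON-ish rendering of a string map, in the map's iteration order.
    static String render(Map<String, String> m) {
        StringBuilder sb = new StringBuilder("{");
        m.forEach((k, v) -> sb.append('"').append(k).append("\":\"").append(v).append("\","));
        if (sb.length() > 1) sb.setLength(sb.length() - 1); // drop trailing comma
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // Two logically equal payloads inserted in different orders:
        Map<String, String> a = new LinkedHashMap<>();
        a.put("snapshotId", "42");
        a.put("contentId", "c1");
        Map<String, String> b = new LinkedHashMap<>();
        b.put("contentId", "c1");
        b.put("snapshotId", "42");
        // Insertion-order rendering differs, so a string comparison would mismatch:
        System.out.println(render(a).equals(render(b))); // false
        // Sorting the keys first makes the rendering deterministic:
        System.out.println(render(new TreeMap<>(a)).equals(render(new TreeMap<>(b)))); // true
    }
}
```

This is what Jackson's alphabetical-sort settings aim to guarantee for serialized output: the same logical object always yields the same string, so string-level comparisons stay safe.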
@snazy: While I am working on the two open points [https://github.com/projectnessie/nessie/issues/4626 , https://github.com/projectnessie/nessie/issues/4690] in a follow-up PR, can we please merge this PR if it is OK?
Sorry, but the implementation is still very prone to easily run into out-of-memory situations.
@snazy : Fixed it by writing the expiry results to a table, as suggested. Instead of writing to the same identify output table, I am using a new table under the same reference, as the identify result should not contain the expiry procedure output and the schema is totally different.
Thanks for the effort, but closing in favor of #4991