
Nessie GC implementation

Open ajantha-bhat opened this issue 2 years ago • 17 comments

  1. Changes in gc-base module:
  • Remove commitProtection time configuration and use cutoff time itself to protect the commits.
  • Modify IdentifiedResultsRepo contents as per the requirements of the GC consumer algorithm (e.g. add commit hash, metadata location, and isExpired columns).
  • Replace ContentBloomFilter with a BloomFilter of ContentFunnel.
  • The bloom filter is now per task instead of per table within a task.
  • Make deadReferenceCutOffTimeStamp a mandatory config (to avoid fallback to live config)
  • Split IdentifyContentsPerExecutor into separate classes for live and expired computations.
  • Refactor some code for better maintainability.
  2. Changes in gc-iceberg module:
  • Introduce the gc-iceberg module.
  • Support the stored-procedure framework by introducing NessieIcebergGcSparkCatalog and NessieIcebergGcSparkSessionCatalog.
  • Introduce IdentifyExpiredContentsProcedure to call the gc-base module identify step and write the output to a table.
  • Introduce ExpireContentsProcedure for the GC consumer algorithm, which implements its own expire-snapshots logic.
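The per-task bloom filter mentioned above refers to Guava's `BloomFilter`/`Funnel` API; as a rough stdlib-only illustration of the idea (one filter per GC task, queried for content ids), here is a minimal `BitSet`-based sketch. The class name, sizing, and hashing scheme are invented for illustration and are not the PR's actual code:

```java
import java.util.BitSet;

// Minimal bloom-filter sketch: one filter per GC task, fed with content ids.
// Hypothetical stand-in for the Guava BloomFilter<Content> used in the PR.
public class TaskBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    public TaskBloomFilter(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    // Derive the i-th bit index from two base hashes (Kirsch-Mitzenmacher scheme).
    private int index(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, size);
    }

    public void put(String contentId) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(index(contentId, i));
        }
    }

    // May return false positives, never false negatives.
    public boolean mightContain(String contentId) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(index(contentId, i))) {
                return false;
            }
        }
        return true;
    }
}
```

The "never false negatives" property is what makes a bloom filter safe here: a live content can never be mistaken for expired, only the other way around.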

ajantha-bhat avatar Apr 07 '22 17:04 ajantha-bhat

I doubt the proposed approach will work; I've added a bunch of comments.

There are also serious concerns about time and space complexity, which have stood for quite a while (see the original gc-base PR). GC as implemented consumes far too much heap, can only let Iceberg expire one table at a time (too slow), and I'm quite unsure whether the proposed checkpoint mechanism works.

@snazy : Thanks for the review. I have replied to most of it.

A few points I will rework based on your suggestions (keeping one output table, dropping the expiry branch in favor of another catalog, and making the expire-snapshots calls distributed or multi-threaded).

The checkpoint mechanism works; I have shared the document, and the PR has test cases.

Regarding overall time complexity, I tested 1000 commits in one reference on a Spark cluster. For one reference it took around 4 seconds to identify the expired snapshots (with an in-memory Nessie backend store and a local Quarkus server).

I don't see another way of doing GC. If you have a better solution, I am happy to discuss it.

Also, I think it is hard to review the overall PR in one go. Hence, I am splitting it up into sub-PRs this week, so we can discuss each topic in depth on the individual PRs.

ajantha-bhat avatar May 09 '22 13:05 ajantha-bhat

Codecov Report

Merging #3885 (f9781d9) into main (da420cf) will decrease coverage by 1.86%. The diff coverage is 86.92%.

@@             Coverage Diff              @@
##               main    #3885      +/-   ##
============================================
- Coverage     86.54%   84.68%   -1.87%     
+ Complexity     3652     3581      -71     
============================================
  Files           416      428      +12     
  Lines         16997    17250     +253     
  Branches       1456     1475      +19     
============================================
- Hits          14710    14608     -102     
- Misses         1785     2140     +355     
  Partials        502      502              
Flag Coverage Δ
java 84.90% <86.92%> (-2.08%) :arrow_down:
javascript 81.98% <ø> (ø)
python 82.86% <ø> (ø)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...ectnessie/gc/base/IdentifyContentsPerExecutor.java 91.66% <ø> (+1.66%) :arrow_up:
...gc/iceberg/NessieIcebergGcSparkSessionCatalog.java 0.00% <0.00%> (ø)
.../main/java/org/projectnessie/gc/base/GCParams.java 43.75% <37.50%> (-2.92%) :arrow_down:
...g/projectnessie/gc/iceberg/ComputeAllFilesUDF.java 66.66% <66.66%> (ø)
.../org/projectnessie/gc/iceberg/GCProcedureUtil.java 66.66% <66.66%> (ø)
...c/main/java/org/apache/iceberg/GCMetadataUtil.java 77.41% <77.41%> (ø)
...ectnessie/gc/base/DistributedIdentifyContents.java 86.48% <81.25%> (-0.48%) :arrow_down:
...rc/main/java/org/projectnessie/gc/base/GCUtil.java 56.52% <84.21%> (+1.75%) :arrow_up:
...ie/gc/base/IdentifyExpiredContentsPerExecutor.java 85.07% <85.07%> (ø)
...e/gc/iceberg/IdentifyExpiredContentsProcedure.java 86.53% <86.53%> (ø)
... and 52 more


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Last update da420cf...f9781d9.

codecov[bot] avatar May 10 '22 12:05 codecov[bot]

Tested expire_snapshots on a dummy metadata that lacks the partition spec for the spec id mentioned in one of its manifest lists. It throws an NPE because partition summaries are collected for the ALL_MANIFEST table (which uses the schema and spec); collecting them is not actually required here, but Iceberg uses some common code :(

Also, if two commits allocate the same snapshot id (a very rare scenario): in Iceberg (without branching), I raised a PR to fix it. But with Nessie we cannot tell, because an ALTER TABLE operation produces the same snapshot id with different metadata. On the Nessie side, I cannot know whether it is a duplicate snapshot id or an ALTER TABLE operation.

@snazy, @keithgchapman : I cannot go ahead without a global state or pseudo global state.

ajantha-bhat avatar May 13 '22 06:05 ajantha-bhat

I have reworked this PR.
  a. Implemented our own distributed (Spark job) expire-contents logic [ExpireContentsProcedure.java], which invalidates a lot of the previous comments (temporary branch, duplicate snapshot id, slow sequential table deletes).
  b. A single bloom filter is now per reference instead of per content per reference.
  c. Changed the bloom filter key to use snapshot id + metadata file location, to handle backward compatibility for global-state contents.
  d. Moved the checkpoint info into the IdentifiedResultsRepo table instead of one more separate table.

I will polish up the PR a bit, but the overall logic is ready for review.

cc: @snazy , @keithgchapman

ajantha-bhat avatar Jun 13 '22 16:06 ajantha-bhat

@snazy

Assume this is the identify GC result:

cid1, metadata1, expired, branch1
cid1, metadata1, live, branch2
cid1, metadata2, live, branch1
cid1, metadata3, expired, branch3

Now, for each content, I read the table metadata -> read the manifest lists -> collect all the manifest paths -> explode -> separate live and expired rows -> expired.except(live) -> get the expired manifests.

cid1, metadata1, {m1,m2,m3}, expired, branch1
cid1, metadata1, {m1,m2,m3}, live, branch2
cid1, metadata2, {m1,m2,m3,m4}, live, branch3
cid1, metadata3, {m1,m2}, expired, branch3

Now, for each EXPIRED content, I read the table metadata -> read the manifest lists -> collect all the manifest paths -> PRUNE with the expired-manifests filter -> get the GenericManifestFile -> read the data files -> return the result.

In Iceberg's expire_snapshots, each manifest file is read once, as it works on only one table metadata. Expire contents has to work with N table metadatas (the total number of commits/contents). Hence, I don't want to read the data files of the manifests before finding the globally expired manifests.

Hence the two-pass read.

Another problem with the Iceberg interface: to read the data files, the manifest path is not enough, it needs a GenericManifestFile object. I can't store the GenericManifestFile object from the previous read (it is a bulky object, and serialization fails due to an Avro schema with a null type inside it). So, for expired contents, I am reading the table metadata and manifest list twice.

I know it is slow, but I don't have another solution.
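The expired.except(live) step is, at its core, a set difference over file paths, distributed via Spark in the PR. A minimal plain-Java sketch of that core (the class and the example manifest names are invented for illustration; this is not the PR's Dataset code):

```java
import java.util.HashSet;
import java.util.Set;

public class ExpireDiff {
    // expired.except(live): keep only files referenced exclusively by
    // expired contents. Anything still reachable from a live content
    // must not be deleted.
    public static Set<String> expiredOnly(Set<String> expired, Set<String> live) {
        Set<String> result = new HashSet<>(expired);
        result.removeAll(live);
        return result;
    }
}
```

For example, with expired manifests {m1,m2,m3} and live manifests {m2,m3,m4}, only m1 may be deleted.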

ajantha-bhat avatar Jun 22 '22 07:06 ajantha-bhat

I will write a detailed document on expireContents implementation and use it for discussion.

ajantha-bhat avatar Jun 22 '22 08:06 ajantha-bhat

@snazy : I have reworked the Nessie expire snapshots now.

New logic (which also handles data files being reused across manifests):

  • For each live content: read the table metadata and, for its current snapshot, collect the live manifest list, live manifests, and live data files. (Yes, the GC identify output should contain live contents as well.)
  • For each expired content: read the table metadata and, for its current snapshot, collect the expired manifest list, expired manifests, and expired data files.
  • expired.except(live) gives the final expired files.
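Doing the difference at the data-file level (rather than the manifest level) is what makes reused data files safe: a file reachable from any live manifest survives even if one of its expired manifests goes away. A small stdlib sketch with invented manifest/file names, not the PR's Spark code:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the data-file-level difference. The map layout
// (manifest path -> data files it references) is assumed for illustration.
public class DataFileExpiry {
    public static Set<String> expiredDataFiles(
            Map<String, Set<String>> expiredManifests,
            Map<String, Set<String>> liveManifests) {
        Set<String> live = new HashSet<>();
        liveManifests.values().forEach(live::addAll);
        Set<String> expired = new HashSet<>();
        expiredManifests.values().forEach(expired::addAll);
        // A data file reused by any live manifest is kept, even though the
        // expired manifest that also references it will be removed.
        expired.removeAll(live);
        return expired;
    }
}
```

For example, if expired manifest mX references {f1,f2} and live manifest mY references {f2,f3}, only f1 is expired: f2 is reused by a live manifest and survives.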

ajantha-bhat avatar Jun 27 '22 16:06 ajantha-bhat

Can you please rebase this PR?

snazy avatar Jul 01 '22 07:07 snazy

@snazy : I have addressed the comments. Please have a look at it again. Thanks.

ajantha-bhat avatar Jul 04 '22 15:07 ajantha-bhat

  1. gc-base module:
     a. [PR ready] Remove the commitProtection time configuration and use the cutoff time itself to protect the commits.
     b. Introduce checkpoint information in the IdentifiedResultsRepo to solve the GC overtime issue; marker rows are written by the gc-iceberg module's expiry step.
     c. Modify the base GC algorithm to consider the checkpoints and the spliterator based on the checkpoint.
     d. Modify IdentifiedResultsRepo contents as per the requirements of the GC consumer algorithm (e.g. add commit hash, metadata location column, isExpired).
     e. Move some common code to GCUtil.
     f. [Merged] Fix some bugs in the base algorithm, like filtering out the -1 snapshot id and handling one missed scenario for a dropped table's single key.
  2. gc-iceberg module:
     a. Introduce the gc-iceberg module.
     b. Support the stored-procedure framework by introducing NessieIcebergGcSparkCatalog and NessieIcebergGcSparkSessionCatalog.
     c. Introduce IdentifyExpiredContentsProcedure to call the gc-base module identify step and write the output to a table.
     d. Introduce ExpireContentsProcedure for the GC consumer algorithm, which implements its own expire snapshots and also writes checkpoint marker rows for the IdentifiedResultsRepo.

PR title and probably the description should be adapted.

snazy avatar Jul 05 '22 12:07 snazy

If #4626 causes too much trouble now, I'm okay to do it as an immediate follow-up.

snazy avatar Jul 08 '22 10:07 snazy

3 comments are pending for this PR:

  1. Understand and adapt to #4303 instead of using the reflog.
  2. Improve the checkpoint logic (probably need to introduce a reference ID; not sure about handling the compatibility yet).
  3. #4626 (the gc-base module should not depend on Iceberg jars).

I will explore them in the same order as mentioned above.

ajantha-bhat avatar Jul 11 '22 04:07 ajantha-bhat

PR title and probably the description should be adapted.

Updated.

ajantha-bhat avatar Jul 11 '22 12:07 ajantha-bhat

@snazy: Out of the 3 open points, I have handled reverting the checkpoint logic.

I believe the other two [#4626 , #4690] can be handled in a follow-up.

ajantha-bhat avatar Jul 13 '22 07:07 ajantha-bhat

@snazy : Addressed all the comments except the JSON one.

The order of the fields in the JSON string representation isn’t guaranteed AFAIK, which can lead to wrong GC behavior.

I am not clear about the "wrong GC behaviour". Do you mean deserialization will fail? I tested this in the cluster when I introduced the JSON serialization, and it was working.

I also tried the below properties for ObjectMapper, and they didn't make any difference in the field order of the JSON string output:
ObjectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
ObjectMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
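As far as I know, Jackson's SORT_PROPERTIES_ALPHABETICALLY applies to POJO properties while ORDER_MAP_ENTRIES_BY_KEYS applies to Map values, so it may matter which path the serializer actually takes. Whatever the Jackson behavior here, the underlying goal (a byte-stable string for the same logical content) can be illustrated with a stdlib sketch that serializes from a sorted map; the hand-rolled JSON below is purely for illustration and is not the PR's serializer:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class StableJson {
    // TreeMap iterates keys alphabetically, so the output string is
    // deterministic regardless of the insertion order of the input map.
    public static String toJson(Map<String, String> fields) {
        return new TreeMap<>(fields).entrySet().stream()
            .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
            .collect(Collectors.joining(",", "{", "}"));
    }
}
```

With a stable string like this, the same content always hashes to the same bloom-filter key, which is why field order matters for GC correctness.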

ajantha-bhat avatar Jul 14 '22 13:07 ajantha-bhat

@snazy: While I am working on these two open points [https://github.com/projectnessie/nessie/issues/4626 , https://github.com/projectnessie/nessie/issues/4690] in a follow-up PR, can we please merge this PR if it is OK?

ajantha-bhat avatar Jul 20 '22 13:07 ajantha-bhat

Sorry, but the implementation is still very prone to easily run into out-of-memory situations.

@snazy : Fixed it by writing the expiry results to a table, as suggested. Instead of writing to the same identify output table, I am using a new table under the same reference, since the identify result should not contain the expiry procedure's output and the schema is totally different.

ajantha-bhat avatar Jul 26 '22 15:07 ajantha-bhat

Thanks for the effort, but closing in favor of #4991

snazy avatar Sep 10 '22 08:09 snazy