Improve performance for Equality Delete files in Iceberg connector

Open jasonf20 opened this issue 2 years ago • 33 comments

Description

The goal of this PR is to improve the performance of queries that contain Equality Delete files.

Before this commit, equality delete files were re-loaded for every split that needed them. Now, each delete file is loaded once per execution node.

In addition, the equality deletes are stored in a single map (per delete schema) along with the data sequence number at which the row was deleted. This allows us to merge rows that were deleted multiple times (common in upsert use cases) into a single entry instead of holding an entry per delete file.
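As a rough illustration of this merged-map idea (the class and method names below are hypothetical, not the PR's actual code), a single map per delete schema only needs to keep the highest data sequence number seen for each equality key; a row then counts as deleted only if that sequence number is greater than the row's own data sequence number:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the PR's DeleteManager: one merged map per delete schema.
// The key identifies a row by its equality-delete columns; the value is the highest
// data sequence number of any delete file that deleted that key.
final class MergedEqualityDeletes
{
    private final Map<List<Object>, Long> deleteSequenceByKey = new ConcurrentHashMap<>();

    // Repeated deletes of the same key (common with upserts) collapse into a single
    // entry holding the largest data sequence number.
    void addDelete(List<Object> equalityKey, long deleteDataSequenceNumber)
    {
        deleteSequenceByKey.merge(equalityKey, deleteDataSequenceNumber, Math::max);
    }

    // Per the Iceberg spec, an equality delete applies only to rows whose data sequence
    // number is strictly lower than the delete's data sequence number.
    boolean isDeleted(List<Object> equalityKey, long rowDataSequenceNumber)
    {
        Long deleteSequence = deleteSequenceByKey.get(equalityKey);
        return deleteSequence != null && deleteSequence > rowDataSequenceNumber;
    }
}
```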

Additional context and related issues

Fixes #18396

Changes in this commit:

  • Create a DeleteManager class to manage the above logic
  • Read every delete file only once
  • Merge deletes into single map
  • Opportunistic delete file load parallelization

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Iceberg
* Improved performance and memory usage when [Equality Delete](https://iceberg.apache.org/spec/#equality-delete-files) files are used ({issue}`18396`)

jasonf20 avatar Jul 25 '23 08:07 jasonf20

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to [email protected]. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

cla-bot[bot] avatar Jul 25 '23 08:07 cla-bot[bot]

@findinpath This PR is the fix for the equality deletes issue we discussed. We have another PR coming that should further improve performance in situations where there are a lot of deletes (>~200M).

yoniiny avatar Aug 03 '23 16:08 yoniiny

@jasonf20 does your contribution overlap with https://github.com/trinodb/trino/pull/17115 ?

Note that the above-mentioned PR includes scaffolding for testing. Please use it here as well to ensure the validity of your changes.

Read every delete file only once

Is this something we can verify as part of this PR (before & after your change)? Have a look at https://github.com/trinodb/trino/blob/18b47f0223c7d7cb74fbb8c16f9b3871ad79690b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java

findinpath avatar Aug 04 '23 04:08 findinpath

Hi @findinpath

It does seem like this PR overlaps with #17115. This PR should do the same map compaction that is done in that PR. However, it includes additional optimizations on top of that:

  • Reading delete files only once
    • This adds some complexity since the shared state must eventually be cleaned up. This is done with a WeakReference in this PR (a minimal sketch of the idea follows after this list).
    • Also, the data sequence number needs to be taken into account, since we share the deletes map across splits
  • Optimistically loading delete files in parallel (despite reading each file only once, we will allow different splits to load the files in a different order so that multiple files are loaded at once)
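
A minimal sketch of the WeakReference approach described in the first bullet above (names are illustrative, not the PR's actual classes): splits hold strong references to the loaded deletes while they scan, and once no split references them the GC is free to reclaim the shared state.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical illustration only. Each delete file is loaded at most once while some
// split still holds a strong reference to the result; afterwards the entry may be
// cleared by the garbage collector.
final class SharedDeleteState<T>
{
    private final Map<String, WeakReference<T>> loadedDeletes = new ConcurrentHashMap<>();

    T getOrLoad(String deleteFilePath, Supplier<T> loader)
    {
        WeakReference<T> reference = loadedDeletes.get(deleteFilePath);
        T value = (reference == null) ? null : reference.get();
        if (value == null) {
            // Two splits may race here; at worst the file is loaded twice, which is still
            // far better than loading it once per split.
            value = loader.get();
            loadedDeletes.put(deleteFilePath, new WeakReference<>(value));
        }
        return value;
    }
}
```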

I think that it should suffice to merge only one of these PRs. If the other one is merged I will need to rebase and add the above improvements on top of the previous PR.

I have added two tests based on the templates/suggestion I saw in the other PR.

I was able to use the MetadataFileOperations spec as a guideline for adding a test that validates the delete file is loaded only once. Added as a new commit.

jasonf20 avatar Aug 06 '23 09:08 jasonf20

@jasonf20 let's concentrate on landing the PR https://github.com/trinodb/trino/pull/17115 first, since it comes with fewer changes. There are a few corner cases that are not fully obvious which need to be covered in detail before landing these changes. Feel free to contribute with suggestions on the above-mentioned PR to accelerate its landing.

findinpath avatar Aug 09 '23 21:08 findinpath

@findinpath Happy to assist with the merging of #17115. Is there anything specific blocking it? Looking at the PR, it seems most action items have been handled.

Once that's merged I'll rebase on top of that with the remaining changes.

Keep in mind that, as it stands, the other PR isn't enough to make the use case of a large table with constant upserts actually work, since each split loads all the delete files sequentially.

jasonf20 avatar Aug 10 '23 07:08 jasonf20

Happy to assist with the merging of https://github.com/trinodb/trino/pull/17115. Is there anything specific blocking it?

We stumbled on a situation related to using nested fields for equality deletes. The PR will be adding proper handling for such situations. It seems that, at the moment, Trino is not capable of reading such delete files. In any case, adding support in Trino for dealing with nested fields in equality deletes is not in the scope of the above-mentioned PR.

findinpath avatar Aug 10 '23 08:08 findinpath

@jasonf20 https://github.com/trinodb/trino/pull/17115 has been merged. Feel free to continue the work on this PR.

findinpath avatar Sep 26 '23 18:09 findinpath

@findinpath This PR is ready for review

yoniiny avatar Oct 05 '23 18:10 yoniiny

This change makes a lot of sense at a high level. Re-reading the same delete files over and over again for each split does seem costly and something to avoid if possible. However, I see two issues with the approach in the current implementation:

  1. The memory consumed by the delete filters is not tracked. This is the current state of the world and already not ideal, but at least the memory associated with them today is guaranteed to be released once the IcebergPageSource finishes scanning the input split, which means a relatively short lifetime and definite disposal. With this change as currently implemented, however, the lifetime increases (potentially significantly) and we lose definite disposal by moving the responsibility to the GC's processing of WeakReferences, which can be a sharp edge and cause problems in production quite easily. Instead, I would propose that it would be better to share delete filters within the context of a single Task instead of globally from the IcebergPageSourceProvider (a rough sketch of this task-scoped idea follows this list). This would allow the memory backing them to be accounted for once at the task level, narrow the scope of sharing to the same table scan, and still allow for deterministic disposal by performing that cleanup as part of task termination.
  2. Regardless of the cache sharing considerations above, sharing the delete filters will only be effective if the workload actually does reuse them. Otherwise, a pathological worst-case workload that causes eviction of entries before they could be reused would not benefit at all from this change and would see some additional overhead due to cache maintenance and synchronization costs. I don't think this is fundamentally a blocker, since it should be only slightly worse in the worst case, but you might want to consider how splits relate to delete files within the Iceberg connector (I'm not very familiar with Iceberg's internals) and think about whether there are any strategies you might need to employ to generate and assign splits to tasks in a way that maximizes the reuse rate.
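
A minimal sketch of the task-scoped alternative proposed in point 1 above (all names are hypothetical; none of this is existing Trino code): the cache lives for the duration of a single task's table scan, reports the retained size of its entries to a task-level memory accounting callback, and is released deterministically when the task terminates.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.ToLongFunction;

// Hypothetical illustration of a task-scoped delete filter cache with memory tracking
// and deterministic disposal.
final class TaskScopedDeleteFilterCache<K, V>
{
    private final Map<K, V> filters = new ConcurrentHashMap<>();
    private final ToLongFunction<V> sizeOf;        // estimates retained bytes of an entry
    private final LongConsumer memoryReservation;  // e.g. backed by a task-level memory context

    TaskScopedDeleteFilterCache(ToLongFunction<V> sizeOf, LongConsumer memoryReservation)
    {
        this.sizeOf = sizeOf;
        this.memoryReservation = memoryReservation;
    }

    V getOrLoad(K key, Function<K, V> loader)
    {
        return filters.computeIfAbsent(key, k -> {
            V value = loader.apply(k);
            memoryReservation.accept(sizeOf.applyAsLong(value));  // account for newly cached bytes
            return value;
        });
    }

    // Invoked when the task terminates: drops all entries and returns the reserved memory.
    void close()
    {
        long released = filters.values().stream().mapToLong(sizeOf).sum();
        filters.clear();
        memoryReservation.accept(-released);
    }
}
```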

cc: @findepi for a second opinion on the recommendations above.

pettyjamesm avatar Nov 01 '23 19:11 pettyjamesm

Hi @preethiratnam

In SqlTaskManager line 227, you can see that the QueryContext (which holds the TaskContext objects) is already in a WeakReference. Since we're probably talking about putting this data directly in the QueryContext so that it would be shared between Tasks, these objects will already have been in a WeakReference. Even if we put it in the TaskContext, the QueryContext has a reference to all its TaskContext objects, so this will still be the case. Our method has two main advantages as I see it:

  1. Allows state to be reused between queries if applicable
  2. Does not change code outside of the Iceberg plugin

Given this information, does that change your estimation of the potential impact, or what the implementation should be?

jasonf20 avatar Nov 05 '23 16:11 jasonf20

Adding replies inline below:

In SqlTaskManager line 227, you can see that the QueryContext (which holds the TaskContext objects) is already in a WeakReference.

That's true, but the relationship between TaskContext and QueryContext is a little more nuanced. A QueryContext must be kept as long as any TaskContext within the same query is present, since all tasks of a query must share the same instance, but the coordinator-to-worker protocol only creates tasks directly (not queries). The hacky approach to make this work is to have each TaskContext retain a strong reference to its QueryContext and make the global QueryContext mapping use weak references, allowing expiration once all sub-tasks have been removed and no longer reference it. Managing those references is still prone to leaks, but QueryContext objects themselves are at least not especially large, and there is no notion of "eviction" here. The QueryContext objects must be kept until all TaskContext objects are released from their global mapping.

Since we're probably talking about putting this data directly in the QueryContext so that it would be shared between Tasks, these objects will already have been in a WeakReference. Even if we put it in the TaskContext, the QueryContext has a reference to all its TaskContext objects, so this will still be the case.

That would be one option, although not necessarily required. If you assume that each task generally corresponds to a single table scan and that each scan generally must have the filters present to run, we have a clear notion of the lifecycle at the task level. You could potentially have a shared object at the QueryContext level and use a reference counting scheme or something similar to ensure that disposal of the objects occurs at the appropriate time (a minimal sketch of such a scheme follows below), but that would obviously be much more complex for little benefit. In either of these two cases, though, you would still have a well-defined point at which you could explicitly release all references to the cache entries, and a single well-defined place to attribute their memory cost (either task or query level).
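
For concreteness, a reference-counting scheme of the kind mentioned above might look roughly like the following (purely illustrative names): the last task to release the shared query-level state triggers its disposal.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: tasks acquire the shared state when they start and release it when
// they finish; disposal happens deterministically on the last release.
final class RefCountedDeleteState<T extends AutoCloseable>
{
    private final T delegate;
    private final AtomicInteger referenceCount = new AtomicInteger();

    RefCountedDeleteState(T delegate)
    {
        this.delegate = delegate;
    }

    T acquire()
    {
        referenceCount.incrementAndGet();
        return delegate;
    }

    void release()
            throws Exception
    {
        if (referenceCount.decrementAndGet() == 0) {
            delegate.close();  // well-defined point to release cache entries and their memory
        }
    }
}
```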

Our method has two main advantages as I see it:

1. Allows state to be reused between queries if applicable

It's true that the current approach would allow sharing the cache between queries, but that benefit comes with two drawbacks as a direct consequence:

  1. There is no way to track and account for the memory usage of those filters
  2. There is no way to know when individual cache entries are still in use and necessary for in-flight work, and which ones should be cleaned up.

2. Does not change code outside of the Iceberg plugin

While that certainly makes the implementation simpler, it's functionally a big departure from the approach taken elsewhere throughout the engine and introduces operational risks.

Similar patterns do exist within Presto (e.g., ParquetMetadataSource / CachingParquetMetadataSource), and when that pattern is used, there's a maximum allowed in-memory size and an eviction policy configured at the plugin level.
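
For reference, that bounded-cache pattern can be sketched with a Guava cache using a byte-based weigher; the 128 MB cap and the CachedDeleteFilter type are placeholders standing in for plugin-level configuration, not values from this PR.

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;

public final class BoundedDeleteFilterCacheExample
{
    // Placeholder entry type; retainedSizeInBytes would be an estimate of heap usage.
    record CachedDeleteFilter(long retainedSizeInBytes) {}

    public static void main(String[] args)
    {
        // Weigh entries by their estimated retained size so the maximumWeight cap below
        // acts as a configured in-memory byte limit with automatic eviction.
        Weigher<String, CachedDeleteFilter> byRetainedBytes = (path, filter) ->
                Math.toIntExact(Math.min(filter.retainedSizeInBytes(), Integer.MAX_VALUE));

        Cache<String, CachedDeleteFilter> cache = CacheBuilder.newBuilder()
                .maximumWeight(128L * 1024 * 1024)  // hypothetical 128 MB plugin-level limit
                .weigher(byRetainedBytes)
                .recordStats()                      // expose hit/miss/eviction metrics
                .build();

        cache.put("s3://bucket/deletes/delete-00001.parquet", new CachedDeleteFilter(4_096));
        System.out.println(cache.stats());
    }
}
```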

pettyjamesm avatar Nov 07 '23 19:11 pettyjamesm

I agree with @pettyjamesm's points. I think reducing the cost of processing tables with delete rows is great, but the approach here seems to go well beyond that to basically caching data across queries. If we want to go down that path, I don't know why we would stop at just delete files. Why not cache parquet/orc footers? Or cache small tables in memory? These all sound great, but are pretty expansive.

I suggest we start with a simple task cache (with memory tracking), and later we can talk about adding cross query caching in Trino.

dain avatar Nov 07 '23 23:11 dain

Hi @dain @pettyjamesm,

Based on your feedback, I've implemented two different approaches for managing the lifecycle of the DeleteManager. Both approaches utilize the new ConnectorDynamicFilterProvider, which allows connectors to extend the DynamicFilters as they wish. I chose to use DynamicFilters because it has a similar meaning, allowing connectors to extend the filters seems reasonable, and reusing this interface means the signature of ConnectorPageSourceProvider.createPageSource does not need to change.

I'm not sure which approach you prefer, but you can compare them by reviewing the first 3 commits together (first approach) or all the commits (second approach).

  1. Event Listener Approach: This method manages the DeleteManager through the query lifecycle using an event listener. Here, the DeleteManager is created at the start of the query and cleared at its conclusion. This approach is event-driven, ensuring that the lifecycle of the DeleteManager is closely synchronized with the query's lifecycle (a rough sketch follows after the list below). However, I do not know whether the event listener is reliable enough in Trino for this purpose. Will we always get the events?

  2. QueryContext State Map Approach: In this approach, the QueryContext is used to hold and manage the DeleteManager. It involves a state map within the QueryContext for storing necessary state throughout the duration of the query. This method required more changes to the Trino engine, but doesn't rely on the EventListener.
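
A rough sketch of what approach 1 could look like, assuming the event listener SPI's QueryCreatedEvent and QueryCompletedEvent expose the query id via their metadata (the DeleteManager and registry here are placeholders; as noted later in this thread, these events fire only on the coordinator, which is why this approach was not pursued):

```java
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryCreatedEvent;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of the event-driven lifecycle: a DeleteManager per query,
// created when the query starts and discarded when it completes.
public class DeleteManagerLifecycleListener
        implements EventListener
{
    // Placeholder standing in for the PR's DeleteManager
    static class DeleteManager {}

    private final Map<String, DeleteManager> deleteManagers = new ConcurrentHashMap<>();

    @Override
    public void queryCreated(QueryCreatedEvent event)
    {
        deleteManagers.put(event.getMetadata().getQueryId(), new DeleteManager());
    }

    @Override
    public void queryCompleted(QueryCompletedEvent event)
    {
        // Release the cached equality deletes for this query
        deleteManagers.remove(event.getMetadata().getQueryId());
    }
}
```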

Let me know your thoughts on which approach better aligns with the overall architecture of Trino.

jasonf20 avatar Nov 13 '23 16:11 jasonf20

@pettyjamesm @dain bumping this

yoniiny avatar Nov 27 '23 14:11 yoniiny

Sorry for the delay in getting back to you about this @yoniiny.

First to answer your question about the two approaches:

Unfortunately, the query completion event is only triggered on the coordinator node and not on workers, so the event listener approach won't work to initiate cleanup of the cached state.

The QueryContext state map approach is along the right lines, although it may not be necessary. If instead we added a close() method to PageSourceProvider / ConnectorPageSourceProvider and updated TableScanOperatorFactory and ScanFilterAndProjectOperatorFactory to invoke it as part of OperatorFactory#noMoreOperators(), we could use the instance state and lifecycle of IcebergPageSourceProvider instead, while keeping this as an internal implementation detail within the Iceberg connector. This implies that the OperatorFactory will become responsible for the memory tracking, which doesn't seem wholly unreasonable to me but would be new behavior (cc @dain for your thoughts on this approach).
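
To make the suggestion concrete, the shape of such a hook might look roughly like this; none of these types are real Trino SPI classes, they only illustrate where a close() call driven by noMoreOperators() would give the connector a deterministic point to drop its task-scoped delete state.

```java
// Hypothetical sketch only: a close() hook on the page source provider, called by the
// scan operator factory once the task will create no more operators.
interface ClosablePageSourceProvider
        extends AutoCloseable
{
    @Override
    void close();  // e.g. release per-task caches such as loaded equality delete filters
}

final class ExampleScanOperatorFactory
{
    private final ClosablePageSourceProvider pageSourceProvider;

    ExampleScanOperatorFactory(ClosablePageSourceProvider pageSourceProvider)
    {
        this.pageSourceProvider = pageSourceProvider;
    }

    // Analogue of OperatorFactory#noMoreOperators(): no further splits will be scanned by
    // this task, so the provider's cached state can be released and its memory freed.
    void noMoreOperators()
    {
        pageSourceProvider.close();
    }
}
```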

Outside of the code level approach, I'm still skeptical that it's preferable to implement this at the query level compared to the task level. It's not so much that I think it's inherently more complicated, but rather that the query level implementation will be more naive and less capable than a task level implementation which corresponds to a given in-progress single table scan. It's possible to implement a query level (or even cross-query) implementation on-top of a task level one using reference counts, but not the other way around.

pettyjamesm avatar Nov 27 '23 19:11 pettyjamesm

@pettyjamesm We pushed an update that changes the DeleteManager to the task level instead of the query level. This simplifies the code and means the DeleteManager is created when the OperatorFactory is created. Let us know what you think of the latest revision.

I still have the intermediate commits, but I can discard them when we are ready to merge. If you review the entire PR you should see just the latest revision.

jasonf20 avatar Nov 28 '23 11:11 jasonf20

Hi @pettyjamesm I've pushed a clean version of the PR with the changes we discussed on slack.

jasonf20 avatar Dec 05 '23 10:12 jasonf20

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

github-actions[bot] avatar Jan 26 '24 17:01 github-actions[bot]

👋 @jasonf20 thank you for all your work on this PR. Could you potentially rebase and continue the discussion with @pettyjamesm and others to move forward?

mosabua avatar Jan 26 '24 17:01 mosabua

Hi @mosabua I'm happy to work on this, though I am a little blocked at the moment by an architectural decision. @pettyjamesm let me know if you have an answer to my question in this comment: https://github.com/trinodb/trino/pull/18397#discussion_r1440397542. If you want to discuss this further feel free to ping me in Slack as well.

Once this is decided it will affect a lot of things so it's a blocker for me to progress.

jasonf20 avatar Jan 29 '24 09:01 jasonf20

Thanks for chiming in @jasonf20 .. maybe @findinpath @findepi @electrum or @alexjo2144 can help @pettyjamesm

Also fyi @brandylove

mosabua avatar Jan 29 '24 17:01 mosabua

@dain Thanks for the comment.

@pettyjamesm I have updated the code based on your comments and created a ProviderFactory class. Please review the current revision and let me know if there are any other issues. Thanks!

jasonf20 avatar Feb 15 '24 21:02 jasonf20

@jasonf20 IIUC this PR is caching deletion files. However, it seems that these can be cached per split rather than per query. In that case ConnectorPageSourceProviderFactory seems like too narrow scope.

sopel39 avatar Feb 16 '24 10:02 sopel39

@jasonf20 IIUC this PR is caching deletion files. However, it seems that these can be cached per split rather than per query. In that case ConnectorPageSourceProviderFactory seems like too narrow scope.

The delete files are already read only once per split. The issue is that caching per task/query is required; otherwise the delete files are read again in each split, leading to unusable performance. This approach was designed with @pettyjamesm as the cleanest way to cache at the right level.

jasonf20 avatar Feb 16 '24 21:02 jasonf20

@jasonf20 Would it make sense to cache deletion files across queries too?

sopel39 avatar Feb 19 '24 08:02 sopel39

@sopel39

It could be useful for performance when querying the same tables, but the same can be said for data files. Perhaps a generic cache of this sort will be implemented in Trino at some point, but it's probably a much more involved task, considering memory usage and such.

For this PR specifically, it would be interesting to consider caching at the query level (this PR does it at the task level), but for most queries caching at the task level should produce the same performance. We discussed query-level caching and decided it can probably be built on top of this task-level caching with reference counting or something similar in the future, if needed. So the first step is to solve the problem at the task level, which should have the largest impact, and potentially expand the caching scope later.

The main motivation here is to make queries on tables with equality deletes complete in a reasonable amount of time. The current implementation doesn't really work with more than a couple of equality delete files.

jasonf20 avatar Feb 19 '24 16:02 jasonf20