[Improvement]: Add a cache of eq delete files in the optimizer to reduce their repeated IO cost
Search before asking
- [X] I have searched in the issues and found no similar issues.
What would you like to be improved?
For large tables written by Flink, each commit submits an EQ DELETE file that is associated with all previous data files. Most of the generated optimizing tasks then read this EQ DELETE file repeatedly, causing duplicate IO cost.
How should we improve?
Each JVM (TaskManager, executor) in the Optimizer maintains a cache of EQ DELETE file contents.
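A minimal sketch of the idea, assuming a Caffeine-based cache shared by all optimizing threads in one JVM; the class and method names are illustrative, not existing Amoro APIs:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.List;
import java.util.function.Function;

// Illustrative only: a process-wide cache of materialized EQ DELETE records,
// keyed by delete file path and shared by all optimizing threads in the JVM.
public class EqDeleteFileCache {
  private static final Cache<String, List<Object[]>> CACHE =
      Caffeine.newBuilder()
          // Byte budget for cached delete data; should come from configuration.
          .maximumWeight(128L * 1024 * 1024)
          // Rough per-entry weight: number of delete rows times an estimated row size.
          .weigher((String path, List<Object[]> rows) -> rows.size() * 64)
          .build();

  // Returns the cached delete records for a file, loading them on first access.
  public static List<Object[]> getOrLoad(String path, Function<String, List<Object[]>> loader) {
    return CACHE.get(path, loader);
  }
}
```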
Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
Subtasks
No response
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Hi, it is a good idea to cache the delete file data. However, before proceeding with the actual implementation, we may need to clarify some details like:
- Do we only cache data for equality-deleted files?
- How to limit the size of the cache?
- What kind of cache eviction strategy to use?
- When allocating tasks in AMS, is it necessary to consider cache hits?
IMO,
> Do we only cache data for equality-deleted files?

Yes, the position delete files only match a limited set of insert files, so it seems unnecessary to cache them for now.

> How to limit the size of the cache?

If we cannot fully cache a delete file, the cache will be ineffective. To better protect memory and cache as many files as possible, we may only want to cache delete files that are relatively small. Additionally, delete files that are too large are already filtered during reading using the bloom filter of the insert files, so they are not suitable for caching either. Besides, we should add a configuration to control the maximum size of the cache, preferably in bytes.

> What kind of cache eviction strategy to use?

LRU seems to be good enough.

> When allocating tasks in AMS, is it necessary to consider cache hits?

Considering cache hits during scheduling can better utilize the cache. However, initially, we can overlook this and consider adding this optimization later.
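To make the size-limit point concrete, here is a minimal sketch of a read path that only caches relatively small delete files and leaves large files on the existing bloom-filter path. It reuses the hypothetical `EqDeleteFileCache` from the sketch above; the threshold and the reader functions are assumptions, not existing Amoro code.

```java
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.DeleteFile;

// Sketch only: readers and the size threshold are placeholders, not real Amoro APIs.
public class EqDeleteReadPath {
  private final long maxCachedFileSizeBytes;                        // proposed config, in bytes
  private final Function<DeleteFile, List<Object[]>> plainReader;   // reads the whole delete file
  private final Function<DeleteFile, List<Object[]>> bloomReader;   // current bloom-filtered read

  public EqDeleteReadPath(long maxCachedFileSizeBytes,
                          Function<DeleteFile, List<Object[]>> plainReader,
                          Function<DeleteFile, List<Object[]>> bloomReader) {
    this.maxCachedFileSizeBytes = maxCachedFileSizeBytes;
    this.plainReader = plainReader;
    this.bloomReader = bloomReader;
  }

  public List<Object[]> readDeletes(DeleteFile file) {
    if (file.fileSizeInBytes() <= maxCachedFileSizeBytes) {
      // Small delete file: materialize it once per JVM and reuse it across tasks.
      return EqDeleteFileCache.getOrLoad(file.path().toString(), path -> plainReader.apply(file));
    }
    // Large delete file: skip caching and keep the bloom-filter-based read.
    return bloomReader.apply(file);
  }
}
```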
Thanks for your reminder on the details.
The idea comes from the Iceberg community, which introduced a delete file cache for Spark executors in PR #8755.
> IMO,
>
> Do we only cache data for equality-deleted files?
>
> Yes, the position delete files only match a limited set of insert files, so it seems unnecessary to cache them for now.

Yes.
> How to limit the size of the cache?
>
> If we cannot fully cache a delete file, the cache will be ineffective. To better protect memory and cache as many files as possible, we may only want to cache delete files that are relatively small. Additionally, delete files that are too large are already filtered during reading using the bloom filter of the insert files, so they are not suitable for caching either. Besides, we should add a configuration to control the maximum size of the cache, preferably in bytes.

The Iceberg community introduced `spark.sql.iceberg.executor-cache.max-total-size` to limit memory usage.
The cache is mainly on the Optimizer side, so we can add a parameter, like `-msz`, that sets an upper limit on memory usage to keep the optimizer running stably.
For the part using the bloom filter, we might be able to skip caching.
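Purely as an illustration of the parameter idea: the flag name `-msz` is only the proposal from this thread, and the fraction below is an arbitrary assumption rather than a recommended default.

```java
// Illustration only: derive the delete-file cache budget from the optimizer's
// memory size so that caching cannot destabilize the optimizer itself.
public final class EqDeleteCacheBudget {
  private EqDeleteCacheBudget() {}

  // optimizerMemoryBytes would come from an option like the proposed -msz.
  public static long cacheBudgetBytes(long optimizerMemoryBytes) {
    return optimizerMemoryBytes / 10;   // assumed 10% share, to be tuned or configured
  }
}
```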
> What kind of cache eviction strategy to use?
>
> LRU seems to be good enough.

How about LRU plus active expiration (evicting entries when the table being optimized changes)?
> When allocating tasks in AMS, is it necessary to consider cache hits?
>
> Considering cache hits during scheduling can better utilize the cache. However, initially, we can overlook this and consider adding this optimization later.

I also think configuring it on the optimizer side at first is enough.
@zhongqishang Thank you for providing detailed information on specific improvements to the Iceberg project. I'm delighted to see that this improvement has been merged into the Iceberg Spark connector.
> How about LRU plus active expiration (evicting entries when the table being optimized changes)?

I understand your point that if the executor thread switches to a new process, it can evict the existing cache. This seems like a good strategy, but it requires AMS to always schedule the tasks of one process together, which is currently the case. However, it seems difficult to determine the timing of switching to a new process in situations where multiple threads share a JVM environment.
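A rough sketch of what that active expiration could look like, assuming the cache keys embed a table identifier; the key layout and the method below are hypothetical, not part of any real Amoro API:

```java
import com.github.benmanes.caffeine.cache.Cache;

// Sketch only: proactively drop a table's cached delete data when the optimizer
// finishes (or abandons) that table's optimizing process, instead of waiting for LRU.
public final class EqDeleteCacheEviction {
  private EqDeleteCacheEviction() {}

  // Assumes keys of the form "<tableId>#<delete-file-path>".
  public static void evictTable(Cache<String, ?> cache, String tableId) {
    cache.asMap().keySet().removeIf(key -> key.startsWith(tableId + "#"));
  }
}
```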
This feature is quite beneficial for Spark because the cache can be shared among tasks in a single executor (which was not possible previously), and the tasks there all belong to one task type.
This situation is not guaranteed in Amoro, so this feature may have some impact on the process and task schedulers.
@majin1102 Thanks for your reply.
I just want to cover tables with many data files, where every newly added eq delete file is associated with all of them. The optimizer reads the same eq delete file once per task, and it does so repeatedly because the optimizer concurrency is much smaller than the number of data files. If we cache it, we speed up the merge and reduce the IO cost of reading the eq delete file.
There will be some adverse effects on tables outside this scenario, because caching involves materializing the eq delete data. As @zhoujinsong said, we can consider cache hits during scheduling to avoid unnecessary overhead.
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.