core: Provide mechanism to cache manifest file content
This is a draft PR for ManifestCache implementation.
The ManifestCache interface closely follows the com.github.benmanes.caffeine.cache.Cache interface, with the addition of maxContentLength(). If a stream is longer than maxContentLength(), AvroIO skips caching it to avoid memory pressure. An example implementation, CaffeineManifestCache, can be seen in TestAvroFileSplit.java.
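To make the maxContentLength() rule concrete, here is a minimal sketch using only the JDK. SizeCappedCache, its get() signature, and isCached() are hypothetical names for illustration; this is not the ManifestCache interface from the PR, and it does not use Caffeine.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch, not the PR's ManifestCache: a byte[] cache that refuses oversized content.
final class SizeCappedCache {
  private final Map<String, byte[]> entries = new ConcurrentHashMap<>();
  private final long maxContentLength;

  SizeCappedCache(long maxContentLength) {
    this.maxContentLength = maxContentLength;
  }

  long maxContentLength() {
    return maxContentLength;
  }

  // Returns the content for a location, keeping it in memory only if it is small enough.
  byte[] get(String location, long contentLength, Supplier<byte[]> loader) {
    if (contentLength > maxContentLength) {
      // Too large: read through without caching to avoid memory pressure.
      return loader.get();
    }
    return entries.computeIfAbsent(location, key -> loader.get());
  }

  boolean isCached(String location) {
    return entries.containsKey(location);
  }
}
```

With a limit of 4 bytes, a 3-byte file is kept after the first read while a 10-byte file is loaded on every call.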
I will tidy this up and add documentation as we go. Looking forward to any feedback.
Closes #4508
485e2ab Switch ManifestCache class to use Guava Cache instead. This is easier to do in iceberg-api since we can simply include the Guava cache classes into the shadow jar of iceberg-bundled-guava.
Switching to use more Guava classes is probably not a good idea.
@rizaon, do you have use cases where this has helped? If so, what were they? This adds quite a bit of complexity and memory overhead that we purposely avoided up to now so I want to make sure it is worth the change.
Have you considered adding a caching FileIO instance? That could be used for more use cases than just manifest files and job planning. For example, a FileIO read-through cache could cache delete files that might be reused for tasks. This could be configured by file path, making it easy to determine what to cache. Plus, we could use more options than just in-memory, like a distributed cache or local disk.
Hi @rdblue, thank you for your feedback.
We found a slow query compilation issue against Iceberg tables in our recent Apache Impala build. Impala uses Iceberg's HiveCatalog and a HadoopFileIO instance with an S3A input stream to access data on S3. We ran a full 10 TB TPC-DS benchmark and found that query compilation can take several seconds, while it used to take less than a second with native Hive tables. The slowness in single query compilation is due to the need to call planFiles several times, even for scan nodes targeting the same table. We also see several socket read operations that each take hundreds of milliseconds during planFiles, presumably due to S3A HTTP HEAD request overhead and backward seek overhead (issue #4508). This especially hurts fast-running queries.
We tried this caching solution and it sped up Impala query compilation on Iceberg tables by almost 5x compared to running without the cache. Our original solution, however, was to put a Caffeine cache as a singleton in AvroIO.java. I thought it would be better to supply the cache from outside.
I have not considered the solution of adding a caching FileIO instance. I'm pretty new to the Iceberg codebase but interested to follow up on that if it can yield a better integration. Will it require a new class of Catalog/Table as well, or can we improve on the existing HiveCatalog & HadoopFileIO?
A relevant Impala JIRA is here: https://issues.apache.org/jira/browse/IMPALA-11171
@rizaon, caching in the FileIO layer would be much more general. You could do things like detect that the file size is less than some threshold and cache it in memory, or detect file names under metadata and cache those. I highly recommend looking into the FileIO API!
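As a rough illustration of that kind of detection, the sketch below decides whether a file is worth caching from its path and length. CachePolicy, the "/metadata/" path convention, and the choice to require both conditions together are assumptions made for this example; none of it is part of the FileIO API.

```java
// Hypothetical helper: decides whether a file should be cached in memory.
final class CachePolicy {
  private final long maxCacheableLength;

  CachePolicy(long maxCacheableLength) {
    this.maxCacheableLength = maxCacheableLength;
  }

  // Assumption: metadata files live under a "/metadata/" directory of the table location.
  static boolean isUnderMetadata(String location) {
    return location.contains("/metadata/");
  }

  // Cache only small files that also look like metadata files.
  boolean shouldCache(String location, long length) {
    boolean smallEnough = length >= 0 && length <= maxCacheableLength;
    return smallEnough && isUnderMetadata(location);
  }
}
```

A caching FileIO wrapper could consult a policy like this before deciding to buffer a stream, and fall through to the underlying FileIO otherwise.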
c032b7a implements caching as a new FileIO class, CachingHadoopFileIO. A new Tables class, CachingHadoopTables, is also added to assist with testing.
We tried to avoid the lazyStats() call as much as possible by checking the cache first before checking the stream length. This comes with a risk of keeping stale data in memory when the actual file has already been deleted, as shown in CachingHiveTableTest.testMissingMetadataWontCauseHang(). This is probably fine, given the immutable nature of Iceberg metadata. Otherwise, the application should invalidate the cache entry first to force a re-read / lazyStats() call.
Hello @rdblue. This PR is ready for review.
Please let me know if there is any new feedback or request. I'm happy to follow up.
@rizaon, there are a couple of PRs that should help you with this. #4608 adds IOUtil.readFully and IOUtil.readRemaining that you can use to fill buffers from an input stream. I've also opened #4623 to add the ByteBufferInputStream classes I was talking about. Those should allow you to easily split the data across several small allocations and wrap them in an InputStream implementation.
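The idea of splitting the data across several small allocations can be sketched with JDK classes only. ChunkedContent below is a hypothetical stand-in: it does not use IOUtil.readFully, IOUtil.readRemaining, or the ByteBufferInputStream classes from those PRs, whose exact signatures are not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: holds stream content as a list of small chunks instead of one big array.
final class ChunkedContent {
  private final List<byte[]> chunks;

  private ChunkedContent(List<byte[]> chunks) {
    this.chunks = chunks;
  }

  // Reads the stream to its end, storing the bytes in chunks of at most chunkSize bytes.
  static ChunkedContent readAll(InputStream in, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<byte[]> chunks = new ArrayList<>();
    try {
      boolean endOfStream = false;
      while (!endOfStream) {
        byte[] chunk = new byte[chunkSize];
        int filled = 0;
        // A single read() may return fewer bytes than requested, so loop until the chunk is full.
        while (filled < chunkSize) {
          int n = in.read(chunk, filled, chunkSize - filled);
          if (n < 0) {
            endOfStream = true;
            break;
          }
          filled += n;
        }
        if (filled > 0) {
          chunks.add(filled == chunkSize ? chunk : Arrays.copyOf(chunk, filled));
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new ChunkedContent(chunks);
  }

  int chunkCount() {
    return chunks.size();
  }

  // Wraps the chunks in a fresh InputStream; each call starts again from the first byte.
  InputStream newStream() {
    List<InputStream> parts = new ArrayList<>();
    for (byte[] chunk : chunks) {
      parts.add(new ByteArrayInputStream(chunk));
    }
    return new SequenceInputStream(Collections.enumeration(parts));
  }

  // Concatenates all chunks, mainly useful for checking the content.
  byte[] toByteArray() {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] chunk : chunks) {
      out.write(chunk, 0, chunk.length);
    }
    return out.toByteArray();
  }
}
```

Reading 10 bytes with a chunk size of 4 yields three chunks (4, 4, and 2 bytes), and newStream() replays them in order.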
Got it, thank you. Will rebase and update this PR once they are merged.
@rizaon, the other PRs are in so you should be able to take advantage of IOUtil.readFully and the ByteBufferInputStream implementations. I hope that helps!
@findepi, @alexjo2144, FYI. This could be an option for Trino performance.
@rizaon and @rdblue I made a comment in one of the replies above but wanted to ask about potentially bad behaviors around reading metadata tables. Since in those cases the data files are really the metadata files, the caching would trigger. I feel like this could contribute to some interesting problems with high churn in those cases both in the driver and potentially the worker side.
It might be better just to wrap on access where we read metadata files in MetadataFiles.java. That would also avoid guessing at the pathing for metadata files.
Hi @danielcweeks @rdblue , can I get a follow up review on this PR please?
I think we have not fully agreed on how to decide which files to cache (regex vs. specific call site). I decided to go with the regex approach in dec17e2 because there are many call sites in Iceberg core that open metadata files. Please let me know how I can improve this PR.
CachingFileIO caches data when it is accessed for the first time. For an MPP database, it affects the corresponding time of the first access. Can snapshot changes be notified to CachingFileIO, so that it can clean the cached data? Then metadata could be cached for a long time. In Dremio, all metadata is cached and updated regularly. When a snapshot changes, the update is not immediate, but this avoids reading metadata at query time and causing query delay.
it affects the corresponding time of the first access.
If you are talking about any accessed timestamps in filesystems, know that cloud stores don't have them. And S3 object creation times are when a multipart upload was initiated, not completed. That is not relevant for smaller files, but for files of many MB or GB, it is.
Hi @danielcweeks, given the immutability of snapshots and metadata, are the "potentially bad behaviors around reading metadata tables" still a concern? Immutability should make consistency easy to manage (we can just evict a cache entry when space is needed without having to worry about invalidating on update, etc.).
About "high churn" in terms of high eviction rates, I'd like to understand what scenario can lead to that issue. Within a single table snapshot, what is the largest number of manifest files, and the largest individual manifest size, that has been observed?
One way to avoid churn / pressure in the cache is to detect the reason for the most recent cache evictions. Say, if CachingFileIO tends to evict most entries within a short time due to the total memory size constraint, CachingFileIO should consider just bypassing the cache for some period, or until some entries are evicted due to expiration rather than the size limit.
We can do that with a Caffeine cache by implementing RemovalListener: https://www.javadoc.io/doc/com.github.ben-manes.caffeine/caffeine/2.3.1/com/github/benmanes/caffeine/cache/RemovalListener.html
Another way is to impose a high watermark limit on the cache's total weight, and bypass the cache once this high watermark is exceeded.
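A minimal sketch of the high watermark variant, using only the JDK, could look like the following. WatermarkCache and its method names are hypothetical, and weight is simply the byte length of each entry. The Caffeine RemovalListener approach is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: stops admitting new entries once the total weight would pass a watermark.
final class WatermarkCache {
  private final Map<String, byte[]> entries = new HashMap<>();
  private final long highWatermarkBytes;
  private long totalBytes = 0;

  WatermarkCache(long highWatermarkBytes) {
    this.highWatermarkBytes = highWatermarkBytes;
  }

  // Returns cached content if present; otherwise loads it and caches it only below the watermark.
  // For simplicity the loader runs while holding the lock, which a real cache would avoid.
  synchronized byte[] get(String location, Supplier<byte[]> loader) {
    byte[] cached = entries.get(location);
    if (cached != null) {
      return cached;
    }
    byte[] content = loader.get();
    if (totalBytes + content.length > highWatermarkBytes) {
      // Bypass: hand the content to the caller without evicting anything.
      return content;
    }
    entries.put(location, content);
    totalBytes += content.length;
    return content;
  }

  // Removes an entry and gives its weight back.
  synchronized void invalidate(String location) {
    byte[] removed = entries.remove(location);
    if (removed != null) {
      totalBytes -= removed.length;
    }
  }

  synchronized boolean isCached(String location) {
    return entries.containsKey(location);
  }

  synchronized long totalBytes() {
    return totalBytes;
  }
}
```

Because a bypassed read never evicts existing entries, a burst of large manifests cannot flush the entries that are already useful.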
Hey @rizaon, some recent changes actually exposed FileIO properties, which I believe would simplify how we can cache manifest files.
Where we read manifest files, we now have the file length and properties from the FileIO. It seems like we could just add a simple cache to ManifestFiles and allow caching configuration via FileIO::properties().
I feel like that would really simplify this and avoid depending on things like regex+pathing to isolate which files to cache.
Hi @danielcweeks , thank you for the update. I will check PR #5207 and study how we can utilize it.
So with that PR, am I correct that we should implement the caching in ManifestFiles.java instead of creating a new CachingFileIO.java as this PR did?
@rizaon, because we want to isolate the caching to metadata files only, I think MetadataFiles.java would be a good place to start. We might want some utilities so that, if we find we want to cache FileIO elsewhere, we can reuse the same approach, but it seems like a full FileIO implementation isn't necessary (and introduces other problems).
@danielcweeks, 4d4c4fa implements the manifest file cache in ManifestFiles.java. Since ManifestFiles is stateless and we might deal with multiple FileIO instances, the cache is implemented as Cache<FileIO, ContentCache> (a nested cache). It expires a FileIO's ContentCache after 1 hour of inactivity or when dropCache(FileIO) is called.
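For readers who want the shape of that nested cache without reading the commit, here is a JDK-only sketch. PerIoCaches and the injected clock are hypothetical, the inner ContentCache here is a simplified stand-in for the PR's class, and WeakHashMap stands in for a Caffeine cache. Note that WeakHashMap compares keys with equals(), whereas Caffeine's weakKeys() uses identity comparison.

```java
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical sketch of a per-FileIO cache registry; K stands in for FileIO.
final class PerIoCaches<K> {
  // Simplified stand-in for the PR's ContentCache: location -> file content.
  static final class ContentCache {
    final Map<String, byte[]> entries = new ConcurrentHashMap<>();
    long lastAccessMs;
  }

  private final Map<K, ContentCache> caches = new WeakHashMap<>();
  private final long expireAfterAccessMs;
  private final LongSupplier clockMs;

  PerIoCaches(long expireAfterAccessMs, LongSupplier clockMs) {
    this.expireAfterAccessMs = expireAfterAccessMs;
    this.clockMs = clockMs;
  }

  // Returns the cache for this key, creating it on first use and dropping idle caches.
  synchronized ContentCache contentCache(K io) {
    long now = clockMs.getAsLong();
    caches.values().removeIf(cache -> now - cache.lastAccessMs > expireAfterAccessMs);
    ContentCache cache = caches.computeIfAbsent(io, key -> new ContentCache());
    cache.lastAccessMs = now;
    return cache;
  }

  // Explicitly discards the cache that belongs to this key.
  synchronized void dropCache(K io) {
    caches.remove(io);
  }
}
```

Passing System::currentTimeMillis as the clock and 3,600,000 ms as the expiration would mirror the 1 hour of inactivity described above. The clock is injected only to keep the sketch testable.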
337bfe8 is a rebase + spotlessApply of 4d4c4fa
Hello @danielcweeks, can I get feedback on my latest rebase, please?
The core test failed at HadoopFileIOTest > testHadoopFileIOKryoSerialization(). Should I rebase to pick up the fix from #5437?
bb737ab is a rebase + squash of all review changes so far.
15d67b3 is a fix for the Kryo serialization failure in testHadoopFileIOKryoSerialization().
Hey @rizaon I think this is getting real close now (thanks for sticking with us on this). Now that we've gotten this far, I see there's a little more we can do to consolidate the logic in ContentCache. For example, if you pull the property configuration into ContentCache constructor, we can remove a lot from ManifestFiles. For example:
    public ContentCache(FileIO fileio) {
      this.fileio = fileio;

      // Read the cache settings from the FileIO properties, falling back to defaults.
      long expireAfterAccessMs = PropertyUtil.propertyAsLong(
          fileio.properties(),
          CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
          CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
      long maxTotalBytes = PropertyUtil.propertyAsLong(
          fileio.properties(),
          CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES,
          CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT);
      long maxContentLength = PropertyUtil.propertyAsLong(
          fileio.properties(),
          CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
          CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT);

      // Reject invalid settings before storing them.
      ValidationException.check(expireAfterAccessMs >= 0, "expireAfterAccessMs is less than 0");
      ValidationException.check(maxTotalBytes > 0, "maxTotalBytes is equal or less than 0");
      ValidationException.check(maxContentLength > 0, "maxContentLength is equal or less than 0");

      this.expireAfterAccessMs = expireAfterAccessMs;
      this.maxTotalBytes = maxTotalBytes;
      this.maxContentLength = maxContentLength;
    }
You can also pull CONTENT_CACHES, contentCache(FileIO io), and dropCache(FileIO fileIO) into ContentCache as well.
I played around with moving more of the logic into ContentCache and was able to move everything (minus the newInputFile logic) into ContentCache, which I feel is a cleaner separation of concerns.
Hi @danielcweeks, that looks like a nice cleanup. But I have 2 concerns:
- By pushing the config verification down to the ContentCache constructor, that will make ContentCache less generic and more specific towards manifest caching use. Is that OK?
- If we keep fileio as a class member of ContentCache, that becomes a strong reference to fileio. Thus, the weakKeys() in CONTENT_CACHES is not useful anymore because there is always a strong reference to that FileIO key living in the ContentCache object (the value)?
Good points. Let me take one more pass (I was just hoping to consolidate more of the logic in ContentCache, but you're probably right that it makes more sense to keep the manifest settings with ManifestFiles).
In the last GitHub check runs, there was a checkstyle issue and an exception-handling problem when the given FileIO does not implement the properties() method.
I have rebased this PR and added ba7329d to fix these issues.
@rizaon Thanks for your contribution here and keeping this updated! I'm excited to see how this works out.