
core: Provide mechanism to cache manifest file content

This is a draft PR for a ManifestCache implementation.

The ManifestCache interface closely follows the com.github.benmanes.caffeine.cache.Cache interface, with the addition of a maxContentLength() method. If a stream is longer than maxContentLength(), AvroIO will skip caching it to avoid memory pressure. An example implementation, CaffeineManifestCache, can be seen in TestAvroFileSplit.java.
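
Roughly, the interface shape is as follows (a sketch reconstructed from this description; the exact method set in the PR may differ):

```java
// Sketch of a ManifestCache interface modeled on com.github.benmanes.caffeine.cache.Cache;
// method names beyond maxContentLength() are illustrative.
import java.util.function.Function;

public interface ManifestCache<K, V> {
  // Return the cached value for key, computing and caching it via mappingFunction on a miss.
  V get(K key, Function<? super K, ? extends V> mappingFunction);

  // Discard the cached entry for the given key, if present.
  void invalidate(K key);

  // Streams longer than this many bytes are skipped by AvroIO to avoid memory pressure.
  long maxContentLength();
}
```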

I will tidy this up and add documentation as we go. Looking forward to any feedback.

Closes #4508

rizaon avatar Apr 06 '22 17:04 rizaon

485e2ab switches the ManifestCache class to use Guava Cache instead. This is easier to do in iceberg-api since we can simply include the Guava cache classes in the shadow jar of iceberg-bundled-guava.

rizaon avatar Apr 07 '22 19:04 rizaon

Switching to use more Guava classes is probably not a good idea.

@rizaon, do you have use cases where this has helped? If so, what were they? This adds quite a bit of complexity and memory overhead that we purposely avoided up to now so I want to make sure it is worth the change.

Have you considered adding a caching FileIO instance? That could be used for more use cases than just manifest files and job planning. For example, a FileIO read-through cache could cache delete files that might be reused for tasks. This could be configured by file path, making it easy to determine what to cache. Plus, we could use more options than just in-memory, like a distributed cache or local disk.
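
As a rough illustration of that idea (a sketch only: the CachingFileIO name, the path test, and the elided cache internals are all hypothetical):

```java
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

// Read-through cache as a FileIO wrapper: decide per path whether a read
// is served from the cache or passed straight to the delegate.
public class CachingFileIO implements FileIO {
  private final FileIO delegate;

  public CachingFileIO(FileIO delegate) {
    this.delegate = delegate;
  }

  @Override
  public InputFile newInputFile(String path) {
    // Configure by file path: only metadata files (e.g. manifests) go through the cache.
    if (path.contains("/metadata/")) {
      return cachedInputFile(path);
    }
    return delegate.newInputFile(path);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return delegate.newOutputFile(path); // writes bypass the cache
  }

  @Override
  public void deleteFile(String path) {
    // A natural place to also invalidate the cached entry for this path.
    delegate.deleteFile(path);
  }

  private InputFile cachedInputFile(String path) {
    // Elided: consult a bounded in-memory, local-disk, or distributed cache keyed
    // by path, falling back to the delegate on a miss.
    return delegate.newInputFile(path);
  }
}
```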

rdblue avatar Apr 10 '22 21:04 rdblue

Hi @rdblue, thank you for your feedback.

We found a slow query compilation issue against Iceberg tables in our recent Apache Impala build. Impala uses Iceberg's HiveCatalog and a HadoopFileIO instance with an S3A input stream to access data from S3. We ran a full 10 TB TPC-DS benchmark and found that query compilation can take several seconds, while it used to be less than a second with native Hive tables. This slowness in single-query compilation is due to the requirement to call planFiles several times, even for scan nodes targeting the same table. We also see several socket read operations that spend hundreds of milliseconds during planFiles, presumably due to S3A HTTP HEAD request overhead and backward seek overhead (issue #4508). This especially hurts fast-running queries.

We tried this caching solution and it sped up Impala query compilation on Iceberg tables by almost 5x compared to running without it. Our original solution, however, was to put a Caffeine cache as a singleton in AvroIO.java. I thought it would be better to supply the cache from outside.

I have not considered the solution of adding a caching FileIO instance. I'm pretty new to the Iceberg codebase but interested in following up on that if it can yield a better integration. Will it require a new class of Catalog/Table as well, or can we improve on the existing HiveCatalog and HadoopFileIO?

rizaon avatar Apr 11 '22 18:04 rizaon

A relevant Impala JIRA is here: https://issues.apache.org/jira/browse/IMPALA-11171

rizaon avatar Apr 11 '22 18:04 rizaon

@rizaon, caching in the FileIO layer would be much more general. You could do things like detect that the file size is less than some threshold and cache it in memory, or detect file names under metadata and cache those. I highly recommend looking into the FileIO API!

rdblue avatar Apr 11 '22 18:04 rdblue

c032b7a implements caching as a new FileIO class, CachingHadoopFileIO. A new Tables class, CachingHadoopTables, is also added to assist with testing.

We tried to avoid the lazyStats() call as much as possible by checking the cache first before checking the stream length. This comes with a risk of keeping stale data in memory when the actual file has already been deleted, as shown in CachingHiveTableTest.testMissingMetadataWontCauseHang(). This is probably fine, given the immutable nature of Iceberg metadata. Otherwise, the application should invalidate the cache entry first to force a re-read / lazyStats() call.

rizaon avatar Apr 14 '22 23:04 rizaon

Hello @rdblue. This PR is ready for review.

Please let me know if there is any new feedback or request. I'm happy to follow up.

rizaon avatar Apr 18 '22 15:04 rizaon

@rizaon, there are a couple of PRs that should help you with this. #4608 adds IOUtil.readFully and IOUtil.readRemaining that you can use to fill buffers from an input stream. I've also opened #4623 to add the ByteBufferInputStream classes I was talking about. Those should allow you to easily split the data across several small allocations and wrap them in an InputStream implementation.
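
For reference, a minimal sketch of how those utilities could combine, reading a stream of known length into several small buffers and re-exposing them as a single InputStream (the chunk size and the helper class are illustrative):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.io.ByteBufferInputStream;
import org.apache.iceberg.io.IOUtil;

class ManifestBuffering {
  static InputStream buffer(InputStream in, long fileLength) throws IOException {
    int chunkSize = 4 * 1024 * 1024; // several small allocations instead of one big array
    List<ByteBuffer> chunks = new ArrayList<>();
    long remaining = fileLength;
    while (remaining > 0) {
      int size = (int) Math.min(chunkSize, remaining);
      byte[] chunk = new byte[size];
      IOUtil.readFully(in, chunk, 0, size); // throws EOFException if the stream ends early
      chunks.add(ByteBuffer.wrap(chunk));
      remaining -= size;
    }
    return ByteBufferInputStream.wrap(chunks); // one stream over all chunks
  }
}
```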

rdblue avatar Apr 24 '22 17:04 rdblue

Got it, thank you. Will rebase and update this PR once they are merged.

rizaon avatar Apr 26 '22 04:04 rizaon

@rizaon, the other PRs are in so you should be able to take advantage of IOUtil.readFully and the ByteBufferInputStream implementations. I hope that helps!

rdblue avatar Apr 28 '22 20:04 rdblue

@findepi, @alexjo2144, FYI. This could be an option for Trino performance.

rdblue avatar Apr 28 '22 20:04 rdblue

@rizaon and @rdblue I made a comment in one of the replies above but wanted to ask about potentially bad behaviors around reading metadata tables. Since in those cases the data files are really the metadata files, the caching would trigger. I feel like this could contribute to some interesting problems with high churn in those cases, both on the driver and potentially the worker side.

It might be better just to wrap access where we read metadata files in MetadataFiles.java. That would also avoid guessing at the paths for metadata files.

danielcweeks avatar Apr 30 '22 00:04 danielcweeks

Hi @danielcweeks @rdblue, can I get a follow-up review on this PR please?

I think we have not fully agreed on how to decide which files to cache (regex vs. specific call sites). I decided to go with the regex approach in dec17e2 because there are many call sites in Iceberg core that open metadata files. Please let me know how I can improve this PR.

rizaon avatar May 11 '22 14:05 rizaon

CachingFileIO caches data the first time it is accessed. For an MPP database, this affects the response time of the first access. Can snapshot changes be notified to CachingFileIO so that it can clean the cached data? Then metadata can be cached for a long time. In Dremio, all metadata is cached and updated regularly. When snapshots change, the cache should be updated promptly to avoid query delays from re-reading metadata.

melin avatar May 19 '22 10:05 melin

> it affects the response time of the first access.

If you are talking about access timestamps in filesystems, know that the cloud stores don't have them. And S3 object creation times are from when a multipart upload was initiated, not completed. That's not relevant for smaller files, but for many-MB/GB ones, it is.

steveloughran avatar May 24 '22 08:05 steveloughran

Hi @danielcweeks, given the immutability of snapshots and metadata, are the "potentially bad behaviors around reading metadata tables" still a concern? Immutability should make consistency easy to manage (we can just evict a cache entry when space is needed without having to worry about invalidating on update, etc.).

About "high churn" in terms of high eviction rates, I'd like to understand what scenario can lead to that issue. Within a single table snapshot, what is the largest amount of manifest files and their individual size have been observed before?

rizaon avatar Jun 02 '22 15:06 rizaon

One way to avoid churn / pressure in the cache is to detect the reason for the most recent cache evictions. Say, if CachingFileIO tends to evict most entries in a short time due to the total memory size constraint, CachingFileIO should consider simply bypassing the cache for some period, or until some entries are evicted due to expiration rather than the size limit.

We can do that with a Caffeine cache by implementing RemovalListener: https://www.javadoc.io/doc/com.github.ben-manes.caffeine/caffeine/2.3.1/com/github/benmanes/caffeine/cache/RemovalListener.html
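
Something like this (a sketch; the eviction counter and any bypass policy built on it are illustrative):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.util.concurrent.atomic.AtomicLong;

class EvictionAwareCache {
  private final AtomicLong sizeEvictions = new AtomicLong();

  private final Cache<String, byte[]> cache =
      Caffeine.newBuilder()
          .maximumWeight(64L * 1024 * 1024) // total-bytes budget for cached content
          .weigher((String path, byte[] content) -> content.length)
          .removalListener(
              (String path, byte[] content, RemovalCause cause) -> {
                if (cause == RemovalCause.SIZE) {
                  // A burst of SIZE evictions suggests churn; callers could start
                  // bypassing the cache while this counter is growing quickly.
                  sizeEvictions.incrementAndGet();
                }
              })
          .build();
}
```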

Another way is to impose a high watermark limit on the cache's total weight, bypassing the cache once this high watermark limit is exceeded.

rizaon avatar Jun 07 '22 20:06 rizaon

Hey @rizaon, some recent changes actually exposed FileIO properties, which I believe would simplify how we can cache manifest files.

Where we read manifest files, we now have the file length and properties from the FileIO. It seems like we could just add a simple cache to ManifestFiles and allow caching configuration via FileIO::properties().

I feel like that would really simplify this and avoid depending on things like regex+pathing to isolate which files to cache.

danielcweeks avatar Jul 06 '22 23:07 danielcweeks

Hi @danielcweeks , thank you for the update. I will check PR #5207 and study how we can utilize it.

So with that PR, am I correct that we should implement the caching in ManifestFiles.java instead of creating a new CachingFileIO.java as this PR did?

rizaon avatar Jul 07 '22 05:07 rizaon

@rizaon because we want to isolate the caching to metadata files only, I think MetadataFiles.java would be a good place to start. We might want some utilities so that if we find we want to cache FileIO elsewhere, we can reuse the same approach, but it seems like a full FileIO implementation isn't necessary (and introduces other problems).

danielcweeks avatar Jul 07 '22 15:07 danielcweeks

@danielcweeks 4d4c4fa implements the manifest file cache in ManifestFiles.java. Since ManifestFiles is stateless and we might deal with multiple FileIO instances, the cache is implemented as a Cache<FileIO, ContentCache> (a nested cache). It expires a FileIO's ContentCache after 1 hour of inactivity or when dropCache(FileIO) is called.
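
The shape of that nested cache is roughly the following (a sketch: ContentCache stands for this PR's class and is assumed to be on the classpath; the constructor argument is hypothetical):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import org.apache.iceberg.io.FileIO;

class ManifestCaches {
  // Weak keys let a FileIO (and its ContentCache) be collected once nothing else
  // references it; entries also expire after one hour without access.
  private static final Cache<FileIO, ContentCache> CONTENT_CACHES =
      Caffeine.newBuilder()
          .weakKeys()
          .expireAfterAccess(Duration.ofHours(1))
          .build();

  static ContentCache contentCache(FileIO io) {
    // The mapping function builds a ContentCache from the FileIO's configuration;
    // the constructor shown here is hypothetical.
    return CONTENT_CACHES.get(io, fileIO -> new ContentCache(fileIO.properties()));
  }

  static void dropCache(FileIO io) {
    CONTENT_CACHES.invalidate(io);
  }
}
```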

rizaon avatar Jul 13 '22 18:07 rizaon

337bfe8 is a rebase + spotlessApply of 4d4c4fa

rizaon avatar Aug 01 '22 18:08 rizaon

Hello @danielcweeks, can I get fresh feedback on my latest rebase please?

rizaon avatar Aug 16 '22 14:08 rizaon

Core tests failed at HadoopFileIOTest > testHadoopFileIOKryoSerialization(). Should I rebase to pick up the fix from #5437?

rizaon avatar Sep 06 '22 04:09 rizaon

bb737ab is a rebase + squash of all review changes so far.

15d67b3 is a fix for the Kryo serialization failure in testHadoopFileIOKryoSerialization().

rizaon avatar Sep 06 '22 15:09 rizaon

Hey @rizaon I think this is getting really close now (thanks for sticking with us on this). Now that we've gotten this far, I see there's a little more we can do to consolidate the logic in ContentCache. For example, if you pull the property configuration into the ContentCache constructor, we can remove a lot from ManifestFiles:

```java
public ContentCache(FileIO fileio) {
  this.fileio = fileio;

  long expireAfterAccessMs = PropertyUtil.propertyAsLong(
      fileio.properties(),
      CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
      CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);

  long maxTotalBytes = PropertyUtil.propertyAsLong(
      fileio.properties(),
      CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES,
      CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT);

  long maxContentLength = PropertyUtil.propertyAsLong(
      fileio.properties(),
      CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
      CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT);

  ValidationException.check(expireAfterAccessMs >= 0, "expireAfterAccessMs is less than 0");
  ValidationException.check(maxTotalBytes > 0, "maxTotalBytes is equal or less than 0");
  ValidationException.check(maxContentLength > 0, "maxContentLength is equal or less than 0");
  this.expireAfterAccessMs = expireAfterAccessMs;
  this.maxTotalBytes = maxTotalBytes;
  this.maxContentLength = maxContentLength;
```
You can pull CONTENT_CACHES, contentCache(FileIO io), and dropCache(FileIO fileIO) into ContentCache as well.

I played around with moving more of the logic into ContentCache and was able to move everything (minus the newInputFile logic) into ContentCache, which I feel is a cleaner separation of concerns.

danielcweeks avatar Sep 23 '22 17:09 danielcweeks

Hi @danielcweeks that looks like a nice cleanup. But I have 2 concerns:

  1. By pushing the config verification down into the ContentCache constructor, ContentCache becomes less generic and more specific to the manifest caching use case. Is that OK?
  2. If we keep fileio as a class member of ContentCache, it becomes a strong reference to the fileio. Thus, the weakKeys() in CONTENT_CACHES is no longer useful, because there is always a strong reference to that FileIO key living in the ContentCache object (the value).

rizaon avatar Sep 23 '22 17:09 rizaon

Good points. Let me take one more pass (I was just hoping to consolidate more of the logic in ContentCache, but you're probably right that it makes more sense to keep the manifest settings with ManifestFiles).

danielcweeks avatar Sep 23 '22 17:09 danielcweeks

In the last GitHub check runs, there was a checkstyle issue and an exception-handling issue when the given FileIO does not implement the properties() method.

I have rebased this PR and added ba7329d to fix these issues.

rizaon avatar Sep 24 '22 00:09 rizaon

@rizaon Thanks for your contribution here and keeping this updated! I'm excited to see how this works out.

danielcweeks avatar Sep 26 '22 16:09 danielcweeks