[1191] Use startAfter in S3SingleDriverLogStore.listFrom
Description
The current implementation of S3SingleDriverLogStore.listFrom lists the entire content of the parent directory and filters the result. This can take a long time if the parent directory contains many files. In practice, this can happen for _delta_log folders with a lot of commits.
We change the implementation to use the startAfter parameter so that the S3 list response only returns keys lexicographically greater than or equal to the resolved path. This typically reduces the number of S3 list requests from (size of _delta_log) / 1000 to 1.
This resolves #1191.
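For illustration only (this is not the code in this PR, which goes through Hadoop's S3A listing internals), a minimal sketch of the underlying S3 ListObjectsV2 call with start-after, assuming the AWS SDK for Java v1 and made-up bucket and key names:

```scala
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsV2Request

object StartAfterListingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical bucket and key names, purely for illustration.
    val bucket = "my-bucket"
    val deltaLogPrefix = "tables/events/_delta_log/"
    val resolvedKey = deltaLogPrefix + "00000000000000012345.json"

    val s3 = AmazonS3ClientBuilder.defaultClient()
    // start-after makes S3 skip keys that sort before the given key, so older
    // commit files are never listed and filtered on the client side.
    val request = new ListObjectsV2Request()
      .withBucketName(bucket)
      .withPrefix(deltaLogPrefix)
      .withStartAfter(resolvedKey)
    s3.listObjectsV2(request).getObjectSummaries.forEach(s => println(s.getKey))
  }
}
```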
How was this patch tested?
I've tested the patch briefly with the sample test described in #1191. The previous iteration of this patch has been tested a bit more. Correctness has not been tested thoroughly.
Does this PR introduce any user-facing changes?
No
I've changed the code to fall back to the old implementation in tests. We do similar tricks in the GCS and HDFS implementations. By moving the S3-specific code to a static method of a separate class, we also avoid the hadoop-aws dependency in core.
Still, there is no test coverage for the new code. There is an issue to add integration tests for the log stores which could resolve this, see https://github.com/delta-io/delta/issues/1124.
It would be useful to make the same optimization in the other log stores too. Google Cloud Storage supports the same feature, as does HDFS, while Azure Blob Storage does not. I don't know if any local file systems support a similar API. It would be best if listFrom were part of the Hadoop FileSystem interface.
Hi @jkylling - awesome, thanks for that update. We will take a look after the next Delta Lake release, which should be within 2 weeks. Cheers.
Sorry for the delay. This is a great improvement. Do you have any idea which Hadoop version is the minimum required version? Ideally we would need to fall back to the original implementation if we cannot call these new s3a APIs in some old Hadoop versions.
Approximately how old are the Hadoop versions that must be supported? The S3 V2 API has been part of Hadoop since version 3.0.0-beta1 from October 2017, see https://issues.apache.org/jira/browse/HADOOP-13421?attachmentOrder=asc However, it looks like the Listing.createFileStatusListingIterator we use has only been public since version 3.3.0, see https://issues.apache.org/jira/browse/HADOOP-16384 (this is part of the risk of digging so deep into the Hadoop API).
I briefly tested this PR in delta-standalone and it looks like Hadoop 3.3.1 is in fact required. I've added a check for that in https://github.com/delta-io/delta/pull/1210/commits/c6202343c1b3cbff8fe3bdcfad489f9fe0020dd3
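As a hypothetical sketch of such a guard (not the actual check in the linked commit), one could compare the runtime Hadoop version reported by VersionInfo against 3.3.1 and fall back to the old listing when it is older:

```scala
import org.apache.hadoop.util.VersionInfo

// Hypothetical guard, not the code from the linked commit: only use the fast
// S3A listing when the running Hadoop version is at least 3.3.1, since the
// Listing.createFileStatusListingIterator API it relies on is not usable on
// older releases.
object HadoopVersionCheck {
  private val minimum = Seq(3, 3, 1)

  def supportsFastListing: Boolean = {
    // VersionInfo.getVersion returns strings such as "3.3.1" or "3.2.1-amzn-0".
    val parts = VersionInfo.getVersion
      .split("[.-]")
      .take(3)
      .flatMap(p => scala.util.Try(p.toInt).toOption)
      .toSeq
      .padTo(3, 0)
    // Lexicographic comparison of (major, minor, patch) against the minimum.
    parts.zip(minimum).find { case (a, b) => a != b }.forall { case (a, b) => a > b }
  }
}

// Callers would then do something like:
//   if (HadoopVersionCheck.supportsFastListing) fastListFrom(...) else legacyListFrom(...)
```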
Hi @jkylling - what environment are you running this integration test on?
I've found a strange shading error on EMR 6.6, for example, and I'm not sure whether this new feature will work. TL;DR: hadoop-client shades Guava, but hadoop-aws doesn't, so I am getting a ClassNotFound exception.
Considering that, can you please put this new feature behind a feature flag? That way users have to explicitly enable it.
What was the exact setup you were using? Could you try using the S3AFileSystem without this patch and see if it still fails? I've just been running the integration tests locally from my machine, but there should not be anything special about the setup. In the getting started section of the hadoop-aws documentation, it is stated that hadoop-client or hadoop-common are required dependencies, so we should be able to depend on the shaded Guava library in hadoop-client.
@jkylling the issue we found is that EMR 6.6 shipped a bad hadoop-aws jar. We hit the following error when using EMR 6.6:
java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:813)
The SemaphoredDelegatingExecutor constructor available on EMR 6.6 is:
scala> classOf[org.apache.hadoop.util.SemaphoredDelegatingExecutor].getConstructors.toList
res0: List[java.lang.reflect.Constructor[_]] = List(public org.apache.hadoop.util.SemaphoredDelegatingExecutor(org.apache.hadoop.shaded.com.google.common.util.concurrent.ListeningExecutorService,int,boolean))
Note the difference between com/google/common/util/concurrent/ListeningExecutorService and org.apache.hadoop.shaded.com.google.common.util.concurrent.ListeningExecutorService.
@zsxwing @scottsand-db Thank you for the thorough reviews. I'm on vacation for the next few weeks and will be a bit slower to address this, but I'll get to it eventually.
How should a feature flag be added? The easiest way to pass configuration to the LogStores is through the Hadoop configuration. Would it be okay to read a delta.enableFastS3ListFrom property from the Hadoop configuration?
@jkylling
How should a feature flag be added? The easiest way to pass configuration to the LogStores is through the Hadoop configuration. Would it be okay to read a delta.enableFastS3ListFrom property from the Hadoop configuration?
Yup. Delta Lake will pass all properties from the Spark SQL Conf to the Hadoop Conf here.
So delta.enableFastS3aListFrom SGTM. Nit: S3a instead of S3, since this is S3a specific.
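A minimal sketch of reading such a flag, assuming a helper that receives the Hadoop configuration (the helper name and default are illustrative; the property name follows the capitalization used later in this thread):

```scala
import org.apache.hadoop.conf.Configuration

// Illustrative helper: read the feature flag from the Hadoop configuration,
// keeping the old listing behaviour as the default when the flag is absent.
def fastS3AListFromEnabled(hadoopConf: Configuration): Boolean =
  hadoopConf.getBoolean("delta.enableFastS3AListFrom", false)
```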
Hi @jkylling - any update? Need any help?
He's on vacation, see his earlier message: https://github.com/delta-io/delta/pull/1210#issuecomment-1199138675
@zsxwing @scottsand-db Thank you for your patience! The PR has been updated to address most of the open review comments, and the feature is now behind the delta.enableFastS3AListFrom feature flag. The most notable outstanding review comment is that we still convert the iterator to an array and then back to an iterator, since a bit more refactoring would be needed to use iterators everywhere.
Looks great!
Sorry for the delayed review. This is a really exciting feature and we would LOVE to get it for Delta Lake 2.2.
Awesome! It would be great to get this into Delta Lake 2.2.
The main change I'm asking for is to throw a better exception when (a) the flag is enabled but (b) the file system is not S3a.
Done in https://github.com/delta-io/delta/pull/1210/commits/fc0caeadd4a463a3c0e6fe68ae3042ee7a47bc8d, see also replies to comments.
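A rough sketch of what such a guard could look like (assumed names; not the code from the linked commit):

```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.s3a.S3AFileSystem

// Illustrative only: fail fast with an actionable message when the flag is on
// but the underlying file system is not S3A, instead of a confusing cast error.
def requireS3A(fs: FileSystem): S3AFileSystem = fs match {
  case s3a: S3AFileSystem => s3a
  case other =>
    throw new UnsupportedOperationException(
      s"Fast listFrom is only supported on s3a:// paths, but the file system is " +
      s"${other.getClass.getName}. Disable delta.enableFastS3AListFrom or switch to S3A.")
}
```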
I also wonder if you could write a py/spark integration test? (No worries if not - we can help out with this too.) It would be great to see you set the necessary confs in a Spark session, enable this flag, and write using this new improved method via the Spark APIs. We have plenty of examples in the example folder that show this.
If you could help out with the py/spark integration tests I would be very grateful!
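A rough sketch of what such a test setup might look like (written in Scala rather than Python here; the bucket path and the exact conf wiring are assumptions, not the test that was eventually added):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative setup: enable the Delta S3 log store and the fast listFrom flag,
// then write a small table through the normal Spark APIs against an S3A path.
val spark = SparkSession.builder()
  .appName("fast-s3a-listfrom-test")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3SingleDriverLogStore")
  // Passed through to the Hadoop configuration, where the log store reads it.
  .config("spark.hadoop.delta.enableFastS3AListFrom", "true")
  .getOrCreate()

spark.range(0, 100)
  .write.format("delta").mode("append")
  .save("s3a://my-bucket/tables/fast-listfrom-test")
```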
@jkylling can you please give me your public GitHub email? I need it for the merge to give you authorship credits, as required by the Linux Foundation guidelines.
🎉