
[1191] Use startAfter in S3SingleDriverLogStore.listFrom

jkylling opened this pull request 2 years ago • 12 comments

Description

The current implementation of S3SingleDriverLogStore.listFrom lists the entire contents of the parent directory and then filters the result. This can take a long time if the parent directory contains many files, which in practice happens for _delta_log folders with many commits.

We change the implementation to use the startAfter parameter so that the S3 list response only contains keys lexicographically greater than or equal to the resolved path. Since S3 returns at most 1000 keys per list response, this usually reduces the number of S3 list requests from (number of files in _delta_log) / 1000 to 1.
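To make the request-count claim concrete, here is a small in-memory simulation in Scala. It is not the PR's code: `FakeBucket`, `listFromFull`, and `listFromStartAfter` are hypothetical, and the model assumes S3 ListObjectsV2 semantics (keys returned in lexicographic order, at most 1000 per response, `StartAfter` exclusive). Handling the exclusive `StartAfter` by passing a proper prefix of the start key is this sketch's own choice, not necessarily what the patch does.

```scala
object StartAfterSimulation {
  // ListObjectsV2 returns at most 1000 keys per response by default.
  val PageSize: Int = 1000

  // Hypothetical in-memory stand-in for an S3 bucket. Keys are ASCII here, so
  // String.compareTo agrees with S3's UTF-8 binary key order.
  final class FakeBucket(keys: Seq[String]) {
    private val sorted: Vector[String] = keys.toVector.sorted

    // Returns the keys under `prefix` that sort strictly after `startAfter`
    // (S3's StartAfter is exclusive), plus the number of paged list requests
    // needed to fetch them. Even an empty listing costs one request.
    def list(prefix: String, startAfter: Option[String]): (Vector[String], Int) = {
      val matching = sorted.filter { k =>
        k.startsWith(prefix) && startAfter.forall(s => k.compareTo(s) > 0)
      }
      val requests = math.max(1, (matching.size + PageSize - 1) / PageSize)
      (matching, requests)
    }
  }

  // Original behaviour: list the whole directory, then filter client-side.
  def listFromFull(bucket: FakeBucket, dir: String, from: String): (Vector[String], Int) = {
    val (keys, requests) = bucket.list(dir, None)
    (keys.filter(_.compareTo(from) >= 0), requests)
  }

  // startAfter behaviour. Because StartAfter is exclusive, this sketch passes a
  // proper prefix of `from` (which always sorts before `from`) and drops the few
  // keys that fall between that prefix and `from` on the client side.
  def listFromStartAfter(bucket: FakeBucket, dir: String, from: String): (Vector[String], Int) = {
    require(from.nonEmpty, "from must be a non-empty key")
    val (keys, requests) = bucket.list(dir, Some(from.dropRight(1)))
    (keys.filter(_.compareTo(from) >= 0), requests)
  }
}
```

With 25,000 files in a simulated `_delta_log/` and `from` set to version 24990, the full listing takes 25 requests while the startAfter listing takes 1, and both return the same 10 keys.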

This resolves #1191.

How was this patch tested?

I've tested the patch briefly with the sample test described in #1191. The previous iteration of this patch was tested more extensively, but correctness has not been tested thoroughly.

Does this PR introduce any user-facing changes?

No

jkylling • Jun 16 '22

I've changed the code to fall back to the old implementation in tests. We use similar tricks in the GCS and HDFS implementations. By moving the S3-specific code to a static method of a separate class, we also avoid a hadoop-aws dependency in core.

There is still no test coverage for the new code. There is an issue to add integration tests for the log stores that could resolve this; see https://github.com/delta-io/delta/issues/1124.

It would be useful to make the same optimization to the other log stores too. Google Cloud Storage and HDFS support the same feature, while Azure Blob Storage does not. I don't know whether any local file systems support a similar API. Ideally, listFrom would be part of the Hadoop FileSystem interface.

jkylling • Jun 19 '22

Hi @jkylling - awesome, thanks for that update. We will take a look after the next delta lake release, which should be within 2 weeks. Cheers.

scottsand-db • Jun 21 '22

Sorry for the delay. This is a great improvement. Do you have any idea which Hadoop version is the minimum required version? Ideally we would need to fall back to the original implementation if we cannot call these new s3a APIs in some old Hadoop versions.

Approximately how old are the Hadoop versions that must be supported? The S3 ListObjectsV2 API has been used by Hadoop since version 3.0.0-beta1, released in October 2017 (see https://issues.apache.org/jira/browse/HADOOP-13421?attachmentOrder=asc). However, it looks like the Listing.createFileStatusListingIterator method we use has only been public since version 3.3.0 (see https://issues.apache.org/jira/browse/HADOOP-16384). This is part of the risk of digging so deep into the Hadoop API.

jkylling • Jul 13 '22

I briefly tested this PR in delta-standalone and it looks like in fact Hadoop 3.3.1 is required. I've added a check for that in https://github.com/delta-io/delta/pull/1210/commits/c6202343c1b3cbff8fe3bdcfad489f9fe0020dd3

jkylling • Jul 14 '22
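The Hadoop 3.3.1 requirement suggests gating the fast path on the running Hadoop version. A minimal sketch of such a gate, assuming the version string would come from Hadoop's `org.apache.hadoop.util.VersionInfo.getVersion()`; the object and helper names are hypothetical and this is not the check added in the linked commit. Version strings can carry suffixes such as `-beta1` or `-SNAPSHOT`, so only the leading numeric components are compared.

```scala
object HadoopVersionGate {
  // Minimum version reported in this thread as working with the fast listFrom path.
  val Minimum: Seq[Int] = Seq(3, 3, 1)

  // Leading numeric components, e.g. "3.0.0-beta1" -> Seq(3, 0, 0).
  def numericParts(version: String): Seq[Int] =
    version.split("[.-]").toSeq
      .takeWhile(p => p.nonEmpty && p.forall(_.isDigit))
      .map(_.toInt)

  // True if `version` >= `minimum`, comparing component by component and
  // treating missing components as 0 (so "3.4" counts as 3.4.0).
  def atLeast(version: String, minimum: Seq[Int] = Minimum): Boolean = {
    val parts = numericParts(version)
    val width = math.max(parts.length, minimum.length)
    parts.padTo(width, 0).zip(minimum.padTo(width, 0))
      .find { case (a, b) => a != b } match {
      case Some((a, b)) => a > b
      case None         => true
    }
  }
}
```

An unparseable version string yields no numeric components, compares as 0.0.0, and therefore keeps the original implementation, which is the safe default.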

Hi @jkylling - what environment are you running this integration test on?

I've found a strange shading error on EMR 6.6, for example, and I'm not sure if this new feature will work. TL;DR: hadoop-client shades Guava, but hadoop-aws doesn't, so I am getting a ClassNotFound exception.

Considering that, can you please put this new feature behind a feature flag? That way users have to explicitly enable it.

scottsand-db • Jul 27 '22

What was the exact setup you were using? Could you try using the S3AFileSystem without this patch and see if it still fails? I've only been running the integration tests locally on my machine, but there should not be anything special about that setup. The getting started section of the hadoop-aws documentation states that hadoop-client or hadoop-common is a required dependency, so we should be able to depend on the shaded Guava library in hadoop-client.

jkylling • Jul 27 '22

@jkylling the issue we found is EMR 6.6 shipped a bad hadoop-aws jar. We hit the following issue when using EMR 6.6:

java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
  at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:813)

The constructor SemaphoredDelegatingExecutor on EMR 6.6 is below:

scala> classOf[org.apache.hadoop.util.SemaphoredDelegatingExecutor].getConstructors.toList
res0: List[java.lang.reflect.Constructor[_]] = List(public org.apache.hadoop.util.SemaphoredDelegatingExecutor(org.apache.hadoop.shaded.com.google.common.util.concurrent.ListeningExecutorService,int,boolean))

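Note the difference between com/google/common/util/concurrent/ListeningExecutorService, which S3AFileSystem in the hadoop-aws jar expects, and org.apache.hadoop.shaded.com.google.common.util.concurrent.ListeningExecutorService, which the constructor on EMR 6.6 actually takes.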

zsxwing • Jul 27 '22
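When diagnosing this kind of mismatch on a cluster, it can help to check which Guava flavour is on the classpath before any S3A code runs. This is a diagnostic sketch, not part of the PR; `ShadingProbe` and `classPresent` are hypothetical names, and the class names are taken from the stack trace and REPL output above. A probe like this only shows which classes exist; the constructor listing in the REPL output is what pinpoints the signature mismatch.

```scala
import scala.util.Try

object ShadingProbe {
  // Class names taken from the NoSuchMethodError and REPL output above.
  val UnshadedGuava: String = "com.google.common.util.concurrent.ListeningExecutorService"
  val ShadedGuava: String =
    "org.apache.hadoop.shaded.com.google.common.util.concurrent.ListeningExecutorService"

  // True if the class can be found. initialize = false avoids running static
  // initializers. Try only catches non-fatal errors such as ClassNotFoundException.
  def classPresent(name: String, loader: ClassLoader = getClass.getClassLoader): Boolean =
    Try(Class.forName(name, false, loader)).isSuccess

  def report(): String =
    s"unshaded Guava: ${classPresent(UnshadedGuava)}, Hadoop-shaded Guava: ${classPresent(ShadedGuava)}"
}
```

Running `println(ShadingProbe.report())` in a `spark-shell` on the affected cluster would show which of the two names resolves there.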

@zsxwing @scottsand-db Thank you for the thorough reviews. I'm on vacation for some weeks now and will be a bit slower to address this, but I'll get to it eventually.

How should a feature flag be added? The easiest way to pass configuration to the LogStores is through the Hadoop configuration. Would it be okay to read a delta.enableFastS3ListFrom property from the Hadoop configuration?

jkylling • Jul 29 '22

@jkylling

Yup. Delta Lake will pass all properties from the Spark SQL Conf to the Hadoop Conf here.

So delta.enableFastS3aListFrom SGTM. Nit: use S3a instead of S3, since this is S3a-specific.

scottsand-db • Jul 29 '22
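Reading the flag from the configuration could look roughly like this minimal sketch. It uses a plain `Map[String, String]` as a stand-in for Hadoop's `Configuration` (where `getBoolean(name, defaultValue)` would be the natural call), the object name is hypothetical, and the key uses the `delta.enableFastS3AListFrom` spelling the PR ended up with. The flag defaults to off, so users must opt in explicitly.

```scala
object FastListFromFlag {
  val Key: String = "delta.enableFastS3AListFrom"

  // Intended to behave like Hadoop's Configuration.getBoolean: the value is
  // trimmed and compared case-insensitively to "true"/"false"; a missing or
  // unrecognised value falls back to the default (off).
  def enabled(conf: Map[String, String], default: Boolean = false): Boolean =
    conf.get(Key).map(_.trim.toLowerCase) match {
      case Some("true")  => true
      case Some("false") => false
      case _             => default
    }
}
```

Since Delta Lake copies Spark SQL conf entries into the Hadoop conf, setting the key on the Spark session should be enough to reach the log store.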

Hi @jkylling - any update? Need any help?

scottsand-db • Aug 05 '22

He's on vacation, see his earlier message: https://github.com/delta-io/delta/pull/1210#issuecomment-1199138675

vegarsti • Aug 05 '22

@zsxwing @scottsand-db Thank you for your patience! The PR has been updated to address most of the open review comments, and the feature is now behind the delta.enableFastS3AListFrom feature flag. The most notable outstanding review comment is that we still convert the iterator to an array, and then back to an iterator, since a bit more refactoring would be needed to use iterators everywhere.

jkylling • Aug 16 '22

Looks great!

Sorry for the delayed review. This is a really exciting feature and we would LOVE to get it for Delta Lake 2.2.

Awesome! It would be great to get this into Delta Lake 2.2.

Main change I ask is to throw a better exception when a) flag is enabled but b) file system is not S3a.

Done in https://github.com/delta-io/delta/pull/1210/commits/fc0caeadd4a463a3c0e6fe68ae3042ee7a47bc8d, see also replies to comments.

I also wonder if you could write a py/spark integration test? (no worries if not - we can help out with this too). Would be great to see you set the necessary confs in a spark session, enable this flag, and write using this new improved method using spark apis.

We have plenty of examples in the example folder that show this.

If you could help out with the py/spark integration tests I would be very grateful!

jkylling • Oct 05 '22
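The requested behaviour, taking the fast path only when the flag is on, failing loudly when the flag is on but the file system is not S3A, and otherwise keeping the original list-and-filter implementation, can be sketched as a small dispatch. Everything here is hypothetical: the file system is identified by URI scheme rather than by checking for `S3AFileSystem`, and the exception type and message are placeholders, not the ones in the linked commit.

```scala
import java.net.URI

object ListFromDispatch {
  sealed trait Strategy
  case object FastS3AListFrom extends Strategy // startAfter-based listing
  case object ListAndFilter extends Strategy   // original: list everything, then filter

  // Fast path only when the flag is on AND the path is on S3A; a flag that is on
  // for any other file system is treated as a configuration error.
  def choose(path: URI, fastListFromEnabled: Boolean): Strategy = {
    val isS3A = Option(path.getScheme).exists(_.equalsIgnoreCase("s3a"))
    if (!fastListFromEnabled) ListAndFilter
    else if (isS3A) FastS3AListFrom
    else throw new UnsupportedOperationException(
      s"delta.enableFastS3AListFrom is enabled, but $path is not on an s3a:// file system")
  }
}
```

Throwing instead of silently falling back makes a misconfiguration visible immediately, at the cost of failing reads that would otherwise have succeeded slowly.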

@jkylling can you please give me your public github email? I need it for the merge to give you authorship credits, as required by the linux foundation guidelines.

scottsand-db • Oct 25 '22

🎉

vegarsti • Oct 26 '22