[Feature Request] More performant transaction log parsing for Azure
Feature request
Overview
There is room for performance improvement when parsing Delta transaction logs to get the latest state of a Delta table stored on Azure blob storage (ADLS Gen2 with the hierarchical namespace).
Motivation
For tables with relatively frequent transactions, open-source Delta is quite slow at parsing the transaction log of Delta tables. For example, we have a table where just parsing the transaction log to get the latest state takes 2 minutes, compared to seconds on Databricks.
Further details
I have done some experimentation building my own versions of `hadoop-azure` and `delta-storage`, and I was able to demonstrate a 10x performance improvement in my simple test (10 seconds instead of 2 minutes). I did this just by creating an Azure-specific implementation of `AzureLogStore.listFrom` and plumbing it through to this function. This is a lot faster because it can take into account the ordering and the provided `startFile`, listing only the small number (~10) of files we need, in order. The generic `HadoopFileSystemLogStore.listFrom` needs to list the entire transaction log directory and then do the filtering and sorting in memory.
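To make the contrast concrete, here is a rough sketch (not the actual patch): `listStatusFrom` is a hypothetical helper standing in for the startFrom-aware listing call this issue is asking for, while `genericListFrom` approximates what the generic implementation does today.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListFromSketch {

  // Generic approach (roughly what HadoopFileSystemLogStore.listFrom does):
  // list the whole _delta_log directory, then filter and sort in memory.
  public static Iterator<FileStatus> genericListFrom(FileSystem fs, Path startFile)
      throws IOException {
    FileStatus[] all = fs.listStatus(startFile.getParent());
    return Arrays.stream(all)
        .filter(f -> f.getPath().getName().compareTo(startFile.getName()) >= 0)
        .sorted(Comparator.comparing((FileStatus f) -> f.getPath().getName()))
        .iterator();
  }

  // Azure-specific approach: let the storage service apply the "start from"
  // filter and the ordering, so only the ~10 relevant files are ever listed.
  public static Iterator<FileStatus> azureListFrom(FileSystem fs, Path startFile)
      throws IOException {
    FileStatus[] tail = listStatusFrom(fs, startFile.getParent(), startFile.getName());
    return Arrays.asList(tail).iterator();
  }

  // Hypothetical startFrom-aware listing; no such method exists in the public
  // FileSystem API today, which is the upstream change discussed below.
  private static FileStatus[] listStatusFrom(FileSystem fs, Path dir, String startFrom)
      throws IOException {
    throw new UnsupportedOperationException("needs hadoop-azure / FileSystem support");
  }
}
```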
The main difficulty I see with this change is that it might need an upstream change to `hadoop-azure` so that a version of `listStatus` that accepts the all-important `startFrom` argument is exposed by `AzureBlobFileSystem`. I opened an issue there too.
I also did a bit of testing when requesting a specific Delta version, and I was quite confused by the listing operations it was trying to do. I'm pretty sure this could be improved, but for now it's probably beyond my Scala/Java skill level to actually implement.
Willingness to contribute
The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?
- [ ] Yes. I can contribute this feature independently.
- [x] Yes. I would be willing to contribute this feature with guidance from the Delta Lake community. (This comes with the caveat that I've never written any Scala or Java code before now.)
- [ ] No. I cannot contribute this feature at this time.
Hi @Tom-Newton - this looks like an awesome feature to add to Delta.
> The main difficulty I see with this change is that it might need an upstream change to `hadoop-azure` so that a version of `listStatus` that accepts the all-important `startFrom` argument is exposed by `AzureBlobFileSystem`. I opened an issue there too.
Could you elaborate further? It would be great to see a short design doc that details your proposed solution/prototype, what API is missing from `hadoop-azure`, and what you would plan to change in `hadoop-azure`.
@scottsand-db there probably isn't much to write a design doc about, but I can explain a bit more.
The performance improvement comes from listing only the last few files of the transaction log using native Azure APIs, instead of listing the whole transaction log and then filtering and sorting in memory. `hadoop-azure` does have a method that does exactly this; however, it is not exposed in the public `FileSystem` API. I think we would need an upstream change which adds this to the `FileSystem` API. I asked about this and it sounds like that will be a challenge.
I'm attaching the two patch files that I am currently using; hopefully they make things a bit clearer: delta_storage.patch hadoop_azure.patch
> what API is missing from `hadoop-azure`, and what you would plan to change in `hadoop-azure`.
The problem is that @Tom-Newton wants to be able to list from a specific starting file. Which isn't a bad idea, except the Hadoop FS APIs would need to add a new list() variant, which means designing one for cross-FS correctness, long-term support, validation tests etc. Doing stuff like this is hard because of the work needed to produce something considered broadly usable and implementable (and, for cloud infra, fast).
Hi @Tom-Newton - seems like there are some issues that need to be explored and addressed from the hadoop-azure side.
We can leave this issue open and feel free to re-comment if there is ever any progress made.
> from the hadoop-azure side.

More like "from the entire Hadoop list API".
Which doesn't mean it's intractable, it just makes it harder. FWIW, I do think the "set a starting point" idea is good. It's just that the baseline impl would have to scan and discard, or ideally, in HDFS, know where to start returning pages of data, as this would reduce list marshalling overhead and the time locks are held on namenode structures.
@steveloughran should we create a ticket for the new Hadoop FileSystem list API? Setting a starting point is supported by most cloud stores (S3, Azure, GCP) in their own APIs, so this looks like a great improvement to the Hadoop FileSystem API.
Yeah, you could; look at HDFS-13616 as the last time anyone went near listing, and HADOOP-16898 as my attempt to keep the HDFS-first features under control. The newer APIs (openFile() and multipart upload, for example) take a builder API and return a Future<>; for listings we want a RemoteIterator returned, to allow client code to process pages of listings as they arrive.
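For reference, this is the style being described; a minimal sketch using existing Hadoop 3.3+ calls (`openFile()` and `listFiles()`), not the proposed new listing API:

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class HadoopApiStyles {
  static void examples(FileSystem fs, Path file, Path dir) throws Exception {
    // Builder-style API: openFile() returns a builder whose build() completes
    // asynchronously with the opened stream.
    try (FSDataInputStream in = fs.openFile(file).build().get()) {
      in.readByte();
    }

    // RemoteIterator-style listing: pages of results are processed as they
    // arrive, instead of materialising the whole directory listing up front.
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, false);
    while (it.hasNext()) {
      LocatedFileStatus status = it.next();
      System.out.println(status.getPath());
    }
  }
}
```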
Hi @Tom-Newton @steveloughran do you have any update on this?
`AzureBlobFileSystemStore` already exposes `public FileStatus[] listStatus(final Path path, final String startFrom)`. Can't we use it instead of `AzureBlobFileSystem`?
I'm just using custom-built jars for hadoop-azure and Delta.
> `AzureBlobFileSystemStore` already exposes `public FileStatus[] listStatus(final Path path, final String startFrom)`

This is a possibility, but apparently `AzureBlobFileSystemStore` should really be private: https://issues.apache.org/jira/browse/HADOOP-18599?focusedCommentId=17678873&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17678873
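For context, a rough sketch of what calling it directly could look like. The reflection and the `abfsStore` field name here are assumptions (they may differ between hadoop-azure versions), and this is exactly the kind of non-public access the linked HADOOP-18599 comment discourages:

```java
import java.lang.reflect.Field;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;

public final class AbfsStartFromListing {

  // Lists children of dir starting at startFrom, using the store method quoted
  // above. Reaching the store instance via reflection is an assumption here,
  // since the store is not part of the stable public API.
  public static FileStatus[] listFrom(AzureBlobFileSystem abfs, Path dir, String startFrom)
      throws Exception {
    Field storeField = AzureBlobFileSystem.class.getDeclaredField("abfsStore");
    storeField.setAccessible(true);
    AzureBlobFileSystemStore store = (AzureBlobFileSystemStore) storeField.get(abfs);
    // Server-side "start from" listing: only entries at or after startFrom come back.
    return store.listStatus(dir, startFrom);
  }
}
```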
For S3 we use `S3ListRequest`, which is also not public. I think it is fair to use `AzureBlobFileSystemStore`; the perf diff is huge when you have 100K or a million files.
> For S3 we use S3ListRequest which is also not public

That is going to break your code with the move to 3.4 and the v2 AWS SDK, I'm afraid.
How about you actually design an improvement for a listing API in hadoop-common, which we can then implement in the stores (s3a, azure) that can do this?
Look at https://github.com/apache/hadoop/pull/6726 to see an example of this:
- new interface which filesystems may implement
- implementation for cloud storage
- hasPathCapability() probe for the feature
- Also, to assist libraries which need to compile against older versions, a "reflection-friendly" way to invoke it.
`RemoteIterator<FileStatus> listFilesFromPath(Path parent, Path start)`

plus, in the WrappedOperations class:

`RemoteIterator<FileStatus> listFilesFromPath(FileSystem fs, Path parent, Path start)`

which will do the checks for availability, interface casting etc., so libraries just need to load one static method with 3 params and get a return type they're used to.
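Sketching what that might look like; every name here is illustrative (following the signatures above), and only `hasPathCapability()` and `listStatusIterator()` are existing `FileSystem` calls:

```java
import java.io.IOException;
import java.util.NoSuchElementException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

/** Hypothetical optional interface a FileSystem may implement. */
interface StartFromListing {
  // Illustrative hasPathCapability() key for probing support.
  String CAPABILITY_LIST_FROM = "fs.capability.list.files.from.path";

  /** List children of parent, starting at start, in lexicographic order. */
  RemoteIterator<FileStatus> listFilesFromPath(Path parent, Path start) throws IOException;
}

/** Illustrative reflection-friendly entry point, in the spirit of WrappedOperations. */
final class WrappedListing {
  static RemoteIterator<FileStatus> listFilesFromPath(FileSystem fs, Path parent, Path start)
      throws IOException {
    // Fast path: the store implements the interface and declares the capability.
    if (fs instanceof StartFromListing
        && fs.hasPathCapability(parent, StartFromListing.CAPABILITY_LIST_FROM)) {
      return ((StartFromListing) fs).listFilesFromPath(parent, start);
    }
    // Baseline: list everything and discard entries before the start name,
    // which is what a default FileSystem implementation would have to do.
    String startName = start.getName();
    RemoteIterator<FileStatus> all = fs.listStatusIterator(parent);
    return new RemoteIterator<FileStatus>() {
      private FileStatus pending;

      @Override
      public boolean hasNext() throws IOException {
        while (pending == null && all.hasNext()) {
          FileStatus candidate = all.next();
          if (candidate.getPath().getName().compareTo(startName) >= 0) {
            pending = candidate;
          }
        }
        return pending != null;
      }

      @Override
      public FileStatus next() throws IOException {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        FileStatus result = pending;
        pending = null;
        return result;
      }
    };
  }
}
```

A real default would also need to pin down ordering guarantees across filesystems, which is part of the cross-FS correctness work mentioned above.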