Improve performance of `datafusion-cli` when reading from remote storage
Is your feature request related to a problem or challenge?
- Part of https://github.com/apache/datafusion/pull/16300/files
While testing https://github.com/apache/datafusion/pull/16300, I re-noticed that datafusion-cli is really slow when reading remote files.
For example,
The initial table creation takes 7.5 seconds on my pretty crappy connection:
DataFusion CLI v48.0.0
> CREATE EXTERNAL TABLE nyc_taxi_rides
STORED AS PARQUET LOCATION 's3://altinity-clickhouse-data/nyc_taxi_rides/data/tripdata_parquet/';
0 row(s) fetched.
Elapsed 7.492 seconds.
However, simple queries just to get the count then take 8-10 seconds 😱
> select count(*) from nyc_taxi_rides;
+------------+
| count(*) |
+------------+
| 1310903963 |
+------------+
1 row(s) fetched.
Elapsed 8.945 seconds.
> select count(*) from nyc_taxi_rides;
+------------+
| count(*) |
+------------+
| 1310903963 |
+------------+
1 row(s) fetched.
Elapsed 10.456 seconds.
I am almost certain this delay is due to having to read the footers of the parquet files for each query.
Note the speed is much faster when collect_statistics is on
> set datafusion.execution.collect_statistics = true;
0 row(s) fetched.
Elapsed 0.012 seconds.
> CREATE EXTERNAL TABLE nyc_taxi_rides
STORED AS PARQUET LOCATION 's3://altinity-clickhouse-data/nyc_taxi_rides/data/tripdata_parquet/';
0 row(s) fetched.
Elapsed 7.770 seconds.
The first query is still slow for some reason: (5 seconds)
> select count(*) from nyc_taxi_rides;
+------------+
| count(*) |
+------------+
| 1310903963 |
+------------+
1 row(s) fetched.
Elapsed 5.114 seconds.
But subsequent queries are quite fast:
> select count(*) from nyc_taxi_rides;
+------------+
| count(*) |
+------------+
| 1310903963 |
+------------+
1 row(s) fetched.
Elapsed 0.297 seconds.
Describe the solution you'd like
I would like:
- datafusion-cli to be faster for such queries
- datafusion-cli to be an easy-to-follow model for how to cache metadata when working with ListingTable, that others who build with DataFusion could follow
Describe alternatives you've considered
I think the obvious thing that is needed is a cache for the ParquetMetadata
I think the actual cache should be in datafusion-cli but NOT in the datafusion core crate as I think what and how to cache will be different across systems.
What I envision is:
- APIs in the ListingTable / RuntimeEnv / etc for adding caching of ParquetMetadata
- An implementation of those APIs in datafusion-cli
- Bonus points: documentation / examples that show how to use those APIs in other systems
There is some vestigial code in the cache_manager module that I think could provide a home for such caching APIs:
- https://docs.rs/datafusion/latest/datafusion/execution/cache/cache_manager/index.html
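To make the shape concrete, here is a minimal sketch of the kind of keyed, share-on-hit cache such an API could expose. This is my own illustration; `MetadataCache`, `CachedMeta`, and `get_or_load` are hypothetical names, not existing DataFusion types.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Cache key: object path plus file size, so a rewritten object
// misses the stale entry instead of returning it.
type Key = (String, u64);

// Stand-in for a decoded ParquetMetaData.
#[derive(Clone, Debug, PartialEq)]
struct CachedMeta(Vec<u8>);

#[derive(Default)]
struct MetadataCache {
    inner: Mutex<HashMap<Key, Arc<CachedMeta>>>,
}

impl MetadataCache {
    // Return cached metadata, or insert the value produced by `load`.
    fn get_or_load(&self, key: Key, load: impl FnOnce() -> CachedMeta) -> Arc<CachedMeta> {
        let mut map = self.inner.lock().unwrap();
        map.entry(key).or_insert_with(|| Arc::new(load())).clone()
    }
}

fn main() {
    let cache = MetadataCache::default();
    let key = ("s3://bucket/file.parquet".to_string(), 1234);
    let mut loads = 0;
    let m1 = cache.get_or_load(key.clone(), || { loads += 1; CachedMeta(vec![1]) });
    let m2 = cache.get_or_load(key, || { loads += 1; CachedMeta(vec![1]) });
    assert_eq!(loads, 1); // footer read and decoded once and only once
    assert_eq!(m1, m2);
    println!("ok");
}
```

Returning `Arc` lets concurrent queries share one decoded copy instead of re-parsing per query.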
Additional context
Related issues
- https://github.com/apache/datafusion/issues/15585
- https://github.com/apache/datafusion/issues/15582
Hi @alamb, I would like to take up on this. I am new to the project, but this seems like a good way to start.
It would certainly be nice, though it might be tricky for someone without experience with query processing from remote object storage.
I will try to study and dig more on this. It would be great if you can give starting pointers when i am stuck :)
Thanks @swaingotnochill. I would probably start with profiling. The profiling techniques can be found at https://github.com/apache/datafusion/blob/main/docs/source/library-user-guide/profiling.md#profiling-using-samply-cross-platform-profiler
I started profiling and I can definitely see reading of parquet footers and metadata taking a chunk of time initially. Thanks @comphead
Hi @swaingotnochill I'm curious about how we can isolate the footer parsing time from the end-to-end Parquet reading time. Could you explain the best approach to measure this for a runtime breakdown?
I personally recommend looking at using samply to profile the run: https://datafusion.apache.org/library-user-guide/profiling.html#profile-the-benchmark
You could potentially use some of the example datasets in https://datafusion.apache.org/user-guide/cli/datasources.html#remote-files-directories
Something like samply record datafusion-cli -c "select count(*) from 'https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet'"
Note you'll have to build datafusion-cli with --profile profiling
You should see something like this
Hi @swaingotnochill I'm curious about how we can isolate the footer parsing time from the end-to-end Parquet reading time. Could you explain the best approach to measure this for a runtime breakdown?
let metadata = Arc::new(
ParquetMetaDataReader::new()
.with_prefetch_hint(size_hint)
.load_and_finish(fetch, file_size)
.await?,
);
This seems to be the lowest level that does both the I/O-bound work (fetching footer data) and the CPU-bound work (parsing it). However, the fetches are asynchronous. I would likely start here and add metrics to measure this exactly at runtime. The other option is the profiling Andrew suggested, which seems simpler once set up.
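As a first cut at such metrics, one could split the call into a fetch phase and a decode phase and time each separately. A minimal sketch (my own, with stand-in workloads rather than real parquet calls):

```rust
use std::time::{Duration, Instant};

// Time a single phase and return its result plus the elapsed duration.
fn timed<T>(f: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let out = f();
    (out, start.elapsed())
}

fn main() {
    // Stand-ins for the real phases: an I/O-bound fetch of the footer
    // bytes, then a CPU-bound decode of those bytes.
    let (bytes, fetch_time) = timed(|| vec![0u8; 1024]);
    let (decoded_len, decode_time) = timed(|| bytes.len());
    assert_eq!(decoded_len, 1024);
    println!("fetch: {:?}, decode: {:?}", fetch_time, decode_time);
}
```

For the async fetch, the same pattern works by recording `Instant::now()` before the `.await` and `elapsed()` after it.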
@XiangpengHao perhaps has implemented code that could be useful here: https://github.com/XiangpengHao/liquid-cache/issues/227#issuecomment-3019766010
Basically it shows how to implement parquet metadata caching with just the external APIs.
It would be great to make this easier
@JigaoLuo has a great write up of why we need to cache metadata: #227 (comment)
Metadata cache update
I have a metadata cache implementation a while ago, but have procrastinated forever to actually polish it.
Maybe perfect is the enemy of good, I'll just show whatever I already have: https://github.com/XiangpengHao/parquet-study. Copy some of the readme here:
So you want to cache Parquet metadata in DataFusion? (so that one Parquet metadata is read/decoded once and only once).
It's not easy (a blog post is coming soon), but not impossible.
Usage
Copy the src/metedata_cache.rs to your project, and use it like below.
Option 1
use datafusion::prelude::*;
use crate::metadata_cache::RegisterParquetWithMetaCache;

let ctx = SessionContext::new();

// Instead of:
// ctx.register_parquet("table", "file.parquet", ParquetReadOptions::default()).await?;
ctx.register_parquet_with_meta_cache(
    "table",
    "path/to/file.parquet",
    ParquetReadOptions::default(),
).await?;
Option 2
If you're a low-level listing table user:
use crate::metadata_cache::{ParquetFormatMetadataCacheFactory, ToListingOptionsWithMetaCache};
let parquet_options = ParquetReadOptions::default();
let listing_options = parquet_options
    .to_listing_options_with_meta_cache(&ctx.copied_config(), ctx.copied_table_options());

ctx.register_listing_table(
    "table",
    "path/to/file.parquet",
    listing_options,
    parquet_options.schema.map(|s| Arc::new(s.to_owned())),
    None,
).await?;
More writings
Basically there are three places we read Parquet metadata:
- Infer schema
- Infer stats
- Open Parquet files
Reading metadata has two costs:
- IO cost to read the data. Each metadata read can cost up to 2 network requests: the first to load the Parquet footer, which gives the metadata size; the second to load the actual metadata. But in most cases we only need 1, e.g., we optimistically fetch the last 4MB of the parquet file, which likely already covers all the metadata.
- CPU cost to decode metadata. Metadata is not the fastest thing to decode; it's usually not a big problem, but some people complain about it.
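The "up to 2 requests, usually 1" behavior follows from the Parquet footer layout: the file ends with the serialized metadata, a 4-byte little-endian metadata length, and the magic bytes PAR1. A small sketch of that decision (my own illustration, not reader code from the parquet crate):

```rust
// A Parquet file ends with: [metadata][4-byte little-endian len]["PAR1"].

/// Metadata length encoded in the last 8 bytes of a Parquet file.
fn parquet_metadata_len(footer: &[u8; 8]) -> Option<u32> {
    if &footer[4..] != b"PAR1" {
        return None; // not a Parquet footer
    }
    Some(u32::from_le_bytes([footer[0], footer[1], footer[2], footer[3]]))
}

/// Given a speculative tail fetch of `tail_len` bytes, is a second
/// request needed to cover the whole metadata block?
fn needs_second_fetch(tail_len: u64, metadata_len: u32) -> bool {
    // metadata plus the 8-byte footer must fit inside the fetched tail
    u64::from(metadata_len) + 8 > tail_len
}

fn main() {
    let footer = [0x10, 0x00, 0x00, 0x00, b'P', b'A', b'R', b'1'];
    let len = parquet_metadata_len(&footer).unwrap();
    assert_eq!(len, 16);
    // a 4 MiB tail easily covers 16 bytes of metadata: one request total
    assert!(!needs_second_fetch(4 * 1024 * 1024, len));
    // a metadata block larger than the tail forces a second request
    assert!(needs_second_fetch(1024, 8 * 1024));
    println!("ok");
}
```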
(Since I have written so many things here, maybe I should just finish that blog post?)
This seems nice. I am already working on an implementation, so I can maybe take some inspiration from this. Let me get a basic POC working. I will try to create a draft PR by the end of this week.
I observed a significant performance improvement just testing on the master branch, without any metadata caching, compared to before. I don't know what change caused this. Any ideas?
➜ datafusion git:(main) ✗ ./target/debug/datafusion-cli
DataFusion CLI v48.0.0
> CREATE EXTERNAL TABLE nyc_taxi_rides
STORED AS PARQUET LOCATION 's3://altinity-clickhouse-data/nyc_taxi_rides/data/tripdata_parquet/';
0 row(s) fetched.
Elapsed 4.889 seconds.
> select count(*) from nyc_taxi_rides;
+------------+
| count(*) |
+------------+
| 1310903963 |
+------------+
1 row(s) fetched.
Elapsed 3.508 seconds.
> select count(*) from nyc_taxi_rides;
+------------+
| count(*) |
+------------+
| 1310903963 |
+------------+
1 row(s) fetched.
Elapsed 0.252 seconds.
I observed a significant performance improvement just testing on the master branch, without any metadata caching, compared to before. I don't know what change caused this. Any ideas?
It is almost certainly due to https://datafusion.apache.org/library-user-guide/upgrading.html#datafusion-execution-collect-statistics-now-defaults-to-true
@alamb You are right, the improvement is due to collect_statistics.
The POC is able to reduce the original time; however, it still doesn't seem to work very well with collect_statistics, which I am still working on.
This has most of the code from @XiangpengHao and some changes to incorporate it, but it is not very promising when compared to collect_statistics=true.
Here are the results:
➜ datafusion git:(rswain/parquet_metadata_caching) ✗ ./target/debug/datafusion-cli
DataFusion CLI v48.0.0
> set datafusion.execution.collect_statistics = false;
0 row(s) fetched.
Elapsed 0.008 seconds.
> CREATE EXTERNAL TABLE nyc_taxi_rides
STORED AS PARQUET LOCATION 's3://altinity-clickhouse-data/nyc_taxi_rides/data/tripdata_parquet/';
0 row(s) fetched.
Elapsed 8.987 seconds.
> select count(*) from nyc_taxi_rides;
+------------+
| count(*) |
+------------+
| 1310903963 |
+------------+
1 row(s) fetched.
Elapsed 1.422 seconds.
> select count(*) from nyc_taxi_rides;
+------------+
| count(*) |
+------------+
| 1310903963 |
+------------+
1 row(s) fetched.
Elapsed 1.343 seconds.
> set datafusion.execution.collect_statistics = true;
0 row(s) fetched.
Elapsed 0.002 seconds.
> select count(*) from nyc_taxi_rides;
+------------+
| count(*) |
+------------+
| 1310903963 |
+------------+
1 row(s) fetched.
Elapsed 1.818 seconds.
> select count(*) from nyc_taxi_rides;
+------------+
| count(*) |
+------------+
| 1310903963 |
+------------+
1 row(s) fetched.
Elapsed 1.157 seconds.
I've been investigating performance when using the ListingTable with remote storage, and since datafusion-cli ultimately uses the ListingTable, I'm curious if my findings might be interesting and related to this issue. Most of my focus has been on using the ListingTable with hive partitioned data. I observed there was a very large discrepancy in performance between hive partitioned datasets and flat partitioned datasets, even when the number of underlying objects was similar. This was actually noted in an unresolved bug from about a year ago, #9654.
I put a few simple targeted timing debug prints in the code and learned that in many cases most of the time spent for remote queries is dominated by listing the files in the backing object store. Due to the current implementation around listing files for hive partitioned data this can often be doubly true for such datasets (the depth of the partitioning structure impacts the number of round-trip latency costs that are paid every query). Here are some examples:
DataFusion CLI v49.0.0
> CREATE EXTERNAL TABLE athena_partitioned
STORED AS PARQUET LOCATION 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
0 row(s) fetched.
Elapsed 4.382 seconds.
> select count(*) from athena_partitioned;
file_list duration: 0.0110909995ms
full group files duration: 2028.1296ms
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 2.031 seconds.
> select count(*) from athena_partitioned;
file_list duration: 0.0058609997ms
-- NOTE: this is likely substantially faster due to the recent changes for files stat caching
-- A similar pattern can be seen for most of the examples here
full group files duration: 156.28888ms
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 0.159 seconds.
> select count(*) from 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
file_list duration: 0.004357ms
full group files duration: 1681.8896ms
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 4.231 seconds.
> select count(*) from 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
file_list duration: 0.004645ms
full group files duration: 1461.2577ms
+----------+
| count(*) |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 3.538 seconds.
> CREATE EXTERNAL TABLE overture_maps
STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/';
0 row(s) fetched.
Elapsed 10.764 seconds.
> select count(*) from overture_maps where type='address';
list_partitions_from_paths: listing from release/2025-07-23.0
list_partitions_from_paths: found 512 files
list_partitions_from_paths: built 22 partitions
Listed 22 partitions in 136.81548ms
Pruning yielded 1 partitions in 0.147322ms
file_list duration: 136.97224ms
full group files duration: 353.54166ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.360 seconds.
> select count(*) from overture_maps where type='address';
list_partitions_from_paths: listing from release/2025-07-23.0
list_partitions_from_paths: found 512 files
list_partitions_from_paths: built 22 partitions
Listed 22 partitions in 181.16426ms
Pruning yielded 1 partitions in 0.092711ms
file_list duration: 181.26404ms
full group files duration: 181.33081ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.186 seconds.
> select count(*) from overture_maps where type='address' and theme='addresses';
list_partitions_from_paths: listing from release/2025-07-23.0/theme=addresses/type=address
list_partitions_from_paths: found 18 files
list_partitions_from_paths: built 2 partitions
Listed 2 partitions in 126.23396ms
Pruning yielded 1 partitions in 0.084972ms
file_list duration: 126.32848ms
full group files duration: 164.76274ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.170 seconds.
I have a POC that, at least initially, appears to normalize the performance between hive partitioned datasets and flat partitioned datasets, and would allow partitioned and flat datasets alike to leverage the existing ListFilesCache for users that create a ListingTable manually (such as myself). Would there be interest in looking further at the POC or discussing additional strategies for normalizing and reducing the amount of time spent listing objects?
Would there be interest in looking further at the POC or discussing additional strategies for normalizing and reducing the amount of time spent listing objects?
I am interested for sure
Similarly to the hooks for Parquet Metadata which @nuno-faria is connecting up as part of this epic
- https://github.com/apache/datafusion/issues/17000
There is an old partial API for caching the results of listing here: https://docs.rs/datafusion/latest/datafusion/execution/cache/cache_manager/struct.CacheManager.html#method.get_list_files_cache
Perhaps we can follow the same approach as in #17000 and add a default cache for listing results (maybe even using the same cache limit).
The biggest challenge I think will be to clearly articulate when the cached list is expired (so that, for example, the table picks up changes to the underlying system).
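One simple way to articulate expiry is a TTL on each listing entry. A hypothetical sketch (names are mine, not the existing ListFilesCache API):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct ListEntry {
    files: Vec<String>,
    fetched_at: Instant,
}

struct ListFilesCache {
    ttl: Duration,
    entries: HashMap<String, ListEntry>,
}

impl ListFilesCache {
    fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Return the cached listing unless it has outlived the TTL,
    /// in which case the caller should re-list the object store.
    fn get(&self, prefix: &str) -> Option<&[String]> {
        self.entries
            .get(prefix)
            .filter(|e| e.fetched_at.elapsed() < self.ttl)
            .map(|e| e.files.as_slice())
    }

    fn put(&mut self, prefix: String, files: Vec<String>) {
        self.entries.insert(prefix, ListEntry { files, fetched_at: Instant::now() });
    }
}

fn main() {
    let mut cache = ListFilesCache::new(Duration::from_millis(10));
    cache.put("s3://bucket/table/".into(), vec!["part-0.parquet".into()]);
    assert!(cache.get("s3://bucket/table/").is_some()); // fresh hit
    std::thread::sleep(Duration::from_millis(20));
    assert!(cache.get("s3://bucket/table/").is_none()); // expired: re-list
    println!("ok");
}
```

A config knob for the TTL (with a conservative default) would let users trade freshness for fewer round trips.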
Is this something you might be willing to help with @BlakeOrth ?
At the end of the day I'm going to be working on some way to get listing results cached, and I'd much rather make those changes here to contribute back to open source than keep it in our proprietary code. I'm happy to help out to move this forward wherever I can.
In my mind the work to normalize performance between flat and hive partitioned datasets is separate, but related, to any work that would actually cache the listing results from either of those workflows. Should discussions on approach happen here or in separate issue(s) more aligned with the work directly?
@alamb I have some freshly minted test results from using both my POC for normalizing the access patterns of hive partitioned and flat datasets when using the ListingTable, as well as enabling the existing list files cache by default and enabling the new parquet metadata cache. Obviously getting a (very rough) concept of either of these workflows isn't the hard part; as you've already mentioned, communicating to users the expected behavior of the default configuration is going to be the difficult thing. However, I think the performance here likely speaks for itself for simple queries:
Current performance (copied from above):
> CREATE EXTERNAL TABLE overture_maps
STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/';
0 row(s) fetched.
Elapsed 10.764 seconds.
> select count(*) from overture_maps where type='address';
list_partitions_from_paths: listing from release/2025-07-23.0
list_partitions_from_paths: found 512 files
list_partitions_from_paths: built 22 partitions
Listed 22 partitions in 136.81548ms
Pruning yielded 1 partitions in 0.147322ms
file_list duration: 136.97224ms
full group files duration: 353.54166ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.360 seconds.
> select count(*) from overture_maps where type='address';
list_partitions_from_paths: listing from release/2025-07-23.0
list_partitions_from_paths: found 512 files
list_partitions_from_paths: built 22 partitions
Listed 22 partitions in 181.16426ms
Pruning yielded 1 partitions in 0.092711ms
file_list duration: 181.26404ms
full group files duration: 181.33081ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.186 seconds.
POC performance:
DataFusion CLI v49.0.0
> set datafusion.execution.parquet.cache_metadata = true;
0 row(s) fetched.
Elapsed 0.001 seconds.
> CREATE EXTERNAL TABLE overture_maps
STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/';
0 row(s) fetched.
Elapsed 10.107 seconds.
> select count(*) from overture_maps where type='address';
OPTIMIZED: Starting file listing from: release/2025-07-23.0
OPTIMIZED: Listed all files in 0.053733002ms
file_list duration: 0.067717ms
full group files duration: 215.65643ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.223 seconds.
> select count(*) from overture_maps where type='address';
OPTIMIZED: Starting file listing from: release/2025-07-23.0
OPTIMIZED: Listed all files in 0.048692ms
file_list duration: 0.061409004ms
full group files duration: 4.356367ms
+-----------+
| count(*) |
+-----------+
| 446544475 |
+-----------+
1 row(s) fetched.
Elapsed 0.008 seconds.
Edit: oops, I realize I used a subset of the table for the two different cases in the original comment, I've edited them to be equivalent now. Not that it changed the outcome too much anyway for cached queries.
At the end of the day I'm going to be working on some way to get listing results cached, and I'd much rather make those changes here to contribute back to open source than keep it in our proprietary code. I'm happy to help out to move this forward wherever I can.
@BlakeOrth
I think we should make a new issue. I think we can take the same approach for listing results as we took for parquet metadata caching (basically follow the path that @nuno-faria blazed):
- https://github.com/apache/datafusion/issues/17000
Basically
- Provide a default implementation for the (already existing) ListFilesCache
- Implement some reasonable default value for refresh along with a config setting to change that default
- Implement some way to see the contents of the cache
If you are willing to potentially help with this work, I can spec it out in a ticket / epic.
In my mind the work to normalize performance between flat and hive partitioned datasets is separate, but related, to any work that would actually cache the listing results from either of those workflows. Should discussions on approach happen here or in separate issue(s) more aligned with the work directly?
Since they all use the ListingTable implementation, I think the code will be the same.
@alamb
If you are willing to potentially help with this work, I can spec it out in a ticket / epic.
This sounds great. As you've likely noted I've already started getting familiar with the code and recent PRs around this area of the code. If you can tag me on the issues once you have a chance to write them up I'm happy to continue the discussion on design/implementation specifics.
I have been thinking about this @BlakeOrth -- and when I was trying to write up this plan, I realized it wasn't actually clear to me what was happening and thus I didn't know what to write up.
One thing I was thinking is that we could make it easier to see what datafusion-cli is doing, so we could better diagnose why this command takes so long (and thus how we can make it better)
> CREATE EXTERNAL TABLE overture_maps
STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/';
0 row(s) fetched.
Elapsed 10.107 seconds.
Perhaps we can take inspiration from @kosiew and https://github.com/apache/datafusion/pull/17021 and add some way to monitor what is happening I/O wise from datafusion-cli.
I will try and spec out a ticket for this
@alamb I think additional observability tooling is almost always a positive development. That being said, I have to be completely honest with you and note that I'm ultimately an API user, not a CLI user. I've been using a hacky-instrumented CLI here to help give a common tool and example(s) of potential improvements. The CLI and my use case both leverage the ListingTable which is where I'm personally interested in driving performance improvements with tables on "high latency" storage.
Exposing additional metrics around where DataFusion is spending its time at the API level (and in turn through the CLI) does seem very useful to me though. I personally had to rely on a mix of production metrics for our object storage, doing off-cpu-time profiling, and the aforementioned hacked in timing instrumentation, to help me understand that listing files and collecting their object metadata was taking a non-trivial amount of time, especially in hive partitioned contexts. Better metrics should, in theory, eliminate the need for much of that toil.
better diagnose why this command takes so long (and thus how we can make it better)
I don't currently have any true insights (just some educated guesses) as to why table creation is taking so long, but I also have done very little investigation there as of now. I'm personally more interested in improving performance (more query performance than write performance currently) to existing tables and consider the table creation step to effectively be a 1-time cost. I'm happy to share or better clarify any of my current findings. I'm just not entirely sure the best avenue to do so since this is a pretty active project and the core maintainers seem busy already. I can open draft PRs for the couple of POCs I've thrown together, highlight existing areas of code I've done hacky-instrumentation around etc. if it helps further the discussion.
I'm just not entirely sure the best avenue to do so since this is a pretty active project and the core maintainers seem busy already. I can open draft PRs for the couple of POCs I've thrown together, highlight existing areas of code I've done hacky-instrumentation around etc. if it helps further the discussion.
I believe the most valuable thing you can do is write up the use cases / file tickets.
For example, the CREATE EXTERNAL TABLE overture_maps STORED AS PARQUET LOCATION 's3://overturemaps-us-west-2/release/2025-07-23.0/' is amazing as it is public and people can reproduce the slow table creation (and thus we can actually do something about it)
Perhaps you can file a ticket about "improve the performance of ListingTable creation / CREATE EXTERNAL TABLE" with that usecase? I can do it as well
Then you would have a place to connect any POCs that you have made
Likewise this bug you filed is great
- https://github.com/apache/datafusion/issues/17049
I think we'll be able to sort that out in short order
Perhaps we can take inspiration from @kosiew and #17021 and add some way to monitor what is happening I/O wise from datafusion-cli.
I will try and spec out a ticket for this
- I filed https://github.com/apache/datafusion/issues/17207
I believe the most valuable thing you can do is write up the use cases / file tickets
@alamb done, I've filed #17211 which describes my use case and some of my current findings, and referenced this issue as being related so we can better keep track of some of the context. I will note that the ultimate solution to that issue is likely multi-faceted and probably deserves to be split into multiple smaller units of work.
Here is another idea:
- https://github.com/apache/datafusion/issues/18118