[Feature Request] Optimize common case: SELECT COUNT(*) FROM Table
Feature request
Running the query "SELECT COUNT(*) FROM Table" should read only Delta logs
Overview
Running the query "SELECT COUNT(*) FROM Table" takes a lot of time for big tables, Spark scan all the parquet files just to return the number of rows, that information is available from Delta Logs.
The same for "SELECT COUNT(*) FROM Table Group BY PartitionColumn"
Motivation
Huge performance overhead: a query that only needs metadata ends up scanning every Parquet file in the table.
Willingness to contribute
The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?
- [ ] Yes. I can contribute this feature independently.
- [X] Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
- [ ] No. I cannot contribute this feature at this time.
@zsxwing, you assigned this to @vkorukanti. Does that mean you plan to implement it?
@felipepessoto No, I am not planning to implement it. @zsxwing assigned it to me to look at and provide guidance. We are currently busy with the next release of Delta, so we will get back to you after the release. I will assign this issue back to you, given that you are interested in implementing it.
@vkorukanti do you have any example of code where a query plan is replaced by an optimized version? I think it would be a good starting point.
I can confirm this issue and it is impacting me as well. The strange thing is that we have a large bronze table and an even larger silver table, and counting the data by partition is very fast on silver (it reads the metadata) but extremely slow on bronze (it scans all the files). I am not sure what is happening there, but if one of the maintainers could provide some guidance on how to implement this, I would be willing to help out as well.
Hi @vkorukanti, I'm doing some experiments and I have two different approaches (very high level only, and I'm not sure they are feasible); I'd like to hear your opinion on them.
First:
- Change DeltaCatalog.loadTable (https://github.com/delta-io/delta/blob/ab0946e739e6e4dece26e2a675ad452c8d33f8be/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L176) to fill in the catalog statistics.
- Later, use those statistics to rewrite the query plan, replacing the Parquet file scan with a no-op and returning the count directly (a rough Catalyst-rule sketch follows this list).
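Here is a hedged sketch of that plan-rewrite step (not an actual implementation): a Catalyst optimizer rule that replaces a global COUNT(*) over a relation whose exact row count is known with a one-row LocalRelation. `lookupRowCount` is a hypothetical helper that would resolve the Delta snapshot behind the scan; the pattern matching targets Spark 3.x Catalyst classes and may need adjusting for other versions.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object CountStarFromMetadata extends Rule[LogicalPlan] {

  // Hypothetical helper: return Some(rowCount) when `plan` is an unfiltered
  // Delta scan whose per-file numRecords stats are complete, None otherwise.
  private def lookupRowCount(plan: LogicalPlan): Option[Long] = None

  // A global (no GROUP BY) aggregate whose only expression is COUNT(*),
  // which the analyzer resolves to count(1).
  private def isCountStar(agg: Aggregate): Boolean =
    agg.groupingExpressions.isEmpty &&
      agg.aggregateExpressions.size == 1 &&
      (agg.aggregateExpressions.head match {
        case Alias(ae: AggregateExpression, _) =>
          !ae.isDistinct && ae.filter.isEmpty && (ae.aggregateFunction match {
            case Count(Seq(Literal(1, _))) => true
            case _ => false
          })
        case _ => false
      })

  override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case agg: Aggregate if isCountStar(agg) =>
      lookupRowCount(agg.child) match {
        case Some(rows) =>
          // Replace the aggregate (and the file scan below it) with a single
          // precomputed row, keeping the original output attributes.
          LocalRelation(agg.output, Seq(InternalRow(rows)))
        case None => agg
      }
  }
}
```

A rule like this would be registered through `SparkSessionExtensions.injectOptimizerRule` (or Delta's own extension point), and it should only fire when the count can be answered exactly; otherwise the original plan is left untouched.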
Second:
- In DataSkippingReader, change filesForScan to skip the files when the query is a plain SELECT COUNT and answer it from the stats only.
- The advantage of doing this per file is that the same mechanism also covers a COUNT grouped by the partition key (see the sketch after this list). But I'm not sure how to get at the query plan from there.
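A quick illustration of why the per-file route composes with partitioned counts: each AddFile in the snapshot records both its `partitionValues` map and its `numRecords` stat, so grouping the log entries is enough. A spark-shell style sketch, assuming the same internal APIs as above, a hypothetical table path, and a hypothetical partition column named `PartitionColumn`:

```scala
// `spark` is the active SparkSession (e.g. in spark-shell).
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.functions.{col, get_json_object, sum}

val snapshot = DeltaLog.forTable(spark, "/path/to/table").update()

// Group the per-file numRecords stats by the partition value recorded in the
// log, yielding the GROUP BY counts without touching any data file.
val countsPerPartition = snapshot.allFiles
  .select(
    col("partitionValues").getItem("PartitionColumn").as("PartitionColumn"),
    get_json_object(col("stats"), "$.numRecords").cast("long").as("numRecords"))
  .groupBy(col("PartitionColumn"))
  .agg(sum("numRecords").as("count"))

countsPerPartition.show(truncate = false)
```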
I started working on option #1 and have a PoC working.
@felipepessoto - awesome! Can you post a PR?
@scottsand-db, PR is published: https://github.com/delta-io/delta/pull/1377
Thanks for working on this @felipepessoto. Is there any chance of subsequently expanding this to work when there are filters on partition columns?
@Tom-Newton yes, I plan to continue improving it. I'd like to finish this first step to lay the foundation, and then expand to more scenarios.
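For what it's worth, the same log metadata looks sufficient for that case too: a partition predicate can be evaluated against each AddFile's `partitionValues` before summing `numRecords`. A hedged sketch, reusing the internal APIs from the sketches above (the path, column name, and value are hypothetical):

```scala
// spark-shell style; `spark` is the active SparkSession.
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.functions.{col, get_json_object, sum}

// Keep only the files belonging to the requested partition, then sum their stats.
val filteredCount = DeltaLog.forTable(spark, "/path/to/table").update().allFiles
  .where(col("partitionValues").getItem("PartitionColumn") === "2022-01-01")
  .select(get_json_object(col("stats"), "$.numRecords").cast("long").as("numRecords"))
  .agg(sum("numRecords").as("count"))
```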