Predicate Pushdown for scans

Open rustyconover opened this issue 2 years ago • 25 comments

It currently appears that predicate pushdown isn't applied before calculating which Parquet files are included when scanning the table's data.

The Avro manifest files include statistics about each Parquet file. Those should be leveraged to reduce the number of files that need to be read. Additionally, there is the byte offset for each row group in each Parquet file, which can avoid reading the Parquet footer and allow better concurrent scanning.
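For illustration, a minimal sketch of reading those statistics straight from a manifest with fastavro (the manifest path is hypothetical; the field names follow the Iceberg manifest schema):

```python
# Sketch: inspect the per-file statistics stored in an Iceberg manifest (Avro).
# Assumes fastavro is installed; the manifest path is hypothetical, and the
# field names follow the Iceberg manifest schema (data_file, lower_bounds, ...).
from fastavro import reader

with open("metadata/snap-1234-manifest.avro", "rb") as f:  # hypothetical path
    for entry in reader(f):
        data_file = entry["data_file"]
        print(data_file["file_path"], data_file["record_count"])
        # Per-column bounds keyed by field id, stored as serialized byte values.
        print("  lower_bounds:", data_file.get("lower_bounds"))
        print("  upper_bounds:", data_file.get("upper_bounds"))
        # Row-group start offsets: with these, the Parquet footer does not have
        # to be read just to discover row-group boundaries.
        print("  split_offsets:", data_file.get("split_offsets"))
```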

rustyconover avatar Jul 07 '23 03:07 rustyconover

@rustyconover Hey thanks for all the input! All your submitted issues are very valid points and are definitely on our to-do list for getting this extension beyond its initial proof-of-concept state!

samansmink avatar Jul 17 '23 13:07 samansmink

Thanks Sam, I look forward to contributing.


rustyconover avatar Jul 17 '23 13:07 rustyconover

In Iceberg (both Java and Python) we do this using visitors.


When a query is executed against a snapshot, the snapshot has a reference to a ManifestList, which references one or more Manifests. The manifests contain information that can be used to prune data files.

  • RewriteNot: To simplify the expression, we need to make sure that we rewrite the not predicates. For example, not a = b will be rewritten to a != b (Python). In PyIceberg this can be done both on a bound and an unbound predicate (binding it to a schema).
  • BindSchema: binds the predicate to the field (Python). It will look up the field using the field name.
  • InclusiveMetricsVisitor: We first construct an InclusiveMetricsEvaluator (Java, Python). Because it is inclusive, it will return true if the file may match. There is also a very extensive test suite in Java that is highly recommended to port as well, since we don't want to skip files that may contain relevant data. In Python, we decided to return a function that accepts a manifest_entry (which carries all the statistics, such as value_count, null_count, nan_counts, lower_bound, upper_bound). If this evaluates to MIGHT_MATCH, the file is included in the scan planning; a rough sketch of the idea follows below.
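For illustration only, a minimal sketch of the inclusive-metrics idea in Python (this is not the PyIceberg or Java implementation; the statistics layout and names are simplified assumptions):

```python
# Sketch of an inclusive metrics check for a single "column > literal" predicate.
# Inclusive means: return True whenever the file MIGHT contain matching rows,
# so a file is only skipped when the statistics prove it cannot match.
from dataclasses import dataclass
from typing import Optional

@dataclass
class ColumnStats:
    value_count: int
    null_count: int
    lower_bound: Optional[int]  # simplified: already-decoded bounds
    upper_bound: Optional[int]

def might_match_greater_than(stats: ColumnStats, literal: int) -> bool:
    # All values are null: no row can satisfy "column > literal".
    if stats.null_count == stats.value_count:
        return False
    # Upper bound is known and not above the literal: no row can be greater.
    if stats.upper_bound is not None and stats.upper_bound <= literal:
        return False
    # Otherwise we cannot prove the file misses, so keep it (MIGHT_MATCH).
    return True

# Example: a data file whose values for the column lie in [10, 50].
print(might_match_greater_than(ColumnStats(100, 0, 10, 50), 60))  # False -> skip file
print(might_match_greater_than(ColumnStats(100, 0, 10, 50), 40))  # True  -> scan file
```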

Fokko avatar Jul 18 '23 13:07 Fokko

Hey guys! Just wanted to check in and see if there might be updates on this - i.e., is this still prioritized? I wonder if it might be possible to have an intermediate solution in the meantime, where, similar to read_parquet(), a list of files (both data and delete files, obtained using PyIceberg, say) may be passed to iceberg_scan(), greatly reducing the complexity of the scan.
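As a rough sketch of what such an intermediate approach could look like (the catalog and table names are hypothetical, and delete files are ignored here, so this is only valid for tables without deletes):

```python
# Sketch: let PyIceberg do the scan planning, then hand the pruned file list to DuckDB.
# The catalog/table names and the filter are hypothetical; delete files are ignored,
# so this is only safe for tables without position/equality deletes.
import duckdb
from pyiceberg.catalog import load_catalog

catalog = load_catalog("my_catalog")            # hypothetical catalog name
table = catalog.load_table("analytics.events")  # hypothetical table name

# plan_files() applies partition and metrics pruning for the given row filter.
tasks = table.scan(row_filter="event_date >= '2024-01-01'").plan_files()
data_files = [task.file.file_path for task in tasks]

# DuckDB then only touches the files that survived pruning.
con = duckdb.connect()
rel = con.read_parquet(data_files).filter("event_date >= DATE '2024-01-01'")
print(rel.aggregate("count(*)").fetchall())
```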

dev-goyal avatar Jan 06 '24 02:01 dev-goyal

@Fokko can you please update your links to the existing Java and Python implementations, since the references moved when the Iceberg repos were split out?

rustyconover avatar Apr 10 '24 02:04 rustyconover

@rustyconover Certainly, updated!

Fokko avatar Apr 12 '24 19:04 Fokko

Hi @samansmink and @Fokko,

I spent a few hours looking at this extension, DuckDB itself, and the Iceberg spec to work out a plan for this. Right now, I want to query Iceberg tables that consist of 100,000+ highly partitioned Parquet files, but without DuckDB supporting partitions and data file metrics that will be impossible. So I'm trying to figure out how to make this happen.

Here is what I've noticed:

TableFunction's bind_replace prevents access to query filters.

The use of the bind_replace function for iceberg_scan() means that the init calls that contain the filter information for the query are never made. The bind_replace function currently reads the Iceberg metadata and then substitutes itself in the query plan with a call to parquet_scan(), or with a subquery that applies the delete files and still calls parquet_scan().

I don't see a way to get access to the DuckDB query filters, since they are passed in the "init" phase of a TableFunction rather than the bind phase. Once the function has been replaced with a parquet_scan(), it appears the init function of iceberg_scan() is never called.

@samansmink What do you think is a reasonable approach to give iceberg_scan() access to the structure of the DuckDB query filters? If we have access to the requested filters at the same time we are reading the manifest and determining the list of Parquet files to scan, we could keep using parquet_scan(). @Mytherin did I overlook the ability to obtain the query filters in the context of the bind function of a TableFunction?

A thought I had was to populate the metrics from the Iceberg manifests into the existing statistics infrastructure of parquet_scan(), but that would require a lot of reworking and could lead to a lot of memory usage.

Manifest metrics and partition information

It appears the manifest metrics and partition information aren't being parsed from the Avro files. This seems to just require some work on the generated code for parsing the Avro files. This can happen in parallel with the other improvements.

rustyconover avatar Apr 13 '24 15:04 rustyconover

It looks like some of the necessary DuckDB infrastructure is being started here: https://github.com/duckdb/duckdb/pull/11806

rustyconover avatar Apr 24 '24 12:04 rustyconover

@rustyconover: AFAIK the filter pushdown with Iceberg does get triggered.

Note that pushdown happens

  • in bind (the "complex-filter" mechanism for hive-partitioned file collections) and
  • during query optimization (pushdown of (non)equality and range predicates).

And note that immediately after bind_replace, the generated replacement piece of the query plan is bound. At that point, hive-partitioned file pruning based on pushdown happens.

However, when Iceberg support was initially released, the second mechanism (in query optimization) did suffer from the absence of a pushdown rewrite rule for ANTI joins. Note that bind_replace generates an anti-join between the main data files and the files with deleted row-ids. But in late 2023 these rules were implemented, so this should now work.
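A quick, hedged way to check this in practice is to look at the EXPLAIN output and see whether the WHERE predicate appears as a filter on the scan that replaces iceberg_scan() (the table path and column below are hypothetical):

```python
# Sketch: inspect the plan to check whether the predicate is pushed into the scan
# that iceberg_scan() is replaced with. The table path and column are hypothetical.
import duckdb

con = duckdb.connect()
con.execute("INSTALL iceberg")
con.execute("LOAD iceberg")

plan = con.sql("""
    EXPLAIN
    SELECT count(*)
    FROM iceberg_scan('s3://my-bucket/warehouse/db/events')
    WHERE event_date = DATE '2024-05-01'
""").fetchall()
for _, explain_value in plan:
    print(explain_value)  # look for the pushed-down filter in the scan operator
```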

peterboncz avatar May 21 '24 22:05 peterboncz

Hi @peterboncz,

Thanks for replying.

The SQL query filters do get pushed down to the parquet_scan() nodes created by iceberg_scan(). To apply them, DuckDB reads the footer of each Parquet file.

When an Iceberg table has millions of files, reading all of these footers takes a long time. Iceberg addressed this issue by adding column metrics to the manifest files and a detailed "Scan Planning" section to the spec:

https://iceberg.apache.org/spec/#scan-planning

I'm looking forward to DuckDB using Iceberg's statistics to plan the scan of the table; right now the code includes every file in the manifest. By reading the manifest files and using their info to plan the scan, a query against an Iceberg table with a million files can be reduced to scanning just one or two of them, especially if the query references a small number of partition predicates.

rustyconover avatar May 22 '24 19:05 rustyconover

Thanks for chiming in here @peterboncz. I believe @rustyconover is right here. https://github.com/duckdb/duckdb_iceberg/issues/8 focuses on the partitioning (called hidden partitioning in Iceberg, which is slightly different from Hive-style partitioning).

Next to that, Iceberg has additional statistics on a per-file basis. For example, there are upper and lower bounds that allow skipping files without having to read the footer of the Parquet file. But I don't see the upper_bound field being referenced in the code: https://github.com/search?q=repo%3Aduckdb%2Fduckdb_iceberg+upper_bound&type=code

Fokko avatar May 27 '24 21:05 Fokko

Hey @rustyconover, it looks like the blocker you mentioned, https://github.com/duckdb/duckdb/pull/11806, got merged. Do you think the current state is ready for implementation?

JichaoS avatar Jul 11 '24 20:07 JichaoS

@rustyconover are you planning to pick this up? thanks

febinsathar avatar Sep 30 '24 12:09 febinsathar

Not in the short term.

rustyconover avatar Sep 30 '24 12:09 rustyconover

@febinsathar I have a version partially working here

mike-luabase avatar Oct 02 '24 15:10 mike-luabase

Will https://github.com/duckdb/duckdb_iceberg/pull/72 finally get merged, @mike-luabase? Or are there any blockers left?

febinct avatar Dec 05 '24 10:12 febinct

@mike-luabase, any update on this? Would be very nice to have this functionality.

marco-t-314 avatar Feb 21 '25 11:02 marco-t-314

@marco-t-314 We are currently working on https://github.com/duckdb/duckdb-iceberg/pull/101, which is the first step on our path towards predicate pushdown. After moving to the MultiFileReader API with that PR, we will look into implementing predicate pushdown.

samansmink avatar Feb 21 '25 11:02 samansmink

Is this being worked on? Is there an update? ETA?

rajreddy avatar Mar 24 '25 12:03 rajreddy

It is a high-priority issue that is being worked on, but I can't really give an accurate ETA. The next step is to get https://github.com/duckdb/duckdb-iceberg/pull/110 through. Once we can cleanly parse the metadata, we will start on the predicate pushdown.

samansmink avatar Mar 24 '25 12:03 samansmink

Now that #110 is done, what would be the next steps?

joaocrebelo avatar Apr 07 '25 18:04 joaocrebelo

With #212 the state of the implementation is now:

Scan predicates of the query are applied to each manifest_file to determine whether the predicate cannot be satisfied by the (contents of the) manifest. This is currently only done for partition fields with the identity transform, and for filters that are either ConstantFilter or ConjunctionAndFilter.
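For illustration only, a simplified sketch of that kind of pruning (the filter and manifest representations below are hypothetical stand-ins, not the extension's actual C++ classes):

```python
# Sketch: prune a manifest using constant equality filters on identity-transformed
# partition fields. The data structures here are simplified stand-ins, not the
# extension's actual ConstantFilter / ConjunctionAndFilter classes.
from dataclasses import dataclass

@dataclass
class ConstantEqualsFilter:
    column: str
    value: object

def manifest_may_match(partition_summary: dict, filters: list[ConstantEqualsFilter]) -> bool:
    """partition_summary maps an identity-partition column to the (lower, upper)
    bounds of its values across all files referenced by the manifest."""
    for f in filters:
        bounds = partition_summary.get(f.column)
        if bounds is None:
            continue  # no statistics for this column: cannot prune on it
        lower, upper = bounds
        if not (lower <= f.value <= upper):
            return False  # the constant falls outside the manifest's partition range
    return True  # every filter might still be satisfied, so keep the manifest

# Example: a manifest covering partitions with event_date in ['2024-01-01', '2024-01-31'].
summary = {"event_date": ("2024-01-01", "2024-01-31")}
print(manifest_may_match(summary, [ConstantEqualsFilter("event_date", "2024-06-01")]))  # False
print(manifest_may_match(summary, [ConstantEqualsFilter("event_date", "2024-01-15")]))  # True
```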

That leaves a lot still to be implemented, but those further improvements should be more manageable now that the groundwork is laid down.

Tishj avatar May 02 '25 13:05 Tishj

@Tishj Thank you for pushing the code to handle partition fields with the identity transform. I am primarily using DayTransform; any idea when this could be rolled out?

  1. I am interested in working on this. Any help or reference to a starting point would be really helpful.
  2. What would be the potential changes that need to be made to the current IdentityTransform code to handle DayTransform?

Vasampa23 avatar May 09 '25 06:05 Vasampa23


I don't have an answer, because those are exactly the things I would investigate if I started working on this. From the spec I gathered that non-identity transforms have to be more forgiving, because the transformation from the scan predicate to the partition predicate (such as applying the "day" transform) is a truncating action; see https://iceberg.apache.org/spec/#scan-planning
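For illustration only, a rough sketch of why the projection has to be forgiving for a truncating transform like day (simplified, not the extension's actual code):

```python
# Sketch: projecting a scan predicate on a timestamp through the (truncating)
# day transform. Because many timestamps map to the same day, strict comparisons
# must be relaxed to inclusive ones so no matching partition is pruned away.
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def day_transform(ts: datetime) -> int:
    """Days since the Unix epoch, as Iceberg's day transform produces."""
    return (ts - EPOCH).days

def project_greater_than(ts: datetime) -> tuple[str, int]:
    """Project `col > ts` onto the day partition: it can only be relaxed to
    `day(col) >= day(ts)`, since later rows can still live in the same day."""
    return (">=", day_transform(ts))

ts = datetime(2025, 5, 10, 12, 0, tzinfo=timezone.utc)
print(project_greater_than(ts))  # ('>=', <day ordinal of 2025-05-10>)
```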

Tishj avatar May 10 '25 08:05 Tishj

@Vasampa23 see https://github.com/duckdb/duckdb-iceberg/pull/229; this should make it possible to add day transforms. The main thing left to do there is to add tests (see https://github.com/duckdb/duckdb-iceberg/pull/206 for how tests are preferably added).

If it's not possible to make a generated-table test, the data should be placed in data/persistent/<test name> and the script(s) used to generate the persistent data should be placed in scripts/persistent/<test name>.

Tishj avatar May 10 '25 10:05 Tishj