
Evaluate filter pushdown against the physical schema for performance and correctness

Open adriangb opened this issue 7 months ago • 14 comments

Describe the bug

Consider the following test:

COPY  (
  SELECT arrow_cast(a, 'Int16') AS a
  FROM ( VALUES (1), (2), (3) ) AS t(a)
)  TO 'test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet'
STORED AS PARQUET;

set datafusion.execution.parquet.pushdown_filters = true;

CREATE EXTERNAL TABLE t_pushdown(a int) STORED AS PARQUET
LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_table/';

select * from t_pushdown where a = arrow_cast(2, 'Int8');

At some point DataFusion optimizes the Int8 filter by casting the filter to Int32 (matching the table schema, thus avoiding having to cast the column).

So when the filter gets into ParquetSource it's an Int32 filter. But when we read the file schema it's actually an Int8! Since we now build pruning predicates, etc. on a per-file basis using the physical file schema, this can introduce casting of the data from Int8 to Int32, which is unnecessary because (1) we could cast the filter instead, which would be much cheaper, and (2) if the file type and filter type were both Int8 or Int16 in this example (as might happen if one changes the table schema but not old data or old queries), we would actually be closer to the original intent of the query.
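To make point (1) concrete, here is a minimal, dependency-free Rust sketch of the two strategies; the function names and the plain-slice representation are invented for illustration and this is not DataFusion code. Today the narrower file-level column (Int16 in the COPY above) is widened row by row to match the table-level predicate; the alternative is to cast the literal once, down to the file type.

// Strategy A (current behavior): widen every row of the file's Int16 column
// so it can be compared against the Int32 literal from the table-level filter.
fn filter_by_casting_column(file_col: &[i16], lit: i32) -> Vec<bool> {
    file_col.iter().map(|v| i32::from(*v) == lit).collect() // one cast per row
}

// Strategy B (proposed): cast the literal once down to the file type, checking
// that it fits, and leave the column data in its file-level representation.
fn filter_by_casting_literal(file_col: &[i16], lit: i32) -> Vec<bool> {
    match i16::try_from(lit) {
        Ok(lit_i16) => file_col.iter().map(|v| *v == lit_i16).collect(),
        // An out-of-range literal can never be equal to an Int16 value.
        Err(_) => vec![false; file_col.len()],
    }
}

fn main() {
    let file_col: Vec<i16> = vec![1, 2, 3];
    assert_eq!(filter_by_casting_column(&file_col, 2), vec![false, true, false]);
    assert_eq!(filter_by_casting_literal(&file_col, 2), vec![false, true, false]);
}

Both strategies produce the same mask; the difference is one scalar conversion versus one conversion per value (and, for dictionary- or view-encoded columns, potentially materializing a whole new array).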

To be clear, I do not mean that this is a new regression. I believe this has always been the case but now we can actually fix it and before we could not.

This applies not only to stats filtering (where the impact is likely negligible) but also to predicate pushdown, where I expect the impact may be much larger, especially for cases where we never end up materializing the columns (and thus don't have to cast them to the table's data type at all). I don't know that any benchmark measures this case at the moment, though.

To resolve this I think we just need to call optimize_casts(physical_expr, physical_file_schema) (a made up function), but I don't know where or how optimize_casts exists (I feel like it must already exist; maybe it's at the logical expr level?). Does anyone know where this exists?

adriangb avatar Apr 20 '25 02:04 adriangb

I would love to work on this task

james-ryans avatar Apr 20 '25 03:04 james-ryans

I can confirm this is currently being done at the LogicalPlan level. I'd say the first step is to understand how it happens there, then check whether something similar exists for PhysicalExpr, and if it doesn't, create it.

adriangb avatar Apr 20 '25 04:04 adriangb

To understand how this happens in the logical optimizer (as part of the SimplifyExpressions pass), you can look at unwrap_cast.rs.
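For readers who have not looked at that pass: roughly, it rewrites a comparison of the form CAST(col AS wider_type) = literal into col = literal-in-the-column's-type whenever the literal provably fits in the column's native type. Below is a toy sketch of that rule; the Expr and IntWidth types are illustrative stand-ins, not DataFusion's logical Expr.

#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column { name: String, ty: IntWidth },
    Literal { value: i64, ty: IntWidth },
    Cast { input: Box<Expr>, to: IntWidth },
    Eq(Box<Expr>, Box<Expr>),
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum IntWidth { I16, I32, I64 }

// Can `value` be represented exactly in `ty`?
fn fits(value: i64, ty: IntWidth) -> bool {
    match ty {
        IntWidth::I16 => i16::try_from(value).is_ok(),
        IntWidth::I32 => i32::try_from(value).is_ok(),
        IntWidth::I64 => true,
    }
}

// Rewrite `CAST(col AS wider) = literal` into `col = literal-as-col-type`,
// but only when the literal fits in the column's native type.
fn unwrap_cast_in_comparison(expr: Expr) -> Expr {
    if let Expr::Eq(left, right) = &expr {
        if let (Expr::Cast { input, .. }, Expr::Literal { value, .. }) =
            (left.as_ref(), right.as_ref())
        {
            if let Expr::Column { ty, .. } = input.as_ref() {
                if fits(*value, *ty) {
                    return Expr::Eq(
                        input.clone(),
                        Box::new(Expr::Literal { value: *value, ty: *ty }),
                    );
                }
            }
        }
    }
    expr
}

fn main() {
    // `CAST(a AS Int32) = 2` with a: Int16 in the file becomes `a = 2_i16`.
    let a = Expr::Column { name: "a".to_string(), ty: IntWidth::I16 };
    let pred = Expr::Eq(
        Box::new(Expr::Cast { input: Box::new(a.clone()), to: IntWidth::I32 }),
        Box::new(Expr::Literal { value: 2, ty: IntWidth::I32 }),
    );
    assert_eq!(
        unwrap_cast_in_comparison(pred),
        Expr::Eq(Box::new(a), Box::new(Expr::Literal { value: 2, ty: IntWidth::I16 }))
    );
}

The question in this issue is whether an equivalent rewrite can run per file against the physical file schema.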

leoyvens avatar Apr 23 '25 15:04 leoyvens

I think the first thing to do would be to try and write some tests that show the error happening

Perhaps we could use the existing statistics: predicate_evaluation_errors https://github.com/apache/datafusion/blob/6d5e00ad3f8e53f7252cb1d3c72a6c7f28c1aed6/datafusion/datasource-parquet/src/metrics.rs#L31-L30

Here is an example test that shows how to use those statistics: https://github.com/search?q=repo%3Aapache%2Fdatafusion%20predicate_evaluation_errors&type=code

https://github.com/apache/datafusion/blob/555fc2e24dd669e44ac23a9a1d8406f4ac58a9ed/datafusion/core/tests/parquet/row_group_pruning.rs#L245

https://github.com/apache/datafusion/blob/555fc2e24dd669e44ac23a9a1d8406f4ac58a9ed/datafusion/core/tests/parquet/page_pruning.rs#L253

alamb avatar May 05 '25 20:05 alamb

@alamb there is no error AFAIK. It currently works, but it works by casting the data to match the types of the table. The point I’m making is that we could instead cast the expression to match the type of the file, possibly saving a lot of copying / blowing up dictionaries.

adriangb avatar May 18 '25 19:05 adriangb

As discussed a bit in https://github.com/apache/datafusion/pull/16086#discussion_r2100826502 there is a fundamental problem that all of the predicates are planned at the table level.

So for example the predicate func(col1), where at the table / logical level the type is col1: Utf8. If the file type is instead col1: Utf8View, then how do we know that func(Utf8View) won't error? There are other cases... basically it's not clear to me that it's trivial to get there / I worry we have to re-invent casting rules at the physical layer. I remember @findepi said somewhere something along the lines of "casting should not be changed after planning" (please correct me or feel free to chime in)

My feeling is that we eventually have to do this for performance / reducing extra work, to add important new features, and for correctness reasons, but evidently my initial attempt was too naive so we had to revert it. I am willing to try again later, especially now that we have some tests in place (we need more, e.g. for the example with a function above).

adriangb avatar May 21 '25 18:05 adriangb

"casting should not be changed after planning"

If I said exactly that, I should stand corrected. Coercion is something that should be applied during the analysis/initial planning phase. Coercion rules result in casts being inserted into the plan. After the initial plan is fully formed, the word "coercion" does not exist anymore.

Casts are in the same category as function calls -- the optimizer may reorganize or replace function calls with other expressions as long as they are equivalent (and are believed to "be better"). Casts can be removed or replaced the same way (again: as long as the resulting expression is well formed and equivalent).

from the issue description:

So when the filter gets into ParquetSource it's an Int32 filter. But when we read the file schema it's actually an Int8!

Where does Int8 come back?

Anyway, as the example shows, two different files may have two different internal representations for the same SQL-level column. I.e. the table may declare Int64, but the file may contain Int32 or Int16. (This is not limited to the various Int widths.) The Parquet source, which deals with individual files, may perform logic similar to the unwrap_cast optimizer. Does it matter, though?

findepi avatar May 22 '25 19:05 findepi

"casting should not be changed after planning"

If I said exactly that, I should stand corrected. Coercion is something that should be applied during the analysis/initial planning phase. Coercion rules result in casts being inserted into the plan. After the initial plan is fully formed, the word "coercion" does not exist anymore.

Casts are in the same category as function calls -- the optimizer may reorganize or replace function calls with other expressions as long as they are equivalent (and are believed to "be better"). Casts can be removed or replaced the same way (again: as long as the resulting expression is well formed and equivalent).

Thanks for correcting me! That's the sort of distinction I knew you'd be able to make and that I was lacking. It's a helpful way to think about it.

from the issue description:

So when the filter gets into ParquetSource it's an Int32 filter. But when we read the file schema it's actually an Int8!

Where does Int8 come back? Anyway, as the example shows, two different files may have two different internal representations for the same SQL-level column. I.e. the table may declare Int64, but the file may contain Int32 or Int16. (This is not limited to the various Int widths.) The Parquet source, which deals with individual files, may perform logic similar to the unwrap_cast optimizer. Does it matter, though?

That's the point: we need logic similar to unwrap_cast, which I think is non-trivial. I started down that road and got confused by some cases, so I backed out. For example, if you have a table schema of col1: Int64, col2: Int64 and a predicate col1 = col2, there will be no casts at the logical level. But when you get to the file level you have the schema col1: Int8, col2: UInt32, and now you have to do something more similar to coercion, I think (i.e. introduce some casts)? What would you suggest we do in this case?
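To make that col1 = col2 case concrete, here is a hedged sketch of the kind of decision the file level would have to make; the IntTy enum and function names are toys invented for illustration, not DataFusion's coercion code. The idea is to find a type both file-level columns can be widened to losslessly, which may or may not be narrower than the Int64 the table declares.

// Toy integer types; not DataFusion's DataType.
#[derive(Debug, Clone, Copy, PartialEq)]
enum IntTy { Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32 }

// Smallest signed type that can represent every value of `ty` without loss.
fn smallest_signed_superset(ty: IntTy) -> IntTy {
    match ty {
        IntTy::Int8 => IntTy::Int8,
        IntTy::UInt8 | IntTy::Int16 => IntTy::Int16,
        IntTy::UInt16 | IntTy::Int32 => IntTy::Int32,
        IntTy::UInt32 | IntTy::Int64 => IntTy::Int64,
    }
}

fn width(ty: IntTy) -> u8 {
    match ty {
        IntTy::Int8 | IntTy::UInt8 => 8,
        IntTy::Int16 | IntTy::UInt16 => 16,
        IntTy::Int32 | IntTy::UInt32 => 32,
        IntTy::Int64 => 64,
    }
}

// For `col1 = col2`, pick the common type both file-level columns are cast to.
fn common_type(left: IntTy, right: IntTy) -> IntTy {
    let (l, r) = (smallest_signed_superset(left), smallest_signed_superset(right));
    if width(l) >= width(r) { l } else { r }
}

fn main() {
    // File schema col1: Int8, col2: UInt32 (table declares both as Int64):
    // UInt32 forces Int64, so this particular pair gains nothing.
    assert_eq!(common_type(IntTy::Int8, IntTy::UInt32), IntTy::Int64);
    // File schema col1: Int8, col2: Int16: Int16 suffices, cheaper than
    // widening both columns all the way to the table-level Int64.
    assert_eq!(common_type(IntTy::Int8, IntTy::Int16), IntTy::Int16);
}

Whether maintaining a second, physical-level set of rules like this is worth the complexity is exactly the trade-off being debated here.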

Basically I think we need to all agree that this complexity is the right way to go and then agree on what to do in the different scenarios.

adriangb avatar May 22 '25 20:05 adriangb

It's very natural to think about file-level vs table-level as the same thing as SQL coercions, but there is an important distinction. SQL has its own semantics and the table provider has its own semantics. This distinction is easier to understand in systems where it isn't Arrow everywhere and the SQL side and the table provider side are cleanly delimited by their different type systems.

From Parquet to table level -- the semantics of this operation are defined by a read. What happens if the file has col1: Int8 but the table defines it as Int32? Well, nothing unusual: Int8 is extended to Int32 (infallibly). There is "a cast" (an equivalent of a SQL cast) happening inside the table provider. If a query comes with a filter (in Int32 terms), the filter may be translated to col1 by an equivalent of the unwrap cast optimization (yes, separate code).

findepi avatar May 22 '25 20:05 findepi

If a query comes with a filter (in Int32 terms), the filter may be translated to col1 by an equivalent of the unwrap cast optimization (yes, separate code).

What about in the situation described above? What happens now is basically that both columns get cast up to Int64 to match their table-level types. But that's at least one extra cast. And there are cases like structs with extra fields where I don't think it makes sense to do any cast: if the file type has extra fields, it's compatible with any function that operates on the table type that has a subset of those fields.

adriangb avatar May 22 '25 21:05 adriangb

What about in the situation described above? What happens now is basically that both columns get cast up to Int64 to match their table-level types. But that's at least one extra cast. And there are cases like structs with extra fields where I don't think it makes sense to do any cast: if the file type has extra fields, it's compatible with any function that operates on the table type that has a subset of those fields.

I predict that the 'extra fields' use case is going to be the most important one and will drive this feature (as performance will be disastrous without it)

alamb avatar May 23 '25 18:05 alamb

@alamb I tried to put together an example of schema evolution where the file had an Int32 column at the file schema level and the table has it as Int64. I can see the extra conversion happening if I put prints in the right place, but I could not get it to show a meaningful difference in performance. I'm guessing that's because most of the time and variability comes from reading the data, parsing Parquet, etc., and once it's in memory as Arrow, converting from Int32 to Int64 is trivial.

This did make me notice: we pretty much already have a special case of what I am proposing here: https://github.com/apache/datafusion/blob/5a2933e5878777c75c931b42327b1074bcd43d35/datafusion/datasource-parquet/src/opener.rs#L221-L234

It only deals with the View types, which I think are a special case of the general idea.

The general idea is to look at the filters and projection and decide what the most efficient way to marry up the data is. Currently we always convert the data to match the filter / table field types after it has been read from Parquet. What I am thinking is:

  1. For any cases where we can read from Parquet directly into the table type, do that (e.g. StringView and Utf8 are stored the same in Parquet; both UInt8 and UInt16 are stored as INT32 in Parquet). This is basically what is happening with the view types already.
  2. If a cast is required, prefer to cast the filter / scalar values or the smaller column (we can use logic similar to what we use to reorder filters); a rough sketch of this decision follows below.
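Here is one way points 1 and 2 could fit together; CastStrategy, Operand and choose_strategy are invented names for illustration, not the interface DataFusion ended up with.

// Which part of a pushed-down comparison gets adapted to the file schema.
#[derive(Debug, PartialEq)]
enum CastStrategy {
    // Point 1: the Parquet reader can decode straight into the needed type
    // (e.g. Utf8 vs Utf8View, or Int16 vs Parquet's INT32 physical encoding).
    NoCast,
    // Point 2a: one side is a scalar, so cast that single value to the file type.
    CastScalar,
    // Point 2b: both sides are columns with different file types; cast the
    // narrower (cheaper) column rather than the wider one.
    CastNarrowerColumn,
}

#[derive(Debug, Clone, Copy)]
enum Operand {
    Scalar,
    // byte_width stands in for whatever cost estimate decides which column is cheaper to cast.
    Column { byte_width: usize },
}

fn choose_strategy(left: Operand, right: Operand, readable_as_target: bool) -> CastStrategy {
    if readable_as_target {
        return CastStrategy::NoCast;
    }
    match (left, right) {
        (Operand::Scalar, _) | (_, Operand::Scalar) => CastStrategy::CastScalar,
        (Operand::Column { .. }, Operand::Column { .. }) => CastStrategy::CastNarrowerColumn,
    }
}

fn main() {
    // `a = 2` where the file stores a as Int16 but the filter literal is Int32:
    // cast the single scalar, not the whole column.
    assert_eq!(
        choose_strategy(Operand::Column { byte_width: 2 }, Operand::Scalar, false),
        CastStrategy::CastScalar
    );
    // Utf8 in the file, Utf8View at the table level: read directly into the view type.
    assert_eq!(
        choose_strategy(Operand::Column { byte_width: 16 }, Operand::Scalar, true),
        CastStrategy::NoCast
    );
}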

adriangb avatar Jun 13 '25 21:06 adriangb

variability comes from reading the data, parsing Parquet, etc., and once it's in memory as Arrow, converting from Int32 to Int64 is trivial

This makes a lot of sense to me

It only deals with the View types, which I think are a special case of the general idea.

I agree -- this is a nice observation

What I am thinking is:

For any cases where we can read from Parquet directly into the table type, do that (e.g. StringView and Utf8 are stored the same in Parquet; both UInt8 and UInt16 are stored as INT32 in Parquet). This is basically what is happening with the view types already. If a cast is required, prefer to cast the filter / scalar values or the smaller column (we can use logic similar to what we use to reorder filters).

This also makes sense to me

I still think the major / critical use case is going to be "extract a field from this struct / variant", but anything else we do will be a bonus

alamb avatar Jun 16 '25 10:06 alamb

Thinking out loud about some of the tricky bits: there are going to be cases where we necessarily need to convert to the table schema's data type, e.g. opaque_udf(col): we can't possibly know what opaque_udf accepts, so the only reasonable thing to do is preserve the current behavior and cast col to the data type of col in the table schema.

So as we design something, I think it's probably best to make it default to the current behavior (cast everything to the table schema) and only optimize specific patterns that we know are safe (e.g. binary expressions).
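As a toy illustration of that allow-list approach (PhysExpr here is an invented stand-in, not DataFusion's PhysicalExpr): only patterns we can reason about, such as CAST(col) compared with a literal, would be rewritten against the file schema, and everything else, including opaque UDFs, keeps the existing behavior of casting the column data up to the table schema.

enum PhysExpr {
    Column(String),
    Literal(i64),
    Cast(Box<PhysExpr>),
    BinaryEq(Box<PhysExpr>, Box<PhysExpr>),
    // A scalar UDF whose accepted argument types we cannot inspect here.
    OpaqueUdf(Vec<PhysExpr>),
}

// Only `CAST(col) = literal` (or its mirror image) qualifies for the rewrite;
// anything unrecognized falls back to casting the column to the table type.
fn is_safe_to_rewrite(expr: &PhysExpr) -> bool {
    match expr {
        PhysExpr::BinaryEq(l, r) => matches!(
            (l.as_ref(), r.as_ref()),
            (PhysExpr::Cast(_), PhysExpr::Literal(_))
                | (PhysExpr::Literal(_), PhysExpr::Cast(_))
        ),
        _ => false,
    }
}

fn main() {
    let safe = PhysExpr::BinaryEq(
        Box::new(PhysExpr::Cast(Box::new(PhysExpr::Column("a".into())))),
        Box::new(PhysExpr::Literal(2)),
    );
    let opaque = PhysExpr::OpaqueUdf(vec![PhysExpr::Column("a".into())]);
    assert!(is_safe_to_rewrite(&safe));
    assert!(!is_safe_to_rewrite(&opaque)); // preserve today's cast-to-table-schema path
}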

adriangb avatar Jun 17 '25 03:06 adriangb

This was fixed / implemented in #16461

adriangb avatar Jun 27 '25 17:06 adriangb