Plan to replace `SchemaAdapter` with `PhysicalExprAdapter`
As discussed in https://github.com/apache/datafusion/pull/16791, the long-term plan in my mind (and that I would like to discuss with the community) is to replace `SchemaAdapter` with `PhysicalExprAdapter`.
There are multiple reasons for this:
- We can better optimize scenarios like missing columns or casts. For example, it's cheaper to cast a literal and evaluate it against the data as read from the file than it is to read the data from the file and cast that to the type of the literal. It is also cheaper to evaluate the expression `1 > col1` as `1 > null` when `col1` is missing than it is to create an array of nulls. Since we can also simplify `PhysicalExpr`s, we can even simplify `1 > null` into just `null` (see the sketch after this list).
- It's easier to manipulate `PhysicalExpr`s than it is to manipulate arrays. We already have machinery (`TreeNode` APIs, etc.) to do so.
- This is necessary to be able to push down projections into file scans, which we need for upcoming Variant work and which will also allow us to read single fields in a struct without reading the entire struct into memory. Basically, if we want to be able to customize how expressions are evaluated for a specific format, in particular how `variant_get(column, 'field')` or `get_field(column, 'field')` are executed in the context of a specific format (e.g. in Parquet we can read single struct columns or use shredded variants), we need to have access to the expression in `ParquetOpener` in order to check if the file schema has the shredded variant field and generate the right `ProjectionMask`.
- Paves the path for any other advanced optimizations; e.g., we could do crazy stuff like only read the dictionary page from a Parquet column for a filter `col = 'a'`, and if `'a'` is not in the dictionary, not even bother reading the keys.
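To make the first bullet concrete, here is a minimal sketch of the kind of rewrite this enables, written against DataFusion's physical-expr types. The column name and helper function are illustrative only, not an existing API:

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion::common::ScalarValue;
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion::physical_expr::PhysicalExpr;

/// Build `1 > col1`, substituting a typed NULL literal when `col1` is
/// missing from the file schema. A later simplification pass can then
/// fold `1 > NULL` to `NULL` without ever materializing a null array.
fn predicate_for_file(file_schema: &Schema) -> Arc<dyn PhysicalExpr> {
    let one: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
    let col1: Arc<dyn PhysicalExpr> = match file_schema.index_of("col1") {
        Ok(idx) => Arc::new(Column::new("col1", idx)),
        // Column missing in this file: evaluate against NULL instead.
        Err(_) => Arc::new(Literal::new(ScalarValue::Int32(None))),
    };
    Arc::new(BinaryExpr::new(one, Operator::Gt, col1))
}
```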
We've already implemented a replacement system for predicate pushdown via PhysicalExprAdapter and have examples showing how to do some of the things a custom SchemaAdapter can do.
Once we implement https://github.com/apache/datafusion/issues/14993 we'll be able to deprecate SchemaAdapter for the most part.
Comet makes increasing use of SchemaAdapter, but nothing you describe here sounds like a dealbreaker for Comet at first glance. I think we'd be able to make the necessary changes within the Comet repo, and likely benefit from everything you're proposing in this change.
We'll be able to start making the migration with the DF 49.0 release?
Thank you for chiming in!
> We'll be able to start making the migration with the DF 49.0 release?
Yes, that's the plan. I'm trying to figure out a way to make the migration as incremental and painless as possible; we'll have to do this migration ourselves as well. The current plan, as outlined in https://github.com/apache/datafusion/pull/16791, is that by default, if you don't provide anything, you use the new system. If you have a custom `SchemaAdapter` it will be used for everything. You can start migrating by implementing a `PhysicalExprRewriter` that has 1:1 parity with your `SchemaAdapter` implementation and passing that in as well as your `SchemaAdapter`; then the `SchemaAdapter` will be used for projections and the `PhysicalExprRewriter` will be used for predicate pushdown.
Once we implement projection pushdown, you'd be fully migrated to the new system without any changes to your code and can just remove the `SchemaAdapter`.
I came across this thread via @alamb’s post on dev and was curious to learn more. While I understand the general idea of what’s being proposed, it’s still unclear to me how it will work in practice or what steps are involved.
Could you provide a brief overview of the typical usage of SchemaAdapter, and explain how you plan to replace it with PhysicalExprAdapter? A few concrete examples would also be really helpful.
Here's an example from Comet: https://github.com/vaibhawvipul/datafusion-comet/blob/main/native/core/src/parquet/schema_adapter.rs. As you can see it's a lot of code with a lot of duplication. The change will essentially be to re-implement that in terms of PhysicalExprAdapter which I think should be a lot less code and duplication. Maybe something like this (very much pseudocode):
```rust
/// A cast kernel that respects Spark's casting rules.
pub struct SparkCastExpr {
    /// The expression to cast
    pub expr: Arc<dyn PhysicalExpr>,
    /// The data type to cast to
    cast_type: DataType,
    /// Cast options
    cast_options: CastOptions<'static>,
}

impl PhysicalExpr for SparkCastExpr {
    // ...other PhysicalExpr methods...

    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        let value = self.expr.evaluate(batch)?;
        spark_parquet_convert(value, &self.cast_type, Some(&self.cast_options))
    }
}

struct SparkPhysicalExprAdapter {
    inner: DefaultPhysicalExprAdapter,
}

impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
    fn rewrite(&self, mut expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
        // Let the default adapter handle missing columns and type mismatches first.
        expr = self.inner.rewrite(expr)?;
        // Then swap DataFusion's built-in casts for Spark-compatible ones.
        expr.transform(|expr| {
            if let Some(cast) = expr.as_any().downcast_ref::<CastExpr>() {
                return Ok(Transformed::yes(Arc::new(SparkCastExpr::from(cast))));
            }
            Ok(Transformed::no(expr))
        })
        .data()
    }
}
```
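One detail the pseudocode glosses over is the `SparkCastExpr::from(cast)` conversion. A minimal sketch of what it might look like, reusing `CastExpr`'s `expr()` and `cast_type()` accessors (`SPARK_CAST_OPTIONS` is an assumed constant, not an existing API):

```rust
impl From<&CastExpr> for SparkCastExpr {
    fn from(cast: &CastExpr) -> Self {
        SparkCastExpr {
            // Reuse the child expression and target type from DataFusion's cast.
            expr: Arc::clone(cast.expr()),
            cast_type: cast.cast_type().clone(),
            // Assumed Spark-compatible defaults (hypothetical constant).
            cast_options: SPARK_CAST_OPTIONS,
        }
    }
}
```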
Here's some more examples: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/json_shredding.rs, https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/default_column_values.rs
I think an example with custom casting rules like https://github.com/apache/datafusion/issues/16800#issuecomment-3080077103 would be helpful.
I feel it may be a fair amount of work in Comet to move from SchemaAdapter to PhysicalExprAdapter but from the pseudocode example it appears tractable. I think we'll be willing to take the hit if the promise is improved predicate pushdown performance. :)
Having looked at your implementation I think it may not be that bad! It seems like most of what your SchemaAdapter is doing is customizing casting rules, right?
> Having looked at your implementation I think it may not be that bad! It seems like most of what your SchemaAdapter is doing is customizing casting rules, right?
All of it is customized casting rules :)
@parthchandra @mbutrovich please take a look at https://github.com/apache/datafusion/pull/16803.
As per the comments in the example it looks like Comet already has a custom PhysicalExpr for casting so implementing a SchemaAdapter replacement might be <100 LOC because you can re-use your existing kernel / logic.
So the basic idea, as I understand it, is that instead of adapting the data batch to the target schema using `SchemaAdapter`, the new approach involves rewriting or transforming the physical expressions using `PhysicalExprAdapter`.
> There are multiple reasons for this:
Based on this reasoning, I think the first two points make sense.
Could you clarify the latter two? From your description, they sound like areas where PhysicalExprAdapter could bring benefits — but I'm not quite sure how SchemaAdapter fits into those cases. Are these scenarios where the current use of SchemaAdapter is actually blocking progress toward those goals, and replacing it with PhysicalExprAdapter is part of the solution?
> Could you clarify the latter two? From your description, they sound like areas where `PhysicalExprAdapter` could bring benefits — but I'm not quite sure how `SchemaAdapter` fits into those cases.
There's some discussion in https://github.com/apache/datafusion/issues/14993. Basically if we want to be able to customize how expressions are evaluated for a specific format, in particular how variant_get(column, 'field') or get_field(column, 'field') are executed in the context of a specific format (e.g. in parquet we can read single struct columns or use shredded variant) we need to have access to the expression in ParquetOpener in order to check if the file schema has the shredded variant field and generate the right ProjectionMask.
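To sketch what that could look like on the Parquet side: once the opener can see the expression, it can build a mask that reads only the relevant leaf columns. The helper below is hypothetical glue; `ProjectionMask`, `SchemaDescriptor`, and their methods are from the parquet crate:

```rust
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

/// Select only the Parquet leaf columns whose dotted path contains the
/// shredded field we want, e.g. `col.typed_value.field`.
fn mask_for_field(descr: &SchemaDescriptor, field_path: &str) -> ProjectionMask {
    let leaves: Vec<usize> = (0..descr.num_columns())
        .filter(|&i| descr.column(i).path().string().contains(field_path))
        .collect();
    ProjectionMask::leaves(descr, leaves)
}
```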
> There's some discussion in #14993. Basically if we want to be able to customize how expressions are evaluated for a specific format, in particular how `variant_get(column, 'field')` or `get_field(column, 'field')` are executed in the context of a specific format (e.g. in parquet we can read single struct columns or use shredded variant) we need to have access to the expression in ParquetOpener in order to check if the file schema has the shredded variant field and generate the right ProjectionMask.
Thanks for bringing this up — that's a great point. It might be a good idea to update the issue description with this key information to make it easier for others who aren’t as familiar with the background to follow along.
Just to share some general thoughts — this isn't directly related to the change itself.
Sometimes, I feel that some important proposals in DataFusion lack sufficient context, or that the relevant context is scattered across various issues and PR comments. This makes it difficult to fully understand the proposals or to trace their motivations and evaluate their soundness. As a result, we sometimes see large PRs — hundreds or even thousands of lines — that are based on these proposals, making the review process even more challenging. Only the author or those who were involved in the initial discussions seem to be in a position to effectively review them.
For example, Spark has the SPIP (Spark Project Improvement Proposal) mechanism, where contributors submit formal documents for review when proposing significant changes. These documents typically consolidate the technical details, motivation, and background of the proposal into a single place. This approach helps the community better understand and participate in discussions around major changes.
I wonder if it would be beneficial for DataFusion to adopt a similar lightweight proposal process for major design changes — something that allows ideas and context to be collected and reviewed before implementation begins. It could help improve transparency, facilitate broader community involvement, and make the review process more accessible.
If the full SPIP process — including voting and formal approval — feels too heavy or unnecessary for our context, perhaps we could at least establish a lightweight template for major change proposals. This template could include sections for motivation, background, technical details, and other relevant context. Having a consistent format would make it easier for the community to follow and engage with significant design discussions.
> I wonder if it would be beneficial for DataFusion to adopt a similar lightweight proposal process for major design changes —
I think it would be super helpful. Thank you for this suggestion @viirya
This proposal itself is a good example (if we don't find a way to publicize the idea of using proposals, it will likely be lost in this ticket about a tangentially related matter).
Shall we file (another) ticket to discuss the process?
One way we could proceed is to document some rough guidelines in the docs site, and then maybe add a label we can use to tag issues with proposals, so the current list is easier for people to find.
> One way we could proceed is to document some rough guidelines in the docs site, and then maybe add a label we can use to tag issues with proposals, so the current list is easier for people to find.
I like the idea of tagging issues. It formalizes and organizes an existing process instead of introducing new ones. Then it's just up to us to suggest / require a proposal / tracking issue for large scale changes.
> Shall we file (another) ticket to discuss the process?
Yea, as it is not directly related to this change.
> One way we could proceed is to document some rough guidelines in the docs site, and then maybe add a label we can use to tag issues with proposals, so the current list is easier for people to find.
Sounds good to me. In Spark, SPIPs are conducted through the dev mailing list. But since the DataFusion community seems more accustomed to using GitHub issues for discussions, it might be a good idea to adopt that approach instead.
Just wondering — is it possible in GitHub to enforce a specific issue format based on the label? For example, could we require a certain template or structure for issues labeled as "design-proposal"?
> Shall we file (another) ticket to discuss the process?
I filed one:
- https://github.com/apache/datafusion/issues/16886
> Just wondering — is it possible in GitHub to enforce a specific issue format based on the label? For example, could we require a certain template or structure for issues labeled as "design-proposal"?
I don't think we can require a particular format, but we can add a new ISSUE_TEMPLATE to help with the process.
Just going to quote myself from https://github.com/apache/datafusion-comet/issues/2058 to see if I'm misunderstanding the current behavior as we try to migrate Comet to PhysicalExprAdapter:
> I've run into some issues with this. In particular, I don't see a way to handle default values that don't have a filter on them:
>
> https://github.com/apache/datafusion/blob/40015a8345c6c20f439cb3829897d20ebce70cc9/datafusion/datasource-parquet/src/opener.rs#L256
>
> Even the example says that default values without a filter on them should rely on the `SchemaAdapter`, but we're seeing a ton of WARN messages that the API is going away.
>
> https://github.com/apache/datafusion/blob/6d0073465a59d743e42e6fdba690e40ced8a792a/datafusion-examples/examples/default_column_values.rs#L67
Thanks for any guidance you can provide!
@mbutrovich by "I don't see a way to handle default values that don't have a filter on them" I assume you're talking about projections e.g. select column_that_may_be_missing_from_some_files from t?
The answer to that is that we are working on migrating that to use PhysicalExprAdapter but we're not there yet. Work is roughly tracked in https://github.com/apache/datafusion/issues/14993
> Even the example says that default values without a filter on them should rely on the `SchemaAdapter` but we're seeing a ton of WARN messages that the API is going away.
Terribly sorry about the warn messages. They were too aggressive / too soon. We've now removed them: https://github.com/apache/datafusion/pull/16968.
Also wanted to mention we now have an example of how default values could be implemented with PhysicalExprAdapter: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/default_column_values.rs
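The core of that example boils down to a `TreeNode` rewrite along these lines (a minimal sketch, not the example's exact code; the default value is assumed to come from table metadata):

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::{Result, ScalarValue};
use datafusion::physical_expr::expressions::{Column, Literal};
use datafusion::physical_expr::PhysicalExpr;

/// Replace references to columns absent from the file schema with a
/// literal default value, instead of materializing a default array.
fn fill_defaults(
    expr: Arc<dyn PhysicalExpr>,
    file_schema: &Schema,
    default: &ScalarValue,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.transform(|e| {
        if let Some(col) = e.as_any().downcast_ref::<Column>() {
            if file_schema.field_with_name(col.name()).is_err() {
                // Column missing in this file: substitute the default.
                return Ok(Transformed::yes(Arc::new(Literal::new(default.clone()))));
            }
        }
        Ok(Transformed::no(e))
    })
    .data()
}
```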
@mbutrovich by "I don't see a way to handle default values that don't have a filter on them" I assume you're talking about projections e.g.
select column_that_may_be_missing_from_some_files from t?The answer to that is that we are working on migrating that to use
PhysicalExprAdapterbut we're not there yet. Work is roughly tracked in #14993
Yep, that's exactly where I'm at right now. Thanks for clarifying! I'll punt on our PhysicalExprAdapter migration for now, but will track the DF issue and contribute where I can!
Thanks again!
https://github.com/apache/datafusion/pull/16791#discussion_r2211280296
Is there any progress or plan in the next releases for an alternative for projections?
Yes, we are making progress, but it's been a very hard transition to compartmentalize into reasonable changes. We are tracking it here: https://github.com/apache/datafusion/issues/14993
As of https://github.com/apache/datafusion/pull/19111 Parquet is fully migrated 🚀
I think we just have to do the rest of the data sources now, update all tests, etc., and then we can actually deprecate `SchemaAdapter`.
I'm hoping to bump Comet to 51 this week, is there enough in 51 to resume the migration or should we wait to try until 52?
I think you can resume the migration. Parquet is fully migrated, which is good both if Parquet is your primary use case or as an example for custom data sources to follow. The TODOs left are to migrate other built in data sources and officially deprecate SchemaAdapter.