Error with union and optimize_projections: Physical input schema should be the same as the one converted from logical input schema

Open erratic-pattern opened this issue 1 month ago • 7 comments

Describe the bug

DataFusion fails with a schema mismatch error when processing a UNION ALL query on Parquet files that carry field metadata.

Error while planning query: Internal error: Physical input schema should be the same as the one converted from logical input schema. Differences: .

To Reproduce

Parquet: union_all_repo.zip

Query:

SET datafusion.execution.parquet.skip_metadata = false;

SELECT AVG(usage_idle), AVG(usage_system)
FROM (
    SELECT time, usage_idle, NULL::DOUBLE as usage_system FROM 'union_all_repro.parquet'
    UNION ALL
    SELECT time, NULL::DOUBLE as usage_idle, usage_system FROM 'union_all_repro.parquet'
);

Expected behavior

The query should succeed, with matching logical and physical schemas.

Additional context

Sequence of Events

1. Projection Optimization

optimize_projections sees that the time column is not used by the aggregate and removes it from both inputs of the logical Union node

+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type                                                  | plan                                                                                                                                                                                |
+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| initial_logical_plan                                       | Projection: avg(usage_idle), avg(usage_system)                                                                                                                                      |
|                                                            |   Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]                                                                                                              |
|                                                            |     Union                                                                                                                                                                           |
|                                                            |       Projection: ./union_all_repro.parquet.time, ./union_all_repro.parquet.usage_idle, CAST(NULL AS Float64) AS usage_system                                                       |
|                                                            |         TableScan: ./union_all_repro.parquet                                                                                                                                        |
|                                                            |       Projection: ./union_all_repro.parquet.time, CAST(NULL AS Float64) AS usage_idle, ./union_all_repro.parquet.usage_system                                                       |
|                                                            |         TableScan: ./union_all_repro.parquet                                                                                                                                        |
| logical_plan after resolve_grouping_function               | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after type_coercion                           | SAME TEXT AS ABOVE                                                                                                                                                                  |
| analyzed_logical_plan                                      | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after eliminate_nested_union                  | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after simplify_expressions                    | Projection: avg(usage_idle), avg(usage_system)                                                                                                                                      |
|                                                            |   Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]                                                                                                              |
|                                                            |     Union                                                                                                                                                                           |
|                                                            |       Projection: ./union_all_repro.parquet.time, ./union_all_repro.parquet.usage_idle, Float64(NULL) AS usage_system                                                               |
|                                                            |         TableScan: ./union_all_repro.parquet                                                                                                                                        |
|                                                            |       Projection: ./union_all_repro.parquet.time, Float64(NULL) AS usage_idle, ./union_all_repro.parquet.usage_system                                                               |
|                                                            |         TableScan: ./union_all_repro.parquet                                                                                                                                        |
| logical_plan after replace_distinct_aggregate              | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after eliminate_join                          | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after decorrelate_predicate_subquery          | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after scalar_subquery_to_join                 | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after decorrelate_lateral_join                | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after extract_equijoin_predicate              | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after eliminate_duplicated_expr               | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after eliminate_filter                        | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after eliminate_cross_join                    | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after eliminate_limit                         | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after propagate_empty_relation                | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after eliminate_one_union                     | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after filter_null_join_keys                   | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after eliminate_outer_join                    | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after push_down_limit                         | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after push_down_filter                        | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after eliminate_group_by_constant             | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after common_sub_expression_eliminate         | SAME TEXT AS ABOVE                                                                                                                                                                  |
| logical_plan after optimize_projections                    | Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]                                                                                                                |
|                                                            |   Union                                                                                                                                                                             |
|                                                            |     Projection: ./union_all_repro.parquet.usage_idle, Float64(NULL) AS usage_system                                                                                                 |
|                                                            |       TableScan: ./union_all_repro.parquet projection=[usage_idle]                                                                                                                  |
|                                                            |     Projection: Float64(NULL) AS usage_idle, ./union_all_repro.parquet.usage_system                                                                                                 |
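|                                                            |       TableScan: ./union_all_repro.parquet projection=[usage_system]                                                                                                                |
+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

2. Schema Recomputation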

optimize_projections calls recompute_schema since the plan has changed. recompute_schema sees that the number of fields has changed and creates a new Union node with Union::try_new

Union::try_new calls Union::derive_schema_from_inputs to recreate the Union schema from the child inputs. For each field in the schema, it calls intersect_metadata_for_union which only keeps metadata when the field has the same metadata across all child inputs. Since the NULL literal doesn't have the same metadata as our column from parquet, the metadata gets removed from the logical schema.
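The effect of intersecting against an empty metadata map can be sketched with the standard library alone. This is a simplified model, not DataFusion's actual code: the `Metadata` alias and the `intersect_metadata` function are hypothetical stand-ins for the behavior described above, where a key survives only if every union input carries it with the same value.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for Arrow field metadata (string key/value pairs).
type Metadata = HashMap<String, String>;

/// Keep only the key/value pairs that appear, with the same value,
/// in every input. Models the intersection described above.
fn intersect_metadata(inputs: &[Metadata]) -> Metadata {
    let mut iter = inputs.iter();
    let Some(first) = iter.next() else {
        return Metadata::new();
    };
    let mut result = first.clone();
    for other in iter {
        // Drop any entry that the other input lacks or disagrees on.
        result.retain(|k, v| other.get(k) == Some(&*v));
    }
    result
}

fn main() {
    // Metadata carried by the parquet column in one union branch.
    let parquet_column = Metadata::from([(
        "iox::column::type".to_string(),
        "iox::column_type::field::float".to_string(),
    )]);
    // The NULL literal in the other branch carries no metadata.
    let null_literal = Metadata::new();

    let merged = intersect_metadata(&[parquet_column, null_literal]);
    assert!(merged.is_empty());
    println!("metadata after union: {merged:?}");
}
```

Under this model, a single branch with empty metadata is enough to erase the metadata of the whole Union field.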

3. Physical/Logical Schema Mismatch

During physical planning, we call DefaultPhysicalPlanner::map_logical_node_to_physical which checks the input of the Aggregate node (in this case Union) to see if its logical schema matches its physical schema. However, because we previously removed the metadata from Union, the metadata no longer matches when compared.
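A minimal sketch of that kind of comparison follows. The `Field` struct and `metadata_differences` function are hypothetical simplifications written for illustration; they are not the planner's real types, and the real check compares more than metadata.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified field: just a name and its metadata.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    metadata: BTreeMap<String, String>,
}

/// Report every position where the physical and logical fields
/// disagree on metadata.
fn metadata_differences(physical: &[Field], logical: &[Field]) -> Vec<String> {
    physical
        .iter()
        .zip(logical)
        .enumerate()
        .filter(|(_, (p, l))| p.metadata != l.metadata)
        .map(|(i, (p, l))| {
            format!(
                "field metadata at index {i} [{}]: (physical) {:?} vs (logical) {:?}",
                p.name, p.metadata, l.metadata
            )
        })
        .collect()
}

fn main() {
    let meta = BTreeMap::from([(
        "iox::column::type".to_string(),
        "iox::column_type::field::float".to_string(),
    )]);
    // The physical schema still carries the parquet metadata.
    let physical = vec![Field { name: "usage_idle".to_string(), metadata: meta }];
    // The logical Union schema lost it during recomputation.
    let logical = vec![Field { name: "usage_idle".to_string(), metadata: BTreeMap::new() }];

    for diff in metadata_differences(&physical, &logical) {
        println!("{diff}");
    }
}
```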

erratic-pattern avatar Dec 02 '25 16:12 erratic-pattern

nice find

alamb avatar Dec 02 '25 19:12 alamb

With the error message changes in https://github.com/apache/datafusion/pull/19070 applied, the reproducer described in this issue now gives a more helpful error message:

Internal error: Physical input schema should be the same as the one converted from logical input schema. Differences:
        - field metadata at index 0 [usage_idle]: (physical) {"iox::column::type": "iox::column_type::field::float"} vs (logical) {}
        - field metadata at index 1 [usage_system]: (physical) {"iox::column::type": "iox::column_type::field::float"} vs (logical) {}.
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker

erratic-pattern avatar Dec 03 '25 16:12 erratic-pattern

@erratic-pattern I can't assign this to you. Can you do so?

helenosheaa avatar Dec 08 '25 15:12 helenosheaa

@erratic-pattern I can't assign this to you. Can you do so?

I also cannot. Perhaps @alamb can?

erratic-pattern avatar Dec 08 '25 16:12 erratic-pattern

@erratic-pattern I can't assign this to you. Can you do so?

I also cannot. Perhaps @alamb can?

Done!

(You can also assign yourself tickets by posting a comment that says take: https://datafusion.apache.org/contributor-guide/index.html#open-contribution-and-assigning-tickets)

alamb avatar Dec 09 '25 22:12 alamb

It's not clear to me whether or not this is an actual bug. It seems reasonable to expect metadata to be consistent for field names across union branches. However it could also be problematic for queries that either:

  1. Shadow existing column names with new, unrelated columns. Variable shadowing in Rust is very normal, but I'm not sure if this is considered reasonable in SQL queries.
  2. Introduce constant literals for fields on one side of a union, such as this reproducer.

Perhaps we need to adjust intersect_metadata_for_union to either:

  1. Avoid intersecting a branch that contains empty metadata, and instead preserve/intersect only the branches that contain non-empty metadata. This avoids destructive loss of metadata when one union branch is empty.
  2. Avoid intersecting a branch that contains empty metadata on a field that is a constant literal. This is a more restrictive version of option 1 that might result in fewer unintended consequences.
  3. Union the metadata instead of intersecting it. This ensures that no metadata is lost, but I am not sure what consequences this might have, since it could populate metadata in the output schema where none was intended.
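To make the trade-off concrete, here is a rough standard-library sketch of options 1 and 3. The names `intersect_non_empty` and `union_metadata` are hypothetical and this is not a proposed patch; the conflict rule in the union variant (first branch wins) is an arbitrary assumption made for the sketch.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for Arrow field metadata.
type Metadata = HashMap<String, String>;

/// Option 1: skip branches with empty metadata, intersect the rest.
fn intersect_non_empty(inputs: &[Metadata]) -> Metadata {
    let mut non_empty = inputs.iter().filter(|m| !m.is_empty());
    let Some(first) = non_empty.next() else {
        return Metadata::new();
    };
    let mut result = first.clone();
    for other in non_empty {
        result.retain(|k, v| other.get(k) == Some(&*v));
    }
    result
}

/// Option 3: union all branches. On a conflicting key the first
/// branch wins (an assumption of this sketch, not a settled rule).
fn union_metadata(inputs: &[Metadata]) -> Metadata {
    let mut result = Metadata::new();
    for input in inputs {
        for (k, v) in input {
            result.entry(k.clone()).or_insert_with(|| v.clone());
        }
    }
    result
}

fn main() {
    let column = Metadata::from([(
        "iox::column::type".to_string(),
        "iox::column_type::field::float".to_string(),
    )]);
    let null_literal = Metadata::new();
    let branches = [column.clone(), null_literal];

    // Both variants keep the column metadata for this reproducer.
    assert_eq!(intersect_non_empty(&branches), column);
    assert_eq!(union_metadata(&branches), column);
}
```

The two variants diverge once branches carry different non-empty metadata: option 1 still drops keys that are missing or different in any non-empty branch, while option 3 keeps every key it sees.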

I am curious if anyone has opinions about any of these approaches, or if there is another way to look at this.

erratic-pattern avatar Dec 11 '25 17:12 erratic-pattern

It's not clear to me whether or not this is an actual bug. It seems reasonable to expect metadata to be consistent for field names across union branches. However it could also be problematic for queries that either:

In my mind it is a bug because the query is very reasonable -- I think there are other types of plans in queries where the metadata from different inputs needs to be combined (and thus might not be the same at the input and output -- for example joins and aggregates)

It seems like there is an implicit assumption that "the schema (including metadata) of a plan should remain the same after an optimizer pass". If this is indeed correct, then by your analysis above

optimize_projections calls recompute_schema since the plan has changed. recompute_schema sees that the number of fields has changed and creates a new Union node with Union::try_new

It seems like we should fix optimize_projections so it maintains the schema (either by attaching metadata to the NULL literal, or perhaps by simply reusing the previous schema).

For example, @adriangb just added code that does something similar (though during execution) in this PR https://github.com/apache/datafusion/pull/19111 :

https://github.com/apache/datafusion/blob/bde16083ad344b7a52db5cb298a15d7434ffde51/datafusion/datasource-parquet/src/opener.rs#L529-L545
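As a rough illustration of the "reuse the previous schema" idea, the sketch below prunes columns by name while carrying each surviving field's metadata over from the earlier schema, rather than recomputing it from the child inputs. The `Field` struct and `prune_preserving_metadata` function are hypothetical; this is not what optimize_projections or the linked PR actually does.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified field: just a name and its metadata.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    metadata: BTreeMap<String, String>,
}

/// Keep only the named fields, copying each one (metadata included)
/// from the schema that existed before the optimizer pass.
fn prune_preserving_metadata(previous: &[Field], keep: &[&str]) -> Vec<Field> {
    previous
        .iter()
        .filter(|f| keep.contains(&f.name.as_str()))
        .cloned()
        .collect()
}

fn main() {
    let meta = BTreeMap::from([(
        "iox::column::type".to_string(),
        "iox::column_type::field::float".to_string(),
    )]);
    let previous = vec![
        Field { name: "time".to_string(), metadata: BTreeMap::new() },
        Field { name: "usage_idle".to_string(), metadata: meta.clone() },
        Field { name: "usage_system".to_string(), metadata: meta.clone() },
    ];

    // Drop the unused time column but keep the remaining fields as they were.
    let pruned = prune_preserving_metadata(&previous, &["usage_idle", "usage_system"]);
    assert_eq!(pruned.len(), 2);
    assert!(pruned.iter().all(|f| f.metadata == meta));
}
```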

alamb avatar Dec 11 '25 18:12 alamb