datafusion
Optimize/simplify the literal data type and remove unnecessary cast/try_cast
Is your feature request related to a problem or challenge? Please describe what you are trying to do. Schema:
c1 decimal(18,0)
select * from table where c1 = 20;
select * from table where c1 in (12,2);
We will get a SQL plan like the one below:
.....
....
cast(c1 as decimal(20,0)), cast(20 as decimal(20,0))
The type of 20 (or 12, 2) is INT64 in DataFusion. The coerced data type for comparing decimal(18,0) with int64 is decimal(20,0), according to the rule get_comparison_common_decimal_type.
We need to optimize this case, as was done in the corresponding Spark issue. Doing so will remove unnecessary cast/try_cast operations in many cases that involve literals.
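To make the coercion above concrete, here is a minimal sketch of how a common decimal type for a comparison could be derived. It is not DataFusion's actual get_comparison_common_decimal_type; the `DataType` enum, the function names, and the decimal widths assigned to Int64 and Float64 are assumptions chosen so that the results match the types reported in this issue.

```rust
/// Simplified stand-in for an Arrow data type (hypothetical, for illustration only).
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int64,
    Float64,
    Decimal { precision: u8, scale: u8 },
}

/// Maximum precision of a 128-bit decimal.
const MAX_PRECISION: u8 = 38;

/// Decimal (precision, scale) assumed wide enough to hold any value of `t`.
/// The Int64 and Float64 widths are assumptions of this sketch.
fn as_decimal(t: DataType) -> (u8, u8) {
    match t {
        DataType::Int64 => (20, 0),
        DataType::Float64 => (30, 15),
        DataType::Decimal { precision, scale } => (precision, scale),
    }
}

/// Common decimal type used to compare `lhs` with `rhs`:
/// keep the larger scale and the larger number of integer digits,
/// then cap the precision at the 128-bit maximum.
fn common_decimal_type(lhs: DataType, rhs: DataType) -> DataType {
    let (p1, s1) = as_decimal(lhs);
    let (p2, s2) = as_decimal(rhs);
    let scale = s1.max(s2);
    let int_digits = (p1 - s1).max(p2 - s2);
    let precision = (int_digits + scale).min(MAX_PRECISION);
    DataType::Decimal { precision, scale }
}
```

Under these assumptions, comparing decimal(18,0) with Int64 yields decimal(20,0), so both sides of `c1 = 20` end up wrapped in a cast even though the literal 20 would fit in decimal(18,0).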
Additional context
Here is the plan for implementing the rule:
- [x] add the rule framework with just the binary expr and a simple example:
  select * from table where c1(int32) > 16
  the 16 will be cast to int32 https://github.com/apache/arrow-datafusion/pull/3185
- [x] support decimal data type in PreCastLitInBinaryComparisonExpressions rule https://github.com/apache/arrow-datafusion/pull/3245
- [x] support decimal data type with other numeric type in PreCastLitInBinaryComparisonExpressions rule https://github.com/apache/arrow-datafusion/pull/3245
- [x] support inlist or inset in PreCastLitInBinaryComparisonExpressions rule https://github.com/apache/arrow-datafusion/pull/3270
- [ ] support other expr in PreCastLitInBinaryComparisonExpressions rule
- [x] https://github.com/apache/arrow-datafusion/issues/3259
Optimize the decimal value from arrow-rs https://github.com/apache/arrow-rs/issues/2313
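The first item in the plan can be sketched as follows. This is only an illustration of the idea, not the code from the linked PRs: the `Expr` enum and `pre_cast_literal` are hypothetical, and only the `>` operator on an Int32 column is modeled. The rewrite narrows the literal to the column's type when the value fits, so no cast is needed on the column side.

```rust
/// Hypothetical, heavily simplified expression tree (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// Reference to a column whose type is Int32.
    Int32Column(String),
    Int32Literal(i32),
    Int64Literal(i64),
    /// `left > right`
    Gt(Box<Expr>, Box<Expr>),
}

/// Rewrite `int32_col > Int64(v)` to `int32_col > Int32(v)` when `v` fits
/// in Int32. Any other expression is returned unchanged.
fn pre_cast_literal(expr: Expr) -> Expr {
    match expr {
        Expr::Gt(left, right) => match (*left, *right) {
            (col @ Expr::Int32Column(_), Expr::Int64Literal(v)) => {
                // Only narrow the literal if the conversion is lossless.
                let lit = match i32::try_from(v) {
                    Ok(small) => Expr::Int32Literal(small),
                    Err(_) => Expr::Int64Literal(v),
                };
                Expr::Gt(Box::new(col), Box::new(lit))
            }
            (l, r) => Expr::Gt(Box::new(l), Box::new(r)),
        },
        other => other,
    }
}
```

With this sketch, `c1 > Int64(16)` becomes `c1 > Int32(16)`, while a literal such as 5000000000 is left as Int64 because it does not fit in Int32.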
Here is a self contained reproducer for anyone following along:
❯ create table foo as select column1 as d from (values (1), (2));
+---+
| d |
+---+
| 1 |
| 2 |
+---+
2 rows in set. Query took 0.005 seconds.
❯ create table bar as select cast (d as decimal) as d from foo;
+--------------+
| d |
+--------------+
| 1.0000000000 |
| 2.0000000000 |
+--------------+
2 rows in set. Query took 0.005 seconds.
❯ explain select * from bar where d = 1.4;
+---------------+-----------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan | Projection: #bar.d |
| | Filter: #bar.d = Float64(1.4) |
| | TableScan: bar projection=[d] |
| physical_plan | ProjectionExec: expr=[d@0 as d] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
| | RepartitionExec: partitioning=RoundRobinBatch(16) |
| | MemoryExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+-----------------------------------------------------------------------------------+
2 rows in set. Query took 0.005 seconds.
The FilterExec line above should not have the CAST operations in it.
And we can see that as @liukun4515 says the casting is added in the physical plan (rather than the logical plan):
❯ explain verbose select * from bar where d = 1.4;
+-------------------------------------------------------+-----------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------------------------------------------+-----------------------------------------------------------------------------------+
| initial_logical_plan | Projection: #bar.d |
| | Filter: #bar.d = Float64(1.4) |
| | TableScan: bar |
| logical_plan after simplify_expressions | SAME TEXT AS ABOVE |
| logical_plan after decorrelate_where_exists | SAME TEXT AS ABOVE |
| logical_plan after decorrelate_where_in | SAME TEXT AS ABOVE |
| logical_plan after decorrelate_scalar_subquery | SAME TEXT AS ABOVE |
| logical_plan after subquery_filter_to_join | SAME TEXT AS ABOVE |
| logical_plan after eliminate_filter | SAME TEXT AS ABOVE |
| logical_plan after common_sub_expression_eliminate | SAME TEXT AS ABOVE |
| logical_plan after eliminate_limit | SAME TEXT AS ABOVE |
| logical_plan after projection_push_down | Projection: #bar.d |
| | Filter: #bar.d = Float64(1.4) |
| | TableScan: bar projection=[d] |
| logical_plan after rewrite_disjunctive_predicate | SAME TEXT AS ABOVE |
| logical_plan after reduce_outer_join | SAME TEXT AS ABOVE |
| logical_plan after filter_push_down | SAME TEXT AS ABOVE |
| logical_plan after limit_push_down | SAME TEXT AS ABOVE |
| logical_plan after SingleDistinctAggregationToGroupBy | SAME TEXT AS ABOVE |
| logical_plan | Projection: #bar.d |
| | Filter: #bar.d = Float64(1.4) |
| | TableScan: bar projection=[d] |
| initial_physical_plan | ProjectionExec: expr=[d@0 as d] |
| | FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
| | MemoryExec: partitions=1, partition_sizes=[1] |
| | |
| physical_plan after aggregate_statistics | SAME TEXT AS ABOVE |
| physical_plan after hash_build_probe_order | SAME TEXT AS ABOVE |
| physical_plan after coalesce_batches | ProjectionExec: expr=[d@0 as d] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
| | MemoryExec: partitions=1, partition_sizes=[1] |
| | |
| physical_plan after repartition | ProjectionExec: expr=[d@0 as d] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
| | RepartitionExec: partitioning=RoundRobinBatch(16) |
| | MemoryExec: partitions=1, partition_sizes=[1] |
| | |
| physical_plan after add_merge_exec | SAME TEXT AS ABOVE |
| physical_plan | ProjectionExec: expr=[d@0 as d] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
| | RepartitionExec: partitioning=RoundRobinBatch(16) |
| | MemoryExec: partitions=1, partition_sizes=[1] |
| | |
+-------------------------------------------------------+-----------------------------------------------------------------------------------+
I agree the ideal outcome would be that the logical plan has the casts. I think the reason the type coercion / casting happens in the physical planner was some idea that it could be possible to use different implementations of physical exprs that might have different comparison rules (like some system that could natively compare i32 and i64 -- via a native kernel, for example)
I am not sure if this ability has ever been used in practice, but I am also not sure it would be a simple thing to change now.
I suggest you investigate special-casing the creation of PhysicalExprs for cast: https://github.com/apache/arrow-datafusion/blob/89bcfc4827a84c37abdf6476ec164611b270492d/datafusion/physical-expr/src/planner.rs#L172-L181
Basically, special-case a literal argument and do the cast at plan creation time. It wouldn't be super general, but I think it would work in this case for you.
Another alternative might be to follow the model of ConstEvaluator (which operates on Exprs) and apply it to physical expressions 🤔
https://github.com/apache/arrow-datafusion/blob/3eb55e9a0510d872f6f7765b1a5f17db46486e45/datafusion/optimizer/src/simplify_expressions.rs#L397-L415
You would likely need changes to PhysicalExpr similar to what @iajoiner and I were discussing in https://github.com/apache/arrow-datafusion/issues/3014#issuecomment-1204494306
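The "special case literals when creating the cast" idea could look roughly like the sketch below. The `PhysicalExpr` enum and `create_cast_to_decimal` are hypothetical stand-ins, not DataFusion's planner API, and the float-to-decimal conversion ignores precision overflow and non-finite values, which a real implementation would have to handle.

```rust
/// Hypothetical, simplified physical expression (not DataFusion's trait object).
#[derive(Debug, Clone, PartialEq)]
enum PhysicalExpr {
    /// Column referenced by index in the input batch.
    Column(usize),
    Float64Literal(f64),
    /// Decimal literal stored as an unscaled integer plus a scale.
    DecimalLiteral { unscaled: i128, scale: u32 },
    /// Cast evaluated at execution time, once per batch.
    CastToDecimal { input: Box<PhysicalExpr>, scale: u32 },
}

/// Build a cast to decimal. If the input is a literal, do the conversion
/// now, at plan creation time, and return a literal; otherwise fall back
/// to a runtime cast.
fn create_cast_to_decimal(input: PhysicalExpr, scale: u32) -> PhysicalExpr {
    match input {
        PhysicalExpr::Float64Literal(v) => {
            // Assumption of this sketch: round to the nearest unscaled value.
            let unscaled = (v * 10f64.powi(scale as i32)).round() as i128;
            PhysicalExpr::DecimalLiteral { unscaled, scale }
        }
        other => PhysicalExpr::CastToDecimal {
            input: Box::new(other),
            scale,
        },
    }
}
```

In the reproducer, this would turn `CAST(1.4 AS Decimal128(38, 15))` into a decimal literal during planning. The cast on the column side would remain, which is why this approach is less general than rewriting the comparison itself.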
The cast is added during the creation of the physical expr/physical plan.
It follows a general rule for coercing the two sides of a binary comparison, like below:
INT32 < INT64 -> INT64
DECIMAL(10,2) < DOUBLE -> another decimal data type
I think it is all in the comparison_binary_numeric_coercion function.
This is just the general rule, and it works well in all cases. But in many user queries, a literal is used in a filter expr or another condition, as in this issue. The new optimizer rule should handle this case the way Spark does.
I have filed a draft PR that adds a logical optimizer rule to do this. It may be ready tomorrow, because some plan changes need to be reviewed by me first. I think the rule can work well for us. https://github.com/apache/arrow-datafusion/pull/3185
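For the decimal case from the start of this issue, the literal-side conversion could be sketched as below. The function names are hypothetical and this is not the code from the PR. The idea is to convert an integer literal to the column's decimal type only when the value fits in the given precision, so that `c1 = 20` and `c1 in (12,2)` can compare against decimal(18,0) directly.

```rust
/// Convert an integer literal to the unscaled value of decimal(precision, scale).
/// Returns None if the value does not fit, in which case the caller should
/// keep the original expression (and its casts) unchanged.
fn int_literal_to_decimal(value: i64, precision: u32, scale: u32) -> Option<i128> {
    let factor = 10i128.checked_pow(scale)?;
    let unscaled = i128::from(value).checked_mul(factor)?;
    // A decimal with `precision` digits holds |unscaled| < 10^precision.
    let limit = 10i128.checked_pow(precision)?;
    if unscaled.abs() < limit {
        Some(unscaled)
    } else {
        None
    }
}

/// Convert every literal of an IN list, or give up if any one does not fit.
fn in_list_to_decimal(values: &[i64], precision: u32, scale: u32) -> Option<Vec<i128>> {
    values
        .iter()
        .map(|&v| int_literal_to_decimal(v, precision, scale))
        .collect()
}
```

Returning `None` for the whole IN list when a single value does not fit keeps the rewrite all-or-nothing, which is a design choice of this sketch rather than something stated in the thread.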
@alamb
The cast is added in the creation of physical expr/physical plan.
Yes I agree
It sounds like you are proposing moving the coercion to the logical planning phase -- while I am not opposed to doing so I do think it is likely a large change which I think we'll want to run by other maintainers. But getting a PR up to do the change is the first step
I do think it is worth considering special casing just the casting in the physical planner (as the physical planner is what is adding the casts in the first place)
+1 for optimization rules to perform type coercion in the logical plan. I am running into some issues around this myself and am seeing quite a lot of inconsistencies in how we currently handle this.
Do you mean that you wish to migrate the type coercion from physical plan creation to logical plan creation?
But this issue just adds a rule to optimize the expr and improve performance. It will not make any changes to the generation of the physical plan or physical expr.
From what I know of other database systems, generating the physical expr does not change the original expr; it only transforms the logical expr into a physical expr.
I remember we discussed this a long time ago.
But this issue is not meant to do that; it only optimizes the case described at the beginning of this issue.
We can open a new issue to discuss why and how to migrate type coercion from the physical planning phase to the logical planning phase, and what benefit we would get from the work.
@alamb
Makes sense to me 👍 thank you @liukun4515
To be clear I think moving coercion to logical planning is a good idea, but I think it will be a non trivial change that will take some time. Thus, I was suggesting the "special case physical expr planning" idea as a way to speed up casting in the short term
@liukun4515 I wonder if this ticket is now complete?
There are some follow-up optimization tasks. I will list them and then close this pr.
ping
We can close it after https://github.com/apache/arrow-datafusion/pull/4634 is merged.