Optimize/simplify the literal data type and remove unnecessary cast and try_cast

Open liukun4515 opened this issue 2 years ago • 13 comments

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Schema:

c1  decimal(18,0)

select * from table where c1 = 20;
select * from table where c1 in (12,2);

We get a plan like the one below:

    .....
       ....
         cast(c1 as decimal(20,0)), cast(20 as decimal(20,0))

In DataFusion, the literals 20, 12, and 2 have type INT64. When decimal(18,0) is compared with INT64, the coerced data type is decimal(20,0), according to the rule in get_comparison_common_decimal_type.

We should optimize this the way the spark-issue does, which will remove unnecessary cast/try_cast in many cases that involve literals.

Additional context

Here is the plan for implementing the rule:

  • [x] add the rule framework with just binary expressions and a simple example: in select * from table where c1(int32) > 16, the 16 will be cast to int32 https://github.com/apache/arrow-datafusion/pull/3185
  • [x] support decimal data type in PreCastLitInBinaryComparisonExpressions rule https://github.com/apache/arrow-datafusion/pull/3245
  • [x] support decimal data type with other numeric type in PreCastLitInBinaryComparisonExpressions rule https://github.com/apache/arrow-datafusion/pull/3245
  • [x] support inlist or inset in PreCastLitInBinaryComparisonExpressions rule https://github.com/apache/arrow-datafusion/pull/3270
  • [ ] support other expr in PreCastLitInBinaryComparisonExpressions rule
  • [x] https://github.com/apache/arrow-datafusion/issues/3259
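The core idea of the rule can be sketched in a few lines of plain Rust. This is only an illustration under stated assumptions: `DataType`, `Literal`, and `try_cast_literal` are hypothetical stand-ins written for this example, not DataFusion's actual types or API. The sketch converts an integer literal to the column's type only when no information is lost, so the column itself no longer needs a cast.

```rust
use std::convert::TryFrom;

/// Simplified stand-ins for illustration; these are NOT DataFusion's types.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Int64,
    Decimal { precision: u8, scale: u8 },
}

#[derive(Debug, Clone, PartialEq)]
enum Literal {
    Int32(i32),
    Int64(i64),
    /// Unscaled value, precision, scale: 20 as decimal(18,2) is (2000, 18, 2).
    Decimal(i128, u8, u8),
}

/// Hypothetical helper: express an integer literal in `target` without
/// losing information. Returns `None` when the value does not fit, in which
/// case the caller keeps the original cast-based comparison.
fn try_cast_literal(lit: &Literal, target: &DataType) -> Option<Literal> {
    let value: i128 = match lit {
        Literal::Int32(v) => *v as i128,
        Literal::Int64(v) => *v as i128,
        // Decimal literals are out of scope for this sketch.
        Literal::Decimal(..) => return None,
    };
    match target {
        DataType::Int32 => i32::try_from(value).ok().map(Literal::Int32),
        DataType::Int64 => i64::try_from(value).ok().map(Literal::Int64),
        DataType::Decimal { precision, scale } => {
            if *precision > 38 || scale > precision {
                return None;
            }
            // Scale the integer up to the decimal's unscaled representation.
            let scaled = value.checked_mul(10i128.checked_pow(*scale as u32)?)?;
            // The unscaled value must have at most `precision` digits.
            let limit = 10i128.checked_pow(*precision as u32)?;
            if scaled.abs() < limit {
                Some(Literal::Decimal(scaled, *precision, *scale))
            } else {
                None
            }
        }
    }
}
```

With this sketch, the literal 20 compared against a decimal(18,0) column becomes a decimal(18,0) literal, and a literal that does not fit the column type (for example 1000 against decimal(3,0)) is left alone.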

liukun4515 avatar Aug 04 '22 02:08 liukun4515

Optimize the decimal value from arrow-rs https://github.com/apache/arrow-rs/issues/2313

liukun4515 avatar Aug 04 '22 03:08 liukun4515

Here is a self contained reproducer for anyone following along:

❯ create table foo as select column1 as d from (values (1), (2));
+---+
| d |
+---+
| 1 |
| 2 |
+---+
2 rows in set. Query took 0.005 seconds.
❯ create table bar as select cast (d as decimal) as d from foo;
+--------------+
| d            |
+--------------+
| 1.0000000000 |
| 2.0000000000 |
+--------------+
2 rows in set. Query took 0.005 seconds.
❯ explain select * from bar where d = 1.4;
+---------------+-----------------------------------------------------------------------------------+
| plan_type     | plan                                                                              |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan  | Projection: #bar.d                                                                |
|               |   Filter: #bar.d = Float64(1.4)                                                   |
|               |     TableScan: bar projection=[d]                                                 |
| physical_plan | ProjectionExec: expr=[d@0 as d]                                                   |
|               |   CoalesceBatchesExec: target_batch_size=4096                                     |
|               |     FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
|               |       RepartitionExec: partitioning=RoundRobinBatch(16)                           |
|               |         MemoryExec: partitions=1, partition_sizes=[1]                             |
|               |                                                                                   |
+---------------+-----------------------------------------------------------------------------------+
2 rows in set. Query took 0.005 seconds.

The FilterExec line above should not have the CAST operations in it

alamb avatar Aug 17 '22 09:08 alamb

And we can see that, as @liukun4515 says, the casting is added in the physical plan (rather than the logical plan):


❯ explain verbose select * from bar where d = 1.4;
+-------------------------------------------------------+-----------------------------------------------------------------------------------+
| plan_type                                             | plan                                                                              |
+-------------------------------------------------------+-----------------------------------------------------------------------------------+
| initial_logical_plan                                  | Projection: #bar.d                                                                |
|                                                       |   Filter: #bar.d = Float64(1.4)                                                   |
|                                                       |     TableScan: bar                                                                |
| logical_plan after simplify_expressions               | SAME TEXT AS ABOVE                                                                |
| logical_plan after decorrelate_where_exists           | SAME TEXT AS ABOVE                                                                |
| logical_plan after decorrelate_where_in               | SAME TEXT AS ABOVE                                                                |
| logical_plan after decorrelate_scalar_subquery        | SAME TEXT AS ABOVE                                                                |
| logical_plan after subquery_filter_to_join            | SAME TEXT AS ABOVE                                                                |
| logical_plan after eliminate_filter                   | SAME TEXT AS ABOVE                                                                |
| logical_plan after common_sub_expression_eliminate    | SAME TEXT AS ABOVE                                                                |
| logical_plan after eliminate_limit                    | SAME TEXT AS ABOVE                                                                |
| logical_plan after projection_push_down               | Projection: #bar.d                                                                |
|                                                       |   Filter: #bar.d = Float64(1.4)                                                   |
|                                                       |     TableScan: bar projection=[d]                                                 |
| logical_plan after rewrite_disjunctive_predicate      | SAME TEXT AS ABOVE                                                                |
| logical_plan after reduce_outer_join                  | SAME TEXT AS ABOVE                                                                |
| logical_plan after filter_push_down                   | SAME TEXT AS ABOVE                                                                |
| logical_plan after limit_push_down                    | SAME TEXT AS ABOVE                                                                |
| logical_plan after SingleDistinctAggregationToGroupBy | SAME TEXT AS ABOVE                                                                |
| logical_plan                                          | Projection: #bar.d                                                                |
|                                                       |   Filter: #bar.d = Float64(1.4)                                                   |
|                                                       |     TableScan: bar projection=[d]                                                 |
| initial_physical_plan                                 | ProjectionExec: expr=[d@0 as d]                                                   |
|                                                       |   FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15))   |
|                                                       |     MemoryExec: partitions=1, partition_sizes=[1]                                 |
|                                                       |                                                                                   |
| physical_plan after aggregate_statistics              | SAME TEXT AS ABOVE                                                                |
| physical_plan after hash_build_probe_order            | SAME TEXT AS ABOVE                                                                |
| physical_plan after coalesce_batches                  | ProjectionExec: expr=[d@0 as d]                                                   |
|                                                       |   CoalesceBatchesExec: target_batch_size=4096                                     |
|                                                       |     FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
|                                                       |       MemoryExec: partitions=1, partition_sizes=[1]                               |
|                                                       |                                                                                   |
| physical_plan after repartition                       | ProjectionExec: expr=[d@0 as d]                                                   |
|                                                       |   CoalesceBatchesExec: target_batch_size=4096                                     |
|                                                       |     FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
|                                                       |       RepartitionExec: partitioning=RoundRobinBatch(16)                           |
|                                                       |         MemoryExec: partitions=1, partition_sizes=[1]                             |
|                                                       |                                                                                   |
| physical_plan after add_merge_exec                    | SAME TEXT AS ABOVE                                                                |
| physical_plan                                         | ProjectionExec: expr=[d@0 as d]                                                   |
|                                                       |   CoalesceBatchesExec: target_batch_size=4096                                     |
|                                                       |     FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
|                                                       |       RepartitionExec: partitioning=RoundRobinBatch(16)                           |
|                                                       |         MemoryExec: partitions=1, partition_sizes=[1]                             |
|                                                       |                                                                                   |
+-------------------------------------------------------+-----------------------------------------------------------------------------------+

alamb avatar Aug 17 '22 09:08 alamb

I agree the ideal outcome would be that the logical plan has the casts. I think the reason the type coercion / casting happens in the physical planner was some idea that it could be possible to use different implementations of physical exprs that might have different comparison rules (like some system that could natively compare i32 and i64 -- via a native kernel, for example)

I am not sure if this ability has ever been used in practice, but I am not sure it would be a simple thing to change now

I suggest you investigate special casing the creation of PhysicalExprs for cast https://github.com/apache/arrow-datafusion/blob/89bcfc4827a84c37abdf6476ec164611b270492d/datafusion/physical-expr/src/planner.rs#L172-L181

Basically, special case it: if the argument is a literal, do the cast at plan creation time. It wouldn't be super general, but I think it would work in this case for you
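A minimal sketch of that special case, assuming a toy expression tree: `PhysExpr`, `Scalar`, `Ty`, `cast_scalar`, and `create_cast` are all hypothetical names invented for this example and do not match the real planner code linked above. The point is only that the cast of a literal can be evaluated once, when the expression is built, instead of once per batch.

```rust
/// Toy types for illustration only; not DataFusion's PhysicalExpr.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Int64(i64),
    Float64(f64),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Ty {
    Int64,
    Float64,
}

#[derive(Debug, Clone, PartialEq)]
enum PhysExpr {
    Column(usize),
    Literal(Scalar),
    Cast(Box<PhysExpr>, Ty),
}

/// Cast a single scalar value; same-type casts return the value unchanged.
fn cast_scalar(s: &Scalar, to: Ty) -> Scalar {
    match (s, to) {
        (Scalar::Int64(v), Ty::Float64) => Scalar::Float64(*v as f64),
        (Scalar::Float64(v), Ty::Int64) => Scalar::Int64(*v as i64),
        _ => s.clone(),
    }
}

/// Hypothetical constructor for cast expressions. If the input is a
/// literal, the cast is done right here at plan creation time; otherwise a
/// runtime Cast node is produced as usual.
fn create_cast(input: PhysExpr, to: Ty) -> PhysExpr {
    match input {
        PhysExpr::Literal(s) => PhysExpr::Literal(cast_scalar(&s, to)),
        other => PhysExpr::Cast(Box::new(other), to),
    }
}
```

Note that this removes only the cast on the literal side. A cast wrapped around the column would remain, which is why the approach is described as not fully general.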

alamb avatar Aug 17 '22 10:08 alamb

Another alternative might be to follow the model of ConstEvaluator (which operates on Exprs) and apply it to physical expressions 🤔 https://github.com/apache/arrow-datafusion/blob/3eb55e9a0510d872f6f7765b1a5f17db46486e45/datafusion/optimizer/src/simplify_expressions.rs#L397-L415

You would likely need changes to PhysicalExpr similar to what @iajoiner and I were discussing on https://github.com/apache/arrow-datafusion/issues/3014#issuecomment-1204494306

alamb avatar Aug 17 '22 10:08 alamb

The cast is added in the creation of physical expr/physical plan. It follows a general rule for coercing binary comparisons, like below:

INT32 < INT64 -> INT64

DECIMAL(10,2) < DOUBLE -> Other decimal data type.

I think it is all in the comparison_binary_numeric_coercion function.

This is just the general rule, and it works correctly in all cases. But in many user queries a literal is used in a filter expression or other condition, as in this issue; the new optimizer rule should handle this case the way Spark does.

I have filed a draft PR that adds a logical optimizer rule to do this, but it may not be ready until tomorrow because I need to review some plan changes myself first. I think the rule can work well for us. https://github.com/apache/arrow-datafusion/pull/3185
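The general rule can be approximated as follows. This is a sketch, not the actual coercion function: the decimal equivalents used for the non-decimal types (Int32 as decimal(10,0), Int64 as decimal(20,0), Float64 as decimal(30,15)) and the maximum precision of 38 are assumptions, chosen because they reproduce the decimal(20,0) and Decimal128(38, 15) types that appear in this thread. `DataType`, `as_decimal`, and `comparison_type` are hypothetical names for this example.

```rust
/// Toy type enum for illustration; not Arrow's DataType.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int32,
    Int64,
    Float64,
    /// Decimal(precision, scale)
    Decimal(u8, u8),
}

/// Assumed maximum decimal precision.
const MAX_PRECISION: u8 = 38;

/// Assumed decimal equivalent (precision, scale) of each numeric type.
fn as_decimal(t: DataType) -> (u8, u8) {
    match t {
        DataType::Int32 => (10, 0),
        DataType::Int64 => (20, 0),
        DataType::Float64 => (30, 15),
        DataType::Decimal(p, s) => (p, s),
    }
}

/// Pick the type both sides are cast to before a comparison.
fn comparison_type(lhs: DataType, rhs: DataType) -> DataType {
    match (lhs, rhs) {
        // Identical types need no coercion.
        (a, b) if a == b => a,
        // If either side is decimal, widen to a decimal that can hold both:
        // keep the larger scale and the larger number of integer digits.
        (DataType::Decimal(..), _) | (_, DataType::Decimal(..)) => {
            let (p1, s1) = as_decimal(lhs);
            let (p2, s2) = as_decimal(rhs);
            let scale = s1.max(s2);
            let int_digits = p1.saturating_sub(s1).max(p2.saturating_sub(s2));
            DataType::Decimal((int_digits + scale).min(MAX_PRECISION), scale)
        }
        // Otherwise a float on either side wins.
        (DataType::Float64, _) | (_, DataType::Float64) => DataType::Float64,
        // Remaining case in this sketch: Int32 compared with Int64.
        _ => DataType::Int64,
    }
}
```

Under these assumptions, comparing decimal(18,0) with Int64 yields decimal(20,0), and comparing a decimal(38,10) column with a Float64 literal yields decimal(38,15). In both cases the column itself gets wrapped in a cast, which is the overhead this issue wants to avoid when the other side is a literal.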

@alamb

liukun4515 avatar Aug 17 '22 12:08 liukun4515

The cast is added in the creation of physical expr/physical plan.

Yes I agree

It sounds like you are proposing moving the coercion to the logical planning phase -- while I am not opposed to doing so I do think it is likely a large change which I think we'll want to run by other maintainers. But getting a PR up to do the change is the first step

I do think it is worth considering special casing just the casting in the physical planner (as the physical planner is what is adding the casts in the first place)

alamb avatar Aug 17 '22 14:08 alamb

+1 for optimization rules to perform type coercion in the logical plan. I am running into some issues around this myself and am seeing quite a lot of inconsistencies in how we currently handle this.

andygrove avatar Aug 17 '22 16:08 andygrove

+1 for optimization rules to perform type coercion in the logical plan. I am running into some issues around this myself and am seeing quite a lot of inconsistencies in how we currently handle this.

Do you mean that you wish to migrate the type coercion from physical plan creation to logical plan creation?

But this issue just adds a rule to optimize the expr and improve performance, and will not make any changes to the generation of the physical plan or physical expr.

liukun4515 avatar Aug 18 '22 01:08 liukun4515

The cast is added in the creation of physical expr/physical plan.

Yes I agree

It sounds like you are proposing moving the coercion to the logical planning phase -- while I am not opposed to doing so I do think it is likely a large change which I think we'll want to run by other maintainers. But getting a PR up to do the change is the first step

From what I know of other database systems, the generation of the physical expr does not change the original expr; it just transforms the logical expr into a physical expr.

I remember we discussed this a long time ago.

But this issue is not meant to do that; it is just meant to optimize the case described at the beginning of this issue.

I do think it is worth considering special casing just the casting in the physical planner (as the physical planner is what is adding the casts in the first place)

We can open a new issue to discuss why and how to migrate the type coercion from the physical plan to the logical plan, and what benefit we can get from the work.

@alamb

liukun4515 avatar Aug 18 '22 02:08 liukun4515

We can open a new issue to discuss why and how to migrate the type coercion from the physical plan to the logical plan, and what benefit we can get from the work.

Makes sense to me 👍 thank you @liukun4515

To be clear I think moving coercion to logical planning is a good idea, but I think it will be a non trivial change that will take some time. Thus, I was suggesting the "special case physical expr planning" idea as a way to speed up casting in the short term

alamb avatar Aug 18 '22 18:08 alamb

@liukun4515 I wonder if this ticket is now complete?

alamb avatar Sep 19 '22 11:09 alamb

@liukun4515 I wonder if this ticket is now complete?

There are some follow-up optimization tasks. I will list them and then close this pr.

liukun4515 avatar Sep 20 '22 11:09 liukun4515

ping

alamb avatar Dec 14 '22 20:12 alamb

We can close it after https://github.com/apache/arrow-datafusion/pull/4634 is merged

liukun4515 avatar Dec 15 '22 03:12 liukun4515