Ballista assumes all aggregate expressions are not DISTINCT
Describe the bug
We have a hard-coded distinct = false parameter in ballista/rust/core/src/serde/physical_plan/mod.rs.
Ok(create_aggregate_expr(
&aggr_function.into(),
false, // <-- hard-coded "distinct"
input_phy_expr.as_slice(),
&physical_schema,
name.to_string(),
)?)
To Reproduce
Try running a COUNT(DISTINCT expr) in Ballista
Expected behavior We need to include the distinct flag in the protobuf for aggregate queries and implement the appropriate serde code.
Additional context None
Hi @andygrove Ive run in local ballista
SELECT count(distinct c1) as cnt_distinct FROM aggregate_test_100
And the result is expected
+--------------+
| cnt_distinct |
+--------------+
| 5 |
+--------------+
which is expected
I checked the backtrace
2: datafusion_physical_expr::aggregate::build_in::create_aggregate_expr
at ./datafusion/physical-expr/src/aggregate/build_in.rs:75:13
3: datafusion::physical_plan::planner::create_aggregate_expr_with_name
at ./datafusion/core/src/physical_plan/planner.rs:1347:13
4: datafusion::physical_plan::planner::create_aggregate_expr
at ./datafusion/core/src/physical_plan/planner.rs:1390:5
5: datafusion::physical_plan::planner::DefaultPhysicalPlanner::create_initial_plan::{{closure}}::{{closure}}
at ./datafusion/core/src/physical_plan/planner.rs:525:29
sounds weird, but I didn't notice ballista modules here.
Related: https://github.com/apache/arrow-datafusion/pull/3250
Hi @andygrove Ive run in local ballista
The issue is specific to distributed mode because it is the serde that has the hard-coded value
Hi @andygrove Ive run in local ballista
The issue is specific to distributed mode because it is the serde that has the hard-coded value
Is there any doc how to run ballista tests in distributed mode? perhaps its part of CI now?
@andygrove @comphead
I tried to analyze the problem and found that SELECT count(distinct c1) as cnt_distinct FROM aggregate_test_100 is also expected in distributed mode.
Because single distinct is optimized by the optimizer as group by in datafusion:
example sql: select count(distinct c_name) from customer_1;
// Logic plan before optimization:
Projection: COUNT(DISTINCT customer_1.c_name)
Aggregate: groupBy=[[]], aggr=[[COUNT(DISTINCT customer_1.c_name)]]
TableScan: customer_1 projection=[c_name]
// Logic plan after optimization:
Projection: COUNT(DISTINCT customer_1.c_name)
Projection: COUNT(alias1) AS COUNT(DISTINCT customer_1.c_name)
Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]]
Aggregate: groupBy=[[customer_1.c_name AS alias1]], aggr=[[]]
TableScan: customer_1 projection=[c_name]
The current problem with ballista is that it does not support DistinctCount in non-single distinct scenarios.
Example sql select count(distinct c_name), max(c_name) from customer_1:
[2022-10-18T06:32:14Z ERROR ballista_core::execution_plans::distributed_query] Job 3N8dtpp failed: Error planning job 3N8dtpp: NotImplemented("Aggregate function not supported: DistinctCount { name: \"COUNT(DISTINCT customer_1.c_name)\", data_type: Int64, state_data_types: [Utf8], exprs: [Column { name: \"c_name\", index: 0 }] }")
DataFusionError(ArrowError(ExternalError(Execution("Job 3N8dtpp failed: Error planning job 3N8dtpp: NotImplemented(\"Aggregate function not supported: DistinctCount { name: \\\"COUNT(DISTINCT customer_1.c_name)\\\", data_type: Int64, state_data_types: [Utf8], exprs: [Column { name: \\\"c_name\\\", index: 0 }] }\")"))))
I will sumit a PR for this :)