datafusion-ballista

Ballista assumes all aggregate expressions are not DISTINCT

Status: Open. andygrove opened this issue 3 years ago • 5 comments

Describe the bug: We have a hard-coded distinct = false parameter in ballista/rust/core/src/serde/physical_plan/mod.rs:

Ok(create_aggregate_expr(
    &aggr_function.into(),
    false, // <-- hard-coded "distinct"
    input_phy_expr.as_slice(),
    &physical_schema,
    name.to_string(),
)?)
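To show what the hard-coded flag does to a query, here is a toy, std-only sketch (not Ballista code; `count` is a hypothetical stand-in for the COUNT accumulator). If the deserializer always passes `false`, COUNT(DISTINCT c1) silently degrades into COUNT(c1) and duplicates get counted:

```rust
use std::collections::HashSet;

/// Toy COUNT aggregate (hypothetical): when `distinct` is true, each value is
/// counted once (COUNT(DISTINCT expr)); otherwise every value is counted.
fn count(values: &[&str], distinct: bool) -> usize {
    if distinct {
        values.iter().collect::<HashSet<_>>().len()
    } else {
        values.len()
    }
}

fn main() {
    let c1 = ["a", "b", "a", "c", "b"];
    // What the query asked for: COUNT(DISTINCT c1).
    let requested = count(&c1, true);
    // What a deserializer that hard-codes `distinct = false` would compute.
    let after_serde = count(&c1, false);
    println!("requested = {requested}, after serde = {after_serde}");
}
```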

To Reproduce: Try running a COUNT(DISTINCT expr) query in Ballista.

Expected behavior: We need to include the distinct flag in the protobuf for aggregate queries and implement the appropriate serde code.
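A minimal sketch of the proposed serde change, assuming a hypothetical `AggregateExprNode` message and `AggregateExpr` type (the real protobuf message and DataFusion types are different): the encoder writes the distinct flag into the message, and the decoder reads it back instead of hard-coding `false`.

```rust
/// Hypothetical stand-in for the aggregate-expression protobuf message.
#[derive(Debug, Clone, PartialEq)]
struct AggregateExprNode {
    aggr_function: String,
    expr: Vec<String>,
    distinct: bool, // the field this issue proposes adding
}

/// Hypothetical in-memory aggregate expression on the planner side.
#[derive(Debug, Clone, PartialEq)]
struct AggregateExpr {
    fun: String,
    args: Vec<String>,
    distinct: bool,
}

/// Serialize: copy the distinct flag into the message.
fn to_proto(e: &AggregateExpr) -> AggregateExprNode {
    AggregateExprNode {
        aggr_function: e.fun.clone(),
        expr: e.args.clone(),
        distinct: e.distinct,
    }
}

/// Deserialize: take the flag from the message rather than hard-coding `false`.
fn from_proto(n: &AggregateExprNode) -> AggregateExpr {
    AggregateExpr {
        fun: n.aggr_function.clone(),
        args: n.expr.clone(),
        distinct: n.distinct,
    }
}

fn main() {
    let expr = AggregateExpr { fun: "COUNT".into(), args: vec!["c1".into()], distinct: true };
    let round_tripped = from_proto(&to_proto(&expr));
    assert_eq!(round_tripped, expr);
    println!("distinct survives serde: {}", round_tripped.distinct);
}
```

One design note: in proto3 an unset `bool` field decodes as `false`, so plans serialized before such a field existed would still decode as non-DISTINCT.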

Additional context None

andygrove, May 10 '22 20:05

Hi @andygrove, I've run this in local Ballista:

SELECT count(distinct c1) as cnt_distinct FROM aggregate_test_100

The result is:

+--------------+
| cnt_distinct |
+--------------+
| 5            |
+--------------+

This is the expected result.

comphead, May 12 '22 13:05

I checked the backtrace:

   2: datafusion_physical_expr::aggregate::build_in::create_aggregate_expr
             at ./datafusion/physical-expr/src/aggregate/build_in.rs:75:13
   3: datafusion::physical_plan::planner::create_aggregate_expr_with_name
             at ./datafusion/core/src/physical_plan/planner.rs:1347:13
   4: datafusion::physical_plan::planner::create_aggregate_expr
             at ./datafusion/core/src/physical_plan/planner.rs:1390:5
   5: datafusion::physical_plan::planner::DefaultPhysicalPlanner::create_initial_plan::{{closure}}::{{closure}}
             at ./datafusion/core/src/physical_plan/planner.rs:525:29

It sounds weird, but I didn't see any Ballista modules in it.

comphead, May 12 '22 13:05

Related: https://github.com/apache/arrow-datafusion/pull/3250

andygrove, Aug 24 '22 17:08

Quoting @comphead: "Hi @andygrove, I've run this in local Ballista"

The issue is specific to distributed mode, because the hard-coded value is in the serde code.

andygrove, Aug 24 '22 17:08

Quoting @andygrove: "The issue is specific to distributed mode, because the hard-coded value is in the serde code."

Is there any documentation on how to run Ballista tests in distributed mode? Perhaps it's part of CI now?

comphead, Aug 24 '22 20:08

@andygrove @comphead I analyzed the problem and found that SELECT count(distinct c1) as cnt_distinct FROM aggregate_test_100 also returns the expected result in distributed mode.

This is because DataFusion's optimizer rewrites a query with a single DISTINCT aggregate into a GROUP BY:

Example SQL: select count(distinct c_name) from customer_1;

// Logical plan before optimization:
Projection: COUNT(DISTINCT customer_1.c_name)
   Aggregate: groupBy=[[]], aggr=[[COUNT(DISTINCT customer_1.c_name)]]
     TableScan: customer_1 projection=[c_name]

// Logical plan after optimization:
Projection: COUNT(DISTINCT customer_1.c_name)
   Projection: COUNT(alias1) AS COUNT(DISTINCT customer_1.c_name)
     Aggregate: groupBy=[[]], aggr=[[COUNT(alias1)]]
       Aggregate: groupBy=[[customer_1.c_name AS alias1]], aggr=[[]]
         TableScan: customer_1 projection=[c_name]
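The rewrite is valid because counting the rows of a GROUP BY on the column yields the number of distinct non-NULL values. A small std-only sketch comparing both evaluation strategies (the function names are hypothetical, and NULL is modelled as `None`):

```rust
use std::collections::HashSet;

/// Direct evaluation of COUNT(DISTINCT c_name): NULLs are ignored and
/// duplicates are counted once.
fn count_distinct(col: &[Option<&str>]) -> usize {
    col.iter().flatten().collect::<HashSet<_>>().len()
}

/// Rewritten evaluation, mirroring the optimized plan above.
fn count_via_group_by(col: &[Option<&str>]) -> usize {
    // Inner Aggregate: groupBy=[[c_name AS alias1]], aggr=[[]].
    // Each distinct key, including a NULL key, becomes one row.
    let groups: HashSet<Option<&str>> = col.iter().copied().collect();
    // Outer Aggregate: COUNT(alias1). COUNT(expr) skips NULLs,
    // so the NULL group is not counted.
    groups.iter().filter(|k| k.is_some()).count()
}

fn main() {
    let c_name = [Some("alice"), Some("bob"), None, Some("alice"), None, Some("carol")];
    println!("{} {}", count_distinct(&c_name), count_via_group_by(&c_name));
}
```

Note that the outer aggregate must be COUNT(alias1) rather than COUNT(*); otherwise the NULL group would be counted as an extra value.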

The current problem with Ballista is that it does not support DistinctCount in queries that are not single-DISTINCT aggregates, where the optimizer leaves the DistinctCount expression in place. For example, select count(distinct c_name), max(c_name) from customer_1; fails with:

[2022-10-18T06:32:14Z ERROR ballista_core::execution_plans::distributed_query] Job 3N8dtpp failed: Error planning job 3N8dtpp: NotImplemented("Aggregate function not supported: DistinctCount { name: \"COUNT(DISTINCT customer_1.c_name)\", data_type: Int64, state_data_types: [Utf8], exprs: [Column { name: \"c_name\", index: 0 }] }")
DataFusionError(ArrowError(ExternalError(Execution("Job 3N8dtpp failed: Error planning job 3N8dtpp: NotImplemented(\"Aggregate function not supported: DistinctCount { name: \\\"COUNT(DISTINCT customer_1.c_name)\\\", data_type: Int64, state_data_types: [Utf8], exprs: [Column { name: \\\"c_name\\\", index: 0 }] }\")"))))
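Whether the rewrite applies depends on the shape of the aggregate list. The sketch below approximates the precondition, *assuming* (not verified against DataFusion's source) that the rule only fires when every aggregate is DISTINCT and together they reference exactly one argument column. `Agg` and `is_single_distinct` are hypothetical names. Under that assumption, adding max(c_name) disables the rewrite, so COUNT(DISTINCT) survives into the physical plan as a DistinctCount expression.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified aggregate expression: function, DISTINCT flag, args.
#[allow(dead_code)]
struct Agg<'a> {
    fun: &'a str,
    distinct: bool,
    args: Vec<&'a str>,
}

/// Assumed precondition: all aggregates are DISTINCT and share one argument.
fn is_single_distinct(aggs: &[Agg]) -> bool {
    let all_distinct = !aggs.is_empty() && aggs.iter().all(|a| a.distinct);
    let fields: HashSet<&str> = aggs.iter().flat_map(|a| a.args.iter().copied()).collect();
    all_distinct && fields.len() == 1
}

fn main() {
    // select count(distinct c_name) from customer_1
    let q1 = [Agg { fun: "COUNT", distinct: true, args: vec!["c_name"] }];
    // select count(distinct c_name), max(c_name) from customer_1
    let q2 = [
        Agg { fun: "COUNT", distinct: true, args: vec!["c_name"] },
        Agg { fun: "MAX", distinct: false, args: vec!["c_name"] },
    ];
    println!("q1 rewritten: {}", is_single_distinct(&q1));
    println!("q2 rewritten: {}", is_single_distinct(&q2));
}
```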

I will submit a PR for this :)

r4ntix, Oct 18 '22 07:10