Explore cost-based optimizations
What is the problem the feature request solves?
I recently worked on a few experimental PRs around cost-based optimization (CBO). I am going to close them because they did not provide an immediate benefit, but I want to document the work and am using this issue for that.
Using DataFusion's Physical Optimizer
DataFusion provides a physical optimizer, and there may be benefit in the future from applying DataFusion's rules or custom Comet rules. For example, injecting CopyExec into the plan would be a good fit for an optimizer rule.
In jni_api.rs we would need to add the rules that we want to enable:
let state = SessionStateBuilder::new()
    .with_config(session_config)
    .with_runtime_env(Arc::new(runtime))
    .with_default_features()
    .with_physical_optimizer_rules(vec![Arc::new(TopKAggregation::new())])
    .build();
Then in planner.rs we could add the call to optimize the plan:
pub fn optimize_plan(
    &self,
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
    // optimize the physical plan
    let datafusion_planner = DefaultPhysicalPlanner::default();
    datafusion_planner
        .optimize_physical_plan(plan, &self.session_ctx.state(), |_, _| {})
        .map_err(|e| e.into())
}
Because we receive a plan that is already optimized by Spark, there is no immediate benefit in enabling the current rules from DataFusion.
Passing statistics down to the native plan
It is possible for us to pass Spark statistics down to the native plan. For example, we can add this to QueryPlanSerde.scala:
op match {
  case qs: QueryStageExec =>
    qs.computeStats() match {
      case Some(stats) =>
        val statsBuilder = OperatorOuterClass.Statistics.newBuilder()
        stats.rowCount.foreach(c => statsBuilder.setRowCount(c.toFloat))
        statsBuilder.setSizeInBytes(stats.sizeInBytes.toFloat)
        scanBuilder.setStatistics(statsBuilder.build())
      case _ =>
    }
  case _ =>
}
It is also possible to get the size in bytes from any Hadoop input relation and infer the row count from the schema.
There is no value in doing this, though, until we have optimizer rules that can make use of this data.
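To illustrate the idea of inferring a row count from size in bytes and schema, here is a minimal sketch. It is not taken from the experimental PRs. `ColumnType`, `width_bytes`, and `estimate_row_count` are hypothetical names, and the per-type widths (including the caller-supplied average string length) are assumptions. It also ignores compression and encoding, so for compressed columnar files the result would only be a rough guess.

```rust
/// Hypothetical, simplified column types for illustration only.
#[derive(Debug, Clone, Copy)]
pub enum ColumnType {
    Boolean,
    Int32,
    Int64,
    Float64,
    /// Variable-width column; the average length is an assumption
    /// supplied by the caller.
    Utf8 { avg_len: u64 },
}

impl ColumnType {
    /// Assumed uncompressed width of one value, in bytes.
    fn width_bytes(self) -> u64 {
        match self {
            ColumnType::Boolean => 1,
            ColumnType::Int32 => 4,
            ColumnType::Int64 | ColumnType::Float64 => 8,
            ColumnType::Utf8 { avg_len } => avg_len,
        }
    }
}

/// Estimate the row count as total size divided by the assumed row width.
/// Returns None when the row width is zero (for example, an empty schema),
/// because no estimate is possible in that case.
pub fn estimate_row_count(size_in_bytes: u64, schema: &[ColumnType]) -> Option<u64> {
    let row_width: u64 = schema.iter().map(|c| c.width_bytes()).sum();
    if row_width == 0 {
        None
    } else {
        Some(size_in_bytes / row_width)
    }
}
```

For example, under these assumptions a 1600-byte relation with an Int32, an Int64, and a string column averaging 4 bytes has a 16-byte row width, giving an estimate of 100 rows.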
Cost-model to determine when to fall back to Spark
We could implement a cost model with the relative costs of Comet and Spark operators and expressions, and fall back to Spark when we estimate that Comet would be more expensive.
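A minimal sketch of what such a fallback decision could look like follows. Nothing here exists in Comet today. `Engine`, `OperatorCost`, `plan_costs`, and `choose_engine` are hypothetical names, and the per-row cost factors are placeholders that would have to come from benchmarking. A real model would probably also need to account for expression costs and for row-to-columnar transitions between the two engines.

```rust
/// Which engine should execute the plan (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Engine {
    Comet,
    Spark,
}

/// Estimated cost inputs for one operator. The per-row factors are
/// assumed, relative numbers, not measured values.
#[derive(Debug, Clone, Copy)]
pub struct OperatorCost {
    pub rows: f64,
    pub comet_cost_per_row: f64,
    pub spark_cost_per_row: f64,
}

/// Sum the estimated (Comet, Spark) cost over all operators in the plan.
pub fn plan_costs(ops: &[OperatorCost]) -> (f64, f64) {
    ops.iter().fold((0.0, 0.0), |(comet, spark), op| {
        (
            comet + op.rows * op.comet_cost_per_row,
            spark + op.rows * op.spark_cost_per_row,
        )
    })
}

/// Fall back to Spark only when Comet is estimated to be strictly more
/// expensive; ties stay on Comet.
pub fn choose_engine(ops: &[OperatorCost]) -> Engine {
    let (comet, spark) = plan_costs(ops);
    if comet > spark {
        Engine::Spark
    } else {
        Engine::Comet
    }
}
```

This sketch makes a single decision for the whole plan. Deciding per operator or per query stage would be a different design and would need transition costs in the model.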