datafusion-comet
Standardize how we determine if operators are enabled and supported
What is the problem the feature request solves?
We currently have duplicate and overlapping logic in CometExecRule and QueryPlanSerde for determining whether operators are enabled and whether we need to fall back to Spark.
For example, for ProjectExec, CometExecRule has no validation and simply delegates to QueryPlanSerde via newPlanWithProto:
case op: ProjectExec =>
  newPlanWithProto(
    op,
    CometProjectExec(_, op, op.output, op.projectList, op.child, SerializedPlan(None)))
In some cases the checks happen first in CometExecRule and are then applied again in QueryPlanSerde:
case op: SortMergeJoinExec
    if CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) &&
      op.children.forall(isCometNative) =>
  newPlanWithProto(
    op,
    CometSortMergeJoinExec(
      _,
      op,
      op.output,
      op.outputOrdering,
      op.leftKeys,
      op.rightKeys,
      op.joinType,
      op.condition,
      op.left,
      op.right,
      SerializedPlan(None)))
case op: SortMergeJoinExec
    if CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) &&
      !op.children.forall(isCometNative) =>
  op
case op: SortMergeJoinExec if !CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) =>
  withInfo(op, "SortMergeJoin is not enabled")
In other cases, such as sinks, all of the checks happen in CometExecRule and none in QueryPlanSerde:
case u: UnionExec
    if CometConf.COMET_EXEC_UNION_ENABLED.get(conf) &&
      u.children.forall(isCometNative) =>
  newPlanWithProto(
    u, {
      val cometOp = CometUnionExec(u, u.output, u.children)
      CometSinkPlaceHolder(_, u, cometOp)
    })
case u: UnionExec if !CometConf.COMET_EXEC_UNION_ENABLED.get(conf) =>
  withInfo(u, "Union is not enabled")
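To make the duplication concrete, here is a minimal, self-contained sketch of what a single decision point might look like. It is not Comet code: `SupportLevel`, `Op`, `OperatorHandler`, `OperatorChecks`, and the config key are all hypothetical stand-ins, and the sketch assumes an operator is enabled when its key is absent. It only illustrates moving the "is it enabled?" and "is it supported?" questions into one place per operator.

```scala
// Hypothetical, simplified model: none of these types are Comet or Spark APIs.
sealed trait SupportLevel
case object Compatible extends SupportLevel
final case class Unsupported(reason: String) extends SupportLevel

// Stand-in for a Spark physical operator.
final case class Op(name: String, childrenNative: Boolean)

// One handler per operator owns both the config flag and the support check.
trait OperatorHandler {
  def enabledKey: String
  def supportLevel(op: Op): SupportLevel
}

object SortMergeJoinHandler extends OperatorHandler {
  val enabledKey = "exec.sortMergeJoin.enabled" // illustrative key, not a real setting
  def supportLevel(op: Op): SupportLevel =
    if (op.childrenNative) Compatible
    else Unsupported("children are not all native")
}

object OperatorChecks {
  // Single decision point: Right(op) means convert, Left(reason) means fall back to Spark.
  // Assumption: an operator is treated as enabled when its key is missing from conf.
  def check(
      op: Op,
      handler: OperatorHandler,
      conf: Map[String, Boolean]): Either[String, Op] =
    if (!conf.getOrElse(handler.enabledKey, true)) Left(s"${op.name} is not enabled")
    else
      handler.supportLevel(op) match {
        case Compatible => Right(op)
        case Unsupported(reason) => Left(s"${op.name} is not supported: $reason")
      }
}
```

With something along these lines, the rule would call `OperatorChecks.check` once per operator and the serde layer would not need to repeat the same conditions. Note that, unlike the current SortMergeJoinExec case with non-native children, this sketch always records a fallback reason.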
Describe the potential solution
No response
Additional context
No response