datafusion-comet icon indicating copy to clipboard operation
datafusion-comet copied to clipboard

Standardize how we determine if operators are enabled and supported

Open andygrove opened this issue 1 month ago • 0 comments

What is the problem the feature request solves?

We currently have duplicate/overlapping logic in CometExecRule and QueryPlanSerde for determining whether operators are enabled or nor and whether we need to fall back to Spark.

For example, for ProjectExec, CometExecRule has no validation and simply delegates to QueryPlanSerde via newPlanWithProto:

      case op: ProjectExec =>
        newPlanWithProto(
          op,
          CometProjectExec(_, op, op.output, op.projectList, op.child, SerializedPlan(None)))

In some cases the checks happen first in CometExecRule and are then applied again in QueryPlanSerde:

      case op: SortMergeJoinExec
          if CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) &&
            op.children.forall(isCometNative) =>
        newPlanWithProto(
          op,
          CometSortMergeJoinExec(
            _,
            op,
            op.output,
            op.outputOrdering,
            op.leftKeys,
            op.rightKeys,
            op.joinType,
            op.condition,
            op.left,
            op.right,
            SerializedPlan(None)))

      case op: SortMergeJoinExec
          if CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) &&
            !op.children.forall(isCometNative) =>
        op

      case op: SortMergeJoinExec if !CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) =>
        withInfo(op, "SortMergeJoin is not enabled")

In other cases, such as sinks, all of the checks happen in CometExecRule and not in QueryPlanSerde

      case u: UnionExec
          if CometConf.COMET_EXEC_UNION_ENABLED.get(conf) &&
            u.children.forall(isCometNative) =>
        newPlanWithProto(
          u, {
            val cometOp = CometUnionExec(u, u.output, u.children)
            CometSinkPlaceHolder(_, u, cometOp)
          })

      case u: UnionExec if !CometConf.COMET_EXEC_UNION_ENABLED.get(conf) =>
        withInfo(u, "Union is not enabled")

Describe the potential solution

No response

Additional context

No response

andygrove avatar Nov 10 '25 18:11 andygrove