delta icon indicating copy to clipboard operation
delta copied to clipboard

[Feature Request] Add a salt column to DeltaOptimizeWriterExec

Open sezruby opened this issue 1 year ago • 2 comments

Feature request

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Overview

Missing implementation of salted partitioning for OW, though the variable name is "salted": https://github.com/delta-io/delta/blob/4fefba182f81d39f1d11e2f2b85bfa140079ea11/spark/src/main/scala/org/apache/spark/sql/delta/perf/DeltaOptimizedWriterExec.scala#L88

Motivation

Without salt partitioning column, optimize write task can be skewed depending on child partitioning. If a child partition is heavily skewed and has the same partition key, it can't be rebalanced without salted column. It can cause a long running write task with skewed data.

Further details

We could add salted expr, next to partition columns:

      val saltedPartitioning = HashPartitioning(
        partitionColumns.map(p => output.find(o => resolver(p, o.name)).getOrElse(
          throw DeltaErrors.failedFindPartitionColumnInOutputPlan(p))) ++ saltExpr,
        numPartitions)
case class SaltColExpr(saltPartSize: Long) extends LeafExpression {

  // Transient variable reset to 0 when deserialization.
  @transient private[this] var saltVal: Long = 0
  @transient private[this] var buf: Long = 0

  override def nullable: Boolean = false

  override def dataType: DataType = LongType

  override def eval(input: InternalRow): Long = {
    buf -= 1
    if (buf <= 0) {
      saltVal += 1
      buf = saltPartSize
    }
    saltVal
  }

  override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): SaltColExpr = {
    SaltColExpr(saltPartSize)
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val saltValTerm = ctx.addMutableState(CodeGenerator.JAVA_LONG, "saltVal")
    val bufTerm = ctx.addMutableState(CodeGenerator.JAVA_LONG, "buf")
    ctx.addPartitionInitializationStatement(s"$saltValTerm = 0L;")
    ctx.addPartitionInitializationStatement(s"$bufTerm = 0L;")

    ev.copy(code = code"""
      $bufTerm -= 1;
      if ($bufTerm <= 0L) {
        $bufTerm = ${saltPartSize}L;
        $saltValTerm ++;
      }
      final ${CodeGenerator.javaType(dataType)} ${ev.value} = $saltValTerm;
      """, isNull = FalseLiteral)
  }

  override def nodeName: String = "salt_part_expr"

  override def sql: String = s"$prettyName()"
}

The saltPartSize is row count unit to change the salt column value. For example, with saltPartSize=100, 1 for first 1-100 row index, 2 for 101-200 row index, within a child partition. As we don't have average/estimated row size in bytes or (file size in bytes / row count) statistics, we can't decide it adaptively. It would be good to support adaptive threshold using binSize and avg row size. We can read fileSize/recordCount from one AddFile entry, however it needs to build a snapshot which is unnecessary for write job, or read from lastcheckpointfile like protocol read. Without utilizing row size estimation, the implementation will remain as a manual mitigation way, when needed.

It would be great if Databricks can open existing implementation, if possible.

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

  • [ ] Yes. I can contribute this feature independently.
  • [x] Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
  • [ ] No. I cannot contribute this feature at this time.

sezruby avatar Jul 25 '24 23:07 sezruby

The required change is not large for the manual mitigation - without statistics - fixed salt part size. I can open a PR for this.

In addition, I wonder why we skip single partition: https://github.com/delta-io/delta/blob/4fefba182f81d39f1d11e2f2b85bfa140079ea11/spark/src/main/scala/org/apache/spark/sql/delta/perf/DeltaOptimizedWriterExec.scala#L171

If it is one large partition, OW can't split it

sezruby avatar Jul 25 '24 23:07 sezruby

@weiluo-db, @rahulsmahadev any thoughts?

felipepessoto avatar Aug 22 '24 23:08 felipepessoto