[Feature Request] Add a salt column to DeltaOptimizedWriterExec
Feature request
Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Overview
Optimized Write (OW) does not actually implement salted partitioning, even though the variable name contains "salted": https://github.com/delta-io/delta/blob/4fefba182f81d39f1d11e2f2b85bfa140079ea11/spark/src/main/scala/org/apache/spark/sql/delta/perf/DeltaOptimizedWriterExec.scala#L88
Motivation
Without a salt column in the partitioning, optimized write tasks can be skewed depending on the child partitioning. If a child partition is heavily skewed and all of its rows share the same partition key, it can't be rebalanced without a salt column. This can result in a long-running write task that handles the skewed data.
Further details
We could add a salt expression alongside the partition columns:
val saltedPartitioning = HashPartitioning(
  partitionColumns.map(p => output.find(o => resolver(p, o.name)).getOrElse(
    throw DeltaErrors.failedFindPartitionColumnInOutputPlan(p))) ++ saltExpr,
  numPartitions)
case class SaltColExpr(saltPartSize: Long) extends LeafExpression {
  // Transient variables are reset to 0 on deserialization.
  @transient private[this] var saltVal: Long = 0
  @transient private[this] var buf: Long = 0

  override def nullable: Boolean = false
  override def dataType: DataType = LongType

  // Interpreted path: bump the salt value once every saltPartSize rows.
  override def eval(input: InternalRow): Long = {
    buf -= 1
    if (buf <= 0) {
      saltVal += 1
      buf = saltPartSize
    }
    saltVal
  }

  override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): SaltColExpr = {
    SaltColExpr(saltPartSize)
  }

  // Codegen path: same counter logic, with state reset for each partition.
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val saltValTerm = ctx.addMutableState(CodeGenerator.JAVA_LONG, "saltVal")
    val bufTerm = ctx.addMutableState(CodeGenerator.JAVA_LONG, "buf")
    ctx.addPartitionInitializationStatement(s"$saltValTerm = 0L;")
    ctx.addPartitionInitializationStatement(s"$bufTerm = 0L;")
    ev.copy(code = code"""
      $bufTerm -= 1;
      if ($bufTerm <= 0L) {
        $bufTerm = ${saltPartSize}L;
        $saltValTerm++;
      }
      final ${CodeGenerator.javaType(dataType)} ${ev.value} = $saltValTerm;
      """, isNull = FalseLiteral)
  }

  override def nodeName: String = "salt_part_expr"
  override def sql: String = s"$prettyName()"
}
saltPartSize is the number of rows after which the salt column value changes. For example, with saltPartSize=100, rows 1-100 of a child partition get salt value 1, rows 101-200 get salt value 2, and so on.
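To make the row-to-salt mapping concrete, here is a minimal plain-Scala sketch that replays the same counter logic outside of Spark. `SaltSketch` and `saltValues` are hypothetical names used only for illustration; they are not part of Delta or Spark.

```scala
// Hypothetical helper (not part of Delta or Spark): replays the counter logic
// of the proposed SaltColExpr over the rows of a single child partition.
object SaltSketch {
  def saltValues(rowCount: Int, saltPartSize: Long): Seq[Long] = {
    var saltVal = 0L // current salt value, starts at 0 for each partition
    var buf = 0L     // rows remaining before the salt value is bumped
    (1 to rowCount).map { _ =>
      buf -= 1
      if (buf <= 0) {
        saltVal += 1
        buf = saltPartSize
      }
      saltVal
    }
  }
}
```

Calling `SaltSketch.saltValues(250, 100L)` assigns salt 1 to the first 100 rows, salt 2 to the next 100, and salt 3 to the remaining 50, so one skewed child partition would contribute three distinct hash keys instead of one.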
Because we have no statistics for the average or estimated row size in bytes (for example, file size in bytes / row count), we can't choose saltPartSize adaptively. It would be good to support an adaptive threshold based on binSize and the average row size. We could read fileSize/recordCount from a single AddFile entry, but that requires building a snapshot, which is otherwise unnecessary for a write job, or reading from the last checkpoint file, similar to how the protocol is read.
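As a rough sketch of what such an adaptive threshold might look like, the snippet below derives a row-count threshold from a target bin size and one file's size and record count. All names here (`AdaptiveSaltSketch`, `estimateSaltPartSize`, and its parameters) are hypothetical, and the formula is an assumption, not an existing Delta API. Note that on-disk file size reflects compression and encoding, so it only approximates the size of rows during the shuffle.

```scala
// Hypothetical sketch: derive saltPartSize (rows per salt value) from a target
// bin size and the size/record count of a single existing data file.
object AdaptiveSaltSketch {
  def estimateSaltPartSize(
      binSizeBytes: Long,
      fileSizeBytes: Long,
      recordCount: Long): Option[Long] = {
    if (binSizeBytes <= 0 || fileSizeBytes <= 0 || recordCount <= 0) {
      // No usable statistics: the caller would fall back to a fixed, manual value.
      None
    } else {
      // Average on-disk row size, clamped to at least one byte.
      val avgRowSizeBytes = math.max(1L, fileSizeBytes / recordCount)
      // Number of rows that roughly fill one bin, clamped to at least one row.
      Some(math.max(1L, binSizeBytes / avgRowSizeBytes))
    }
  }
}
```

Returning `None` when statistics are missing keeps the fixed, manually configured saltPartSize as the fallback.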
Without row size estimation, the implementation will remain a manual mitigation, to be enabled when needed.
It would be great if Databricks could open-source its existing implementation, if possible.
Willingness to contribute
The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?
- [ ] Yes. I can contribute this feature independently.
- [x] Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
- [ ] No. I cannot contribute this feature at this time.
The change required for the manual mitigation (a fixed salt part size, without statistics) is not large. I can open a PR for this.
In addition, I wonder why we skip the single-partition case: https://github.com/delta-io/delta/blob/4fefba182f81d39f1d11e2f2b85bfa140079ea11/spark/src/main/scala/org/apache/spark/sql/delta/perf/DeltaOptimizedWriterExec.scala#L171
If the input is one large partition, OW can't split it.
@weiluo-db, @rahulsmahadev any thoughts?