[Spark] Handle type mismatches in Delta streaming sink
Description
This change enables writing data to a Delta streaming sink using data types that differ from the actual schema of the Delta table. This is achieved by adding an implicit cast to columns when needed. The casting behavior respects the spark.sql.storeAssignmentPolicy configuration, similar to batch INSERT.
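The setting and its values are standard Spark options rather than something introduced here; a minimal sketch of how it is configured (assuming an active SparkSession named spark, as in the example further below):

# spark.sql.storeAssignmentPolicy controls how the implicit cast behaves:
#   ANSI   (default) - checked casts; an overflowing value fails the query
#   LEGACY           - Hive-style casts; an overflowing value wraps around silently
#   STRICT           - potentially lossy casts such as LONG to INT are rejected
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")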
How was this patch tested?
- Added test suite DeltaSinkImplicitCastSuite covering writes to a Delta sink using mismatching types, including interactions with schema evolution, schema overwrite, column mapping, partition columns, and case sensitivity. One such combination is sketched below.
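As an illustration only (not code from the suite), a hypothetical PySpark sketch of combining a type mismatch with schema evolution; table names and the checkpoint path are made up:

from pyspark.sql.functions import col, lit

(spark.readStream
    .table("delta_source")
    .select(
        col("a").cast("long").alias("a"),  # mismatching type: the sink declares 'a' as INT
        lit("extra").alias("b"))           # new column to be added by schema evolution
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/delta_sink")  # illustrative path
    .option("mergeSchema", "true")         # allow the sink schema to evolve and add 'b'
    .toTable("delta_sink"))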
Does this PR introduce any user-facing changes?
Previously, writing to a Delta sink using a type that doesn't match the column type in the Delta table failed with DELTA_FAILED_TO_MERGE_FIELDS:
from pyspark.sql.functions import col

(spark.readStream
    .table("delta_source")
    # Column 'a' has type INT in 'delta_sink'.
    .select(col("a").cast("long").alias("a"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", "<location>")
    .toTable("delta_sink"))
DeltaAnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'a' and 'a'
With this change, writing to the sink now succeeds and the data is cast from LONG to INT. If any value overflows, the stream fails with the following error (assuming the default storeAssignmentPolicy=ANSI):
SparkArithmeticException: [CAST_OVERFLOW_IN_TABLE_INSERT] Fail to assign a value of 'LONG' type to the 'INT' type column or variable 'a' due to an overflow. Use `try_cast` on the input value to tolerate overflow and return NULL instead.
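If overflow should produce NULL rather than fail the stream, the suggestion from the error message can be applied on the input side. A hypothetical sketch (table names and checkpoint path are illustrative):

from pyspark.sql.functions import expr

(spark.readStream
    .table("delta_source")
    # try_cast returns NULL instead of failing when a LONG value does not fit in INT
    .select(expr("try_cast(a AS INT) AS a"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/delta_sink")  # illustrative path
    .toTable("delta_sink"))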