[Spark] Handle type mismatches in Delta streaming sink


Description

This change enables writing data to a Delta streaming sink using data types that differ from the corresponding column types in the Delta table schema. This is achieved by adding an implicit cast to the input columns when needed. The casting behavior respects the spark.sql.storeAssignmentPolicy configuration, similar to batch INSERT.
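
As an illustrative sketch (not part of the PR), the store assignment policy that governs the implicit cast is a regular Spark SQL configuration and can be set like any other:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Default in Spark 3.x is "ANSI": values that overflow the target type make the write fail.
# "LEGACY" allows lenient Hive-style coercion; "STRICT" forbids any potentially lossy implicit cast.
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")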

How was this patch tested?

  • Added test suite DeltaSinkImplicitCastSuite covering writes to a Delta sink with mismatching types, including interactions with schema evolution, schema overwrite, column mapping, partition columns, and case sensitivity.

Does this PR introduce any user-facing changes?

Previously, writing to a Delta sink using a type that doesn't match the column type in the Delta table failed with DELTA_FAILED_TO_MERGE_FIELDS:

from pyspark.sql.functions import col

(
    spark.readStream
        .table("delta_source")
        # Column 'a' has type INT in 'delta_sink'; the stream writes it as LONG.
        .select(col("a").cast("long").alias("a"))
        .writeStream
        .format("delta")
        .option("checkpointLocation", "<location>")
        .toTable("delta_sink")
)

DeltaAnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'a' and 'a'

With this change, writing to the sink succeeds and the data is cast from LONG to INT. If any value overflows, the stream fails with the following error (assuming the default storeAssignmentPolicy=ANSI):

SparkArithmeticException: [CAST_OVERFLOW_IN_TABLE_INSERT] Fail to assign a value of 'LONG' type to the 'INT' type column or variable 'a' due to an overflow. Use `try_cast` on the input value to tolerate overflow and return NULL instead.
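
As the error message suggests, a stream that should tolerate overflow rather than fail can apply try_cast to the input before writing. A minimal sketch, not part of the PR (table and column names are hypothetical; try_cast is invoked through a SQL expression since it is Spark SQL syntax):

from pyspark.sql.functions import expr

(
    spark.readStream
        .table("delta_source")
        # try_cast returns NULL instead of raising an error when a LONG value overflows INT.
        .select(expr("try_cast(a AS INT)").alias("a"))
        .writeStream
        .format("delta")
        .option("checkpointLocation", "<location>")
        .toTable("delta_sink")
)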

johanl-db · Jul 30 '24