[Spark] Base merge schema evolution off set expressions, not source schema

Open Kimahriman opened this issue 2 years ago • 12 comments

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Description

Resolves #2300

This updates schema evolution in merge to use the target assignment columns and expression types, instead of the source schema, to determine the new target schema. The current implementation seems to only support upsert use cases, where the source schema looks like the target schema. However, when using custom set expressions, there's no requirement for this to be the case.

When auto schema migration is enabled and the target assignment column doesn't exist in the table, the column parts and the data type of the expression are used for determining schema updates, regardless of what the source schema is. For upserting cases, this works the same way, since the data types and assignment columns will match the source table. But it also supports more use cases where the source schema doesn't match the target schema.
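
As a rough illustration of the rule described above, the following sketch models a schema as a nested dict and derives the evolved target schema from the assignment column parts and expression types alone. The `evolve_schema` helper and the dict-based schema are hypothetical stand-ins for illustration, not the Delta implementation.

```python
from copy import deepcopy

# Toy model (an assumption): a schema is a nested dict whose leaves are
# type names and whose nested dicts are structs.
def evolve_schema(target_schema, assignments):
    """Return a copy of target_schema with missing assignment columns added.

    assignments is a list of (column_path, expression_type) pairs, where
    column_path is a tuple of name parts such as ("new", "nested", "field").
    """
    evolved = deepcopy(target_schema)
    for path, expr_type in assignments:
        node = evolved
        for part in path[:-1]:
            # Create intermediate structs that do not exist yet.
            node = node.setdefault(part, {})
            if not isinstance(node, dict):
                raise ValueError(f"{part!r} is not a struct")
        # Existing columns keep their type; only missing ones are added.
        node.setdefault(path[-1], expr_type)
    return evolved

target = {"old": "string"}
assignments = [(("new", "nested", "field"), "int"), (("old",), "int")]
evolved = evolve_schema(target, assignments)
print(evolved)  # {'old': 'string', 'new': {'nested': {'field': 'int'}}}
```

Note that the source schema never appears in the sketch: only the assignment targets and the types of the assigned expressions are consulted.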

How was this patch tested?

Updated two tests that were showing this use case didn't work, to show it now does work, and added one additional test for adding a new nested column in an update.

Does this PR introduce any user-facing changes?

Yes, supports more schema evolution use cases in merge

Kimahriman avatar Nov 19 '23 13:11 Kimahriman

cc @johanl-db since you added the non-star schema evolution in https://github.com/delta-io/delta/commit/affd5774def34bc4d553aff805cd17afe7d57ea1

Kimahriman avatar Nov 28 '23 14:11 Kimahriman

I actually considered this when implementing non-star schema evolution, the main concern was that schema evolution then becomes very error-prone as any typo will happily add a new column to the target:

MERGE INTO target
USING source
WHEN MATCHED THEN UPDATE SET userid = source.user_id

where the target already contains a column user_id which was misspelled as userid. This concern is actually the main reason schema evolution initially only applied to UPDATE SET * / INSERT * and not non-star action.

We could allow it, but my opinion (and that of others I discussed this with) is that this would do more harm than good.

The following should enable the use case you describe on the other hand (using the example from https://github.com/delta-io/delta/issues/2300):

  from delta.tables import DeltaTable

  spark.createDataFrame([["a"], ["b"], ["c"]], ["old"]).write.format("delta").mode("overwrite").save("merge-test")

  df = spark.createDataFrame([["a", "1"], ["b", "2"], ["c", "3"]], ["first", "second"])
  # Rename the source column so that it matches the new target column.
  source = df.withColumnRenamed("second", "new")

  dt = DeltaTable.forPath(spark, "file:/path/to/table/merge-test")
  (dt.alias("target").merge(source.alias("source"), "target.old = source.first")
      .whenMatchedUpdate(set = {
          "new": "source.new"
      })
      .execute()
  )

Renaming the source column before merging has the advantage that the intent is explicit:

  source = df.withColumnRenamed("second", "new")

johanl-db avatar Nov 29 '23 09:11 johanl-db

I actually considered this when implementing non-star schema evolution, the main concern was that schema evolution then becomes very error-prone as any typo will happily add a new column to the target:

MERGE INTO target
USING source
WHEN MATCHED THEN UPDATE SET userid = source.user_id

where the target already contains a column user_id which was misspelled as userid. This concern is actually the main reason schema evolution initially only applied to UPDATE SET * / INSERT * and not non-star action.

Aren't all operations prone to typos? You could have a DataFrame with userid in it, do an UPDATE SET *, and end up in the same scenario, so I'm not sure how this is any different. If someone is that worried about typos, they should be testing their queries or not enabling automatic schema evolution (which is disabled by default, so this is all already feature-gated).

The following should enable the use case you describe on the other hand (using the example from #2300):

That was a trivial example. In reality what we are doing is

set={'new.nested.field': complexExpressionInvolvingSourceAndTarget}

Getting the source to "look like" the target is non-trivial for our case without doing a pre join, which then just adds another whole join (really two since merge is two passes). And it provides no real benefit.

Additionally, the current implementation has a fun side effect if you deviate from the "copy the exact field" use case (which is only one use case for merge):

(spark.createDataFrame([["a"], ["b"], ["c"]], ["old"])
    .write
    .format("delta")
    .mode("overwrite")
    .save("merge-test")
)

df = spark.createDataFrame([["a", 1], ["b", 2], ["c", 3]], ["first", "second"])

dt = DeltaTable.forPath(spark, "file:/path/to/merge-test")
(dt.alias("target").merge(df.alias("source"), "target.old = source.first")
    .whenMatchedUpdate(set = {
        "first": "source.second"
    })   
    .execute()
)

spark.read.format("delta").load("merge-test").printSchema()
spark.read.format("delta").load("merge-test").show()

The output:

root
 |-- old: string (nullable = true)
 |-- first: string (nullable = true)

+---+-----+
|old|first|
+---+-----+
|  c|    3|
|  b|    2|
|  a|    1|
+---+-----+

Even though the value I assigned was an integer (source.second), the type of the source column named first is used for schema evolution: a string column is added, and the integer I am inserting is then cast to a string. This seems much more problematic than worrying about typos.

Kimahriman avatar Nov 29 '23 13:11 Kimahriman

Schema evolution allows adding columns/fields that are present in the source dataframe but missing in the table schema. The use case you are describing is different, though, since the field new.nested.field isn't present in the source in the first place. You would need to explicitly add the column to the table beforehand:

DeltaTable.forPath(spark, "file:/path/to/merge-test")
  .addColumn('new.nested.field', dataType= <type>)
  .execute()

Relying on the merge clauses to define the new schema, instead of just the target/source schemas, comes with some challenges:

  • It's a departure from the current definition of schema evolution, likely a breaking change.
  • The resulting schema is ambiguous: multiple clauses may introduce the same new column with different types.

johanl-db avatar Nov 30 '23 12:11 johanl-db

Schema evolution allows adding columns/fields that are present in the source dataframe but missing in the table schema, the use case you are describing is different though since the field new.nested.field isn't present in the source in the first place. You would need to explicitly add the column to the table beforehand:

Yes that is the exact scenario I'm trying to fix since we have all the information we need to do the schema evolution automatically without manually adding the column beforehand. Again this assumes that upserting is the only use case for merge, and that is not the case at all. If that was the case then custom set expressions shouldn't be supported. The source table doesn't have to have anything to do with the target table schema wise.

Relying on the merge clauses to define the new schema, instead of just the target/source schemas, comes with some challenges:

  • It's a departure from the current definition of schema evolution, likely a breaking change.
  • The resulting schema is ambiguous: multiple clauses may introduce the same new column with different types.

The code is simplified and all the existing tests pass. Do you have an example you think will cause problems? The new behavior should be a superset of the existing behavior (aside from fixing the buggy behavior shown in my last post). If you update/insert a field as is, then it will use the schema as is as well.

Kimahriman avatar Nov 30 '23 12:11 Kimahriman

User-facing change 1: Assigning to a column that isn't in the target or source now adds that column to the target table instead of failing:

target: key int
source: key int
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN UPDATE SET new_column = source.key

User-facing change 2: Assigning to a column that exists in the source but not the target can now result in the column being added with a different type:

target: key int
source: key int, value string
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN UPDATE SET value = source.value::int

Currently value is added to the target with type string; with your proposal it's added with type int.
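
The two user-facing changes can be summarized with a small toy model. This is only a sketch: `new_column_type` is a hypothetical helper, schemas are plain dicts, and `None` stands in for "the merge fails".

```python
def new_column_type(column, expr_type, source_schema, use_expression_type):
    """Type a column missing from the target would be added with (toy model)."""
    if use_expression_type:
        # Proposed behavior: the assigned expression decides the type.
        return expr_type
    # Current behavior as described in this thread: the same-named source
    # column decides; None means there is no such column and the merge fails.
    return source_schema.get(column)

# Change 1: UPDATE SET new_column = source.key
source1 = {"key": "int"}
change1 = (
    new_column_type("new_column", "int", source1, use_expression_type=False),
    new_column_type("new_column", "int", source1, use_expression_type=True),
)

# Change 2: UPDATE SET value = source.value::int
source2 = {"key": "int", "value": "string"}
change2 = (
    new_column_type("value", "int", source2, use_expression_type=False),
    new_column_type("value", "int", source2, use_expression_type=True),
)

print(change1)  # (None, 'int')
print(change2)  # ('string', 'int')
```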

I'm not too concerned about the first case as we're just making schema evolution more permissive (hopefully users don't rely on an error being thrown there). The second one could be breaking some use cases though.

The bigger question remaining is what should the resulting type be if there are multiple clauses introducing a new column?

If one is strictly larger than the other, then arguably we should pick the larger type and upcast the smaller one:

WHEN MATCHED AND condition THEN UPDATE SET value = source.value::int
WHEN MATCHED THEN UPDATE SET value = source.value::long

value should be added with type long

What if both types can be upcast to each other:

WHEN MATCHED AND condition THEN UPDATE SET value = source.value::timestamp
WHEN MATCHED THEN UPDATE SET value = source.value::timestampNTZ

Both timestamp and timestampNTZ are valid types for the new column value, how do we pick one? We could for example catch such a case and throw an error because the resulting type is ambiguous.

Similarly with string, which supports implicit casts to and from most other types:

WHEN MATCHED AND condition THEN UPDATE SET value = source.value::string
WHEN MATCHED THEN UPDATE SET value = source.value::date

Should value in the target table be a date or string?

If we can get a good answer for type reconciliation and get some confidence that the second user-facing change isn't going to break user workloads, then that would clear the path forward for this change.

johanl-db avatar Dec 01 '23 11:12 johanl-db

User-facing change 1: Assigning to a column that isn't in the target or source now adds that column to the target table instead of failing:

I wouldn't consider this a breaking change, just a new feature. Something that wasn't possible before is now possible.

User-facing change 2: Assigning to a column that exists in the source but not the target can now result in the column being added with a different type:

Technically a breaking change, but I would consider this a bug fix. I can't imagine a reasonable person would expect this behavior, let alone rely on it.

The bigger question remaining is what should the resulting type be if there are multiple clauses introducing a new column? ... If we can get a good answer for type reconciliation and get some confidence that the second user-facing change isn't going to break user workloads then that would clear the path forward for this change

I will add some test cases for this. Because it all uses the mergeSchemas helper with allowImplicitConversions=true, I would expect it to do type reconciliation to the smallest common type, but the test cases will find out for sure.

Kimahriman avatar Dec 01 '23 17:12 Kimahriman

So it looks like the current behavior is all based around Spark's implicitCast, which itself depends on the order of the types you're merging. Given this, it seems like there are four possible options for the behavior of multiple expressions assigning to the same new field:

  1. Use the implicitCast behavior, ensuring to process all clauses in order, regardless of update or insert. This behavior is highly dependent on the two types for how it behaves, but it's what the current process uses for adding any new fields into the target table.
  2. Use a method like you described of "upcasting" to the common type. I would need to see if there's any existing Delta or Spark helper to do something like this. This would be a little different than the implicit cast. For example merging an int and a long (in that order), would result in an int in the implicitCast method (casting the long to an int), but a long in an "upcasting" approach.
  3. Use the first type seen for a given target column and then rely on the casting of any additional uses of that target column.
  4. Check that all assignments to the same column are the same data type, and fail if not. Maybe the least ambiguous approach from a user perspective.

I added some unit tests showing the current behavior, which is mostly option 1, but without the full ordering across both insert and update.
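
To make the order dependence concrete, here is a toy comparison of options 2, 3 and 4 over every ordering of the clauses. It is a sketch only: the `WIDTH` table and the three strategy functions are hypothetical and cover integral types alone, and option 1 is left out because it depends on Spark's actual implicit cast rules.

```python
from itertools import permutations

# Toy widening order for integral types only (an assumption for illustration).
WIDTH = {"byte": 0, "short": 1, "int": 2, "long": 3}

def first_seen(types):
    # Option 3: the first clause that mentions the column fixes its type.
    return types[0]

def widest(types):
    # Option 2: upcast to the widest type used by any clause.
    return max(types, key=WIDTH.__getitem__)

def strict(types):
    # Option 4: every clause must agree on the type, otherwise fail.
    if len(set(types)) > 1:
        raise ValueError(f"conflicting types for new column: {sorted(set(types))}")
    return types[0]

def outcomes(strategy, types):
    """Results the strategy produces over every ordering of the clauses."""
    results = set()
    for ordering in permutations(types):
        try:
            results.add(strategy(list(ordering)))
        except ValueError:
            results.add("error")
    return results

clause_types = ["int", "long"]
for strategy in (first_seen, widest, strict):
    print(strategy.__name__, sorted(outcomes(strategy, clause_types)))
# first_seen ['int', 'long']
# widest ['long']
# strict ['error']
```

A strategy whose outcome set has more than one element gives a result that depends on clause order.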

Kimahriman avatar Dec 01 '23 21:12 Kimahriman

The order of the clauses shouldn't have an impact on the resulting type of a column, that would be arbitrary. This rules out 1. and 3. and leaves 2., the more permissive approach and 4. the stricter one.

The issue with 4. and failing when the clauses use different types is that MERGE operations may start to fail:

source: float_col: float, new_column: double
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT (new_column) VALUES (source.float_col)

This is valid today and will insert new_column as double but would fail with approach 4. It also leads to cases that have limitations that seem very arbitrary:

WHEN MATCHED AND condition1 THEN UPDATE SET new_column = 5 -- int
WHEN MATCHED AND condition2 THEN UPDATE SET new_column = 10 / 2 -- double

fails because the two types are different.

I'd be in favor of the 3rd option, reconciling the different types found but being very careful about what is allowed. Spark's AnsiTypeCoercion.findTightestCommonType seems suited: it'll allow common type widening without being too permissive, e.g. it forbids a whole class of ambiguous cases I mentioned in my previous comment which implicit cast or even upcast would allow.

A few things that I see that would need to be covered:

  • If the column already exists in the target, then the target type should not be changed.
  • Sufficient testing of the type reconciliation logic.
  • Struct evolution must work correctly, e.g. adding fields to structs nested in maps/arrays. I don't think we necessarily need special handling in this change, but this is an area that was missed in the past and we don't have tests yet that cover adding fields that already exist in the source.
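
The reconciliation rules listed above could look roughly like the following sketch. It is a toy model, not Spark's `findTightestCommonType`: the `NUMERIC_PRECEDENCE` list and both helper functions are assumptions made for illustration, types are plain strings, and anything that is not a pair of numeric types is treated as ambiguous.

```python
from functools import reduce

# Assumed widening order for this toy model; later entries are wider.
NUMERIC_PRECEDENCE = ["byte", "short", "int", "long", "float", "double"]

def tightest_common_type(a, b):
    """Return the narrowest type both a and b widen to, or None if ambiguous."""
    if a == b:
        return a
    if a in NUMERIC_PRECEDENCE and b in NUMERIC_PRECEDENCE:
        return max(a, b, key=NUMERIC_PRECEDENCE.index)
    # e.g. string vs date, or timestamp vs timestamp_ntz: refuse to guess.
    return None

def resolve_new_column_type(target_type, clause_types):
    """Pick the type of a column assigned by one or more merge clauses."""
    if target_type is not None:
        # The column already exists in the target: never change its type.
        return target_type

    def combine(acc, t):
        common = tightest_common_type(acc, t)
        if common is None:
            raise ValueError(f"ambiguous types for new column: {acc} vs {t}")
        return common

    return reduce(combine, clause_types)

print(resolve_new_column_type(None, ["float", "double"]))  # double
print(resolve_new_column_type(None, ["long", "int"]))      # long
print(resolve_new_column_type("string", ["int", "long"]))  # string
```

Because the common type of two numeric types does not depend on argument order here, the result is the same for any ordering of the clauses, while pairs such as string and date raise an error instead of silently picking one.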

johanl-db avatar Dec 04 '23 11:12 johanl-db

The order of the clauses shouldn't have an impact on the resulting type of a column, that would be arbitrary.

Don't merge clauses have a specific overall order, even across different types of clauses? In that case it wouldn't be arbitrary, it just might not be obvious to users what the behavior is.

The issue with 4. and failing when the clauses use different types is that MERGE operations may start to fail:

source: float_col: float, new_column: double
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT (new_column) VALUES (source.float_col)

This is valid today and will insert new_column as double but would fail with approach 4.

This still seems like very odd behavior to me. new_column in the source technically has nothing to do with new_column in the target if you are doing custom set expressions. Forcing both types to be the same would eliminate unexpected behavior IMO.

It also leads to cases that have limitations that seem very arbitrary:

WHEN MATCHED AND condition1 THEN UPDATE SET new_column = 5 -- int
WHEN MATCHED AND condition2 THEN UPDATE SET new_column = 10 / 2 -- double

fails because the two types are different.

Same thing, I would almost argue it's better to be strict about it. If there's any ambiguity, force the user to decide what type it should be, instead of choosing some method of resolving directly. In this example, the user may want the column to be an int, but not even realize 10 / 2 would be a double and not an int.

Not saying I definitely think this is the right approach, just saying I still think there's an argument for stricter being better.

I'd be in favor of the 3rd option, reconciling the different types found but being very careful about what is allowed. Spark's AnsiTypeCoercion.findTightestCommonType ...

A few things that I see that would need to be covered:

  • If the column already exists in the target, then the target type should not be changed.
  • Sufficient testing of the type reconciliation logic.
  • Struct evolution must work correctly, e.g. adding fields to structs nested in maps/arrays. I don't think we necessarily need special handling in this change, but this is an area that was missed in the past and we don't have tests yet that cover adding fields that already exist in the source.

I'll start working on this option and see if I can test enough of the cases.

Kimahriman avatar Dec 04 '23 17:12 Kimahriman

Updated to use TypeCoercion.findWiderTypeForTwo

AnsiTypeCoercion will resolve a string and an integral type to a long, not a string. The non-ANSI TypeCoercion seemed like a better fit.

I updated the tests I made to suit. Working on adding some tests for maps and arrays.

Kimahriman avatar Dec 04 '23 22:12 Kimahriman

Added more complex type tests, let me know how it looks.

Kimahriman avatar Dec 06 '23 18:12 Kimahriman