[Feature Request] add WHEN NOT MATCHED BY SOURCE/TARGET clause suppoort
Feature request
https://delta-users.slack.com/archives/CJ70UCSHM/p1661955032288519
Overview
WHEN NOT MATCHED BY SOURCE/TARGET clause support
Motivation
feature parity with popular other SQL databases, ease of use
Further details
Each day I get a full dump of a table. However, this data needs to be cleaned and in particular, compressed using the SCD2 style approach to be easily consumable downstream. Unfortunately, I do not get changesets or a NULL value for a key in case of deletions. I only receive NO LONGER a row (including the key). The links:
- https://docs.databricks.com/_static/notebooks/merge-in-scd-type-2.html
- https://docs.delta.io/latest/delta-update.html#write-change-data-into-a-delta-table
- https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-merge-into.html show me that something along these lines (https://www.mssqltips.com/sqlservertip/1704/using-merge-in-sql-server-to-insert-update-and-delete-at-the-same-time):
MERGE <target_table> [AS TARGET]
USING <table_source> [AS SOURCE]
ON <search_condition>
[WHEN MATCHED
THEN <merge_matched> ]
[WHEN NOT MATCHED [BY TARGET]
THEN <merge_not_matched> ]
[WHEN NOT MATCHED BY SOURCE
THEN <merge_matched> ];
will not work with Delta/Spark as the WHEN NOT MATCHED clause does not seem to support the BY SOURCE | TARGET extension. How can I still calculate the SCD2 representation?
- If the table is empty simply take all the data (for the initial load)
- When a new day/full copy of the data arrives:
- INSERT any new keys into the table
- For EXISTING keys perform an update (set the OLD value to be no longer valid (set end-date) and produce a new row in SCD2 with the contents of the new row and validity until infinity (end-date null))
- In case a previously present key Is no longer present close the SCD2 valid_from/valid_to interval by setting end-date
- In case a new record arrives for this key in the future start a new fresh SCD2 row valid until infinity for this new row/values.
An example case/dataset:
import pandas as pd
import numpy as np
# assumes a running spark session including support for deltalake to be available
d1 = spark.createDataFrame(pd.DataFrame({'key':[1,2,3], 'value':[4,5,6],'value2':["a", "b", "c"], 'date':[1,1,1]}))
#d1.show()
# notice one entry is MISSING (it should be deleted) or rather SCD2 invalidated
d2 = spark.createDataFrame(pd.DataFrame({'key':[1,2], 'value':[4,5], 'date':[2,2],'value2':["a", "b"]}))
# d2 had (3) as missing - this entry is back now (and should start a new SCD2 row
d3 = spark.createDataFrame(pd.DataFrame({'key':[1,2,3], 'value':[4,5, 66], 'date':[3,3,3], 'value2':["a", "b", "c"]}))
# a new record is added
d4 = spark.createDataFrame(pd.DataFrame({'key':[1,2,3, 4], 'value':[4,5, 66, 44], 'date':[4,4,4,4], 'value2':["a", "b", "c", "d"]}))
# a new record is added, one removed and one updated
d5 = spark.createDataFrame(pd.DataFrame({'key':[2,3, 4, 5], 'value':[5, 67, 44, 55], 'date':[5,5,5,5], 'value2':["b", "c", "d", "e"]}))
Willingness to contribute
The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?
- [ ] Yes. I can contribute this feature independently.
- [ ] Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
- [x] No. I cannot contribute this feature at this time.
Is there any workaround / current solution (perhaps not in the SQL but scala/Python API?
Thanks for creating this issue @geoHeil! Copying over a suggested approach from the Slack thread:
- No out-of-the-box way to do this in a single atomic command today
- You can LEFT join the target to the source and derive a new column for rows to DELETE, then run a MERGE with multiple WHEN MATCHED clauses to cover all the cases
Otherwise, the feature request is a good one, so we'll leave it open here if anyone from the community would like to work on it.
A small example:
step 1: initial write
# state cleanup
!rm -rf dummy_delta
# initial first write of data (first load of data)"
d1.withColumn("is_current", F.lit(True)).withColumn("valid_to", F.lit(None).astype("int")).write.mode("overwrite").format("delta").save("dummy_delta")
#.option("readChangeFeed", "true")
delta_state = spark.read.format("delta").load("dummy_delta")
delta_state.printSchema()
delta_state.show()
First UPSERT (including SCD2 soft delete):
# JOIN the existing state with the data
# DELETE
current_dt = 1.0
print(delta_state.count())
r = delta_state.join(d2.withColumnRenamed("valid_to", "valid_to_c").withColumnRenamed("value", "value_c").withColumnRenamed("date", "date_c"), on=['key'], how='left').withColumn("is_current", F.when(F.col("value_c").isNull(), F.lit(False)).otherwise(F.col("is_current"))).withColumn("valid_to", F.when(F.col("value_c").isNull(), F.lit(current_dt)).otherwise(F.col("valid_to")))
# shrink back (i.e. manual MERGE INTO)
r = r.select("key", "value", "date", "valid_to", "is_current")
print(r.count())
r.show()
# SCD2 not relevant: in this example no additional rows/update rows are present
# overwrite the latest state
r.write.mode("overwrite").format("delta").save("dummy_delta")
Next (2nd) UPSERT (including SCD2 soft delete):
d2 had (3) as missing - this entry is back now (and should start a new SCD2 row with fresh valid_from valid until infinity (null for valid_to)
delta_state = spark.read.format("delta").load("dummy_delta")
current_dt = 2.0
print(delta_state.count())
# WARNING: must do outer join (and filter accordingly) otherwise it is unneccessarily many joins later
r = delta_state.join(d3.withColumnRenamed("valid_to", "valid_to_c").withColumnRenamed("value", "value_c").withColumnRenamed("date", "date_c"), on=['key'], how='left').withColumn("is_current", F.when(F.col("value_c").isNull(), F.lit(False)).otherwise(F.col("is_current"))).withColumn("valid_to", F.when(F.col("value_c").isNull(), F.lit(current_dt)).otherwise(F.col("valid_to")))
# shrink back (i.e. manual MERGE INTO)
r = r.select("key", "value", "date", "valid_to", "is_current")
print(r.count())
r.show()
# store deletions
r.write.mode("overwrite").format("delta").save("dummy_delta")
# how to do the SCD2 (add row or insert) in a nice way? Perhaps using MERGE INTO?
now try to achieve the SCD2 (closing old row inserting new row) via MERGE INTO:
# state:
+---+-----+----+--------+----------+
|key|value|date|valid_to|is_current|
+---+-----+----+--------+----------+
| 1| 4| 1| null| true|
| 3| 6| 1| 1.0| false|
| 2| 5| 1| null| true|
+---+-----+----+--------+----------+
# current update:
+---+-----+----+
|key|value|date|
+---+-----+----+
| 1| 40| 3|
| 2| 50| 3|
| 3| 66| 3|
+---+-----+----+
NOTICE: d2 had (3) as missing - this entry is back now (and should start a new SCD2 row
delta_state = DeltaTable.forPath(spark, 'dummy_delta')
stagedUpdates = d3.drop("valid_to")
delta_state.toDF().filter(F.col("is_current") == True).join(stagedUpdates.withColumnRenamed("value", "value_c").withColumnRenamed("date", "date_c"), on=['key'], how='outer').show()
+---+-----+----+--------+----------+-------+------+
|key|value|date|valid_to|is_current|value_c|date_c|
+---+-----+----+--------+----------+-------+------+
| 1| 4| 1| null| true| 4| 3|
| 2| 5| 1| null| true| 5| 3|
| 3| null|null| null| null| 66| 3|
+---+-----+----+--------+----------+-------+------+
now trying the merge into:
# Apply SCD Type 2 operation using merge
delta_state.alias("s").merge(stagedUpdates.withColumnRenamed("key", "mergeKey").alias("c"),
"s.key = mergeKey and s.is_current = true") \
.whenMatchedUpdate(
condition = "s.is_current = true AND s.value <> c.value",
set = { # Set current to false and endDate to source's effective date.
"is_current": "false",
"valid_to": "c.date"
}
).whenNotMatchedInsert(
values = {
"key": "c.mergeKey",
"value": "c.value",
"is_current": "true",
"date": "c.date", # Set current to true along with the new address and its effective date.
"valid_to": "null"
}
).execute()
delta_state = spark.read.format("delta").load("dummy_delta")
delta_state.show(50)
+---+-----+----+--------+----------+
|key|value|date|valid_to|is_current|
+---+-----+----+--------+----------+
| 1| 4| 1| null| true|
| 2| 5| 1| null| true|
| 3| 66| 3| null| true|
| 3| 6| 1| 1.0| false|
+---+-----+----+--------+----------+
questions:
- how to handle multiple columns more efficiently this is super clumsy.
- in particular in the above workaround for the deletion
- performance: I am using one additional join (to handle the deletion). Couldn`t this be simplified with an OUTER join (to only join once?
Some of the questions (around easing the handling of deletions) can be partially handled with:
def perform_rename(df, mapping):
for old_value, new_value in mapping.items():
df = df.withColumnRenamed(old_value, new_value)
return df
def handle_deletions(updates_df, keys, current_dt, date_col='date',columns_to_drop=[], is_current_col='is_current', valid_to_col='valid_to'):
def _(df_current_state):
original_columns = delta_state.columns
mapping = dict([(c, f'{c}_c') for c in updates_df.drop(*keys).drop(*columns_to_drop).columns])
value_cols = updates_df.drop(*keys).drop(*columns_to_drop).columns
r = df_current_state.join(perform_rename(updates_df, mapping), on=keys, how='left')
# it is good enough to look only at a single value (all wil be NULL anyways)
v = f"{value_cols[0]}_c"
r = r.withColumn(is_current_col, F.when(F.col(v).isNull(), F.lit(False)).otherwise(F.col(is_current_col))).withColumn(valid_to_col, F.when(F.col(v).isNull(), F.lit(current_dt)).otherwise(F.col(valid_to_col)))
return r.select(original_columns)
return _
def handle_merge(path, updates, current_dt):
delta_state = spark.read.format("delta").load(path)
deletions_handled = delta_state.transform(handle_deletions(updates, keys=['key'], current_dt=current_dt))
deletions_handled.write.mode("overwrite").format("delta").save("dummy_delta")
#-------------------------------------------------
# Apply SCD Type 2 operation using merge
delta_state = DeltaTable.forPath(spark, path)
delta_state.alias("s").merge(updates.alias("c"),
"s.key = c.key and s.is_current = true") \
.whenMatchedUpdate(
condition = "s.is_current = true AND s.value <> c.value",
set = { # Set current to false and endDate to source's effective date.
"is_current": "false",
"valid_to": "c.date"
}
).whenNotMatchedInsert(
values = {
"key": "c.key",
"value": "c.value",
"value2": "c.value2",
"is_current": "true",
"date": "c.date", # Set current to true along with the new address and its effective date.
"valid_to": "null"
}
).execute()
delta_state = spark.read.format("delta").load("dummy_delta")
delta_state.show(50)
now when applying this function:
# notice one entry is MISSING (it should be deleted) or rather SCD2 invalidated
handle_merge('dummy_delta', d2, current_dt=2)
+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
| 1| 4| a| 1| true| null|
| 2| 5| b| 1| true| null|
| 3| 6| c| 1| false| 2|
+---+-----+------+----+----------+--------+
Works as expected
# d2 had (3) as missing - this entry is back now (and should start a new SCD2 row
handle_merge('dummy_delta', d3, current_dt=3)
+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
| 1| 4| a| 1| true| null|
| 2| 5| b| 1| true| null|
| 3| 66| c| 3| true| null|
| 3| 6| c| 1| false| 2|
+---+-----+------+----+----------+--------+
works as expected
# a new record is added
handle_merge('dummy_delta', d4, current_dt=4)
+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
| 1| 4| a| 1| true| null|
| 2| 5| b| 1| true| null|
| 3| 66| c| 3| true| null|
| 3| 6| c| 1| false| 2|
| 4| 44| d| 4| true| null|
+---+-----+------+----+----------+--------+
works as expected
d5.show()
+---+-----+----+------+
|key|value|date|value2|
+---+-----+----+------+
| 2| 5| 5| b|
| 3| 67| 5| c|
| 4| 44| 5| d|
| 5| 55| 5| e|
+---+-----+----+------+
# a new record is added, one removed and one updated
handle_merge('dummy_delta', d5, current_dt=5)
+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
| 1| 4| a| 1| false| 5|
| 2| 5| b| 1| true| null|
| 3| 66| c| 3| false| 5|
| 3| 6| c| 1| false| 2|
| 4| 44| d| 4| true| null|
| 5| 55| e| 5| true| null|
+---+-----+------+----+----------+--------+
this is not working as expected. The old record is Invalidated. Notice however, that the new record is not written (opening a new fresh row starting from that date)
When re-executing for a 2nd time:
handle_merge('dummy_delta', d5, current_dt=5)
+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
| 1| 4| a| 1| false| 5|
| 2| 5| b| 1| true| null|
| 3| 67| c| 5| true| null|
| 3| 66| c| 3| false| 5|
| 3| 6| c| 1| false| 2|
| 4| 44| d| 4| true| null|
| 5| 55| e| 5| true| null|
+---+-----+------+----+----------+--------+
the 67 appears. How can I fix the MERGE INTO conditions so the value appears straight away? Is there perhaps some additional MATCH condition required?
How can I specify a whenNotMatchedInsert for whenMatchedUpdate to insert the fresh row after invalidating the current?
This feature seems neat! Unfortunately, we are already fully booked in our H2 roadmap, and since you've indicated you are unable to contribute a solution, there's not much we can do right now.
When we begin our 2023 roadmap planning, please bring up this issue again so that we can get the community's input on how desired it is!
Cheers.
FYI I actually did all the work a couple of years ago and have a branch with this implemented for the Scala API only here: https://github.com/tripl-ai/delta
At the time the PR was rejected to this repo but if you are motivated the code could be updated for latest Delta (not by me).
I created a design doc to implement support for WHEN NOT MATCHED BY SOURCE clauses: [Design Doc] WHEN NOT MATCHED BY SOURCE. This enables selectively updating or deleting target rows that have no matches in the source table based on the merge condition.
API
A new whenNotMatchedBySource(condition) method is added to the Delta Table API, similar to the existing whenMatched(condition) and whenNotMatched(condition) methods. It returns a builder that allows specifying the action to apply using update(set) or delete(). whenNotMatchedBySource(condition) accepts an optional condition that needs to be satisfied for the corresponding action to be applied.
Usage example:
targetDeltaTable.as(“t”)
.merge(sourceTable.as(“s”), condition = “t.key = s.key”)
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.whenNotMatchedBySource(condition = “t.value > 0”).update(set = “t.value = 0”)
.whenNotMatchedBySource().delete()
This merge invocation will:
- update all target rows that have a match in the source table using the source value.
- Insert all source rows that have no match in the target into the target table.
- Update all target rows that have no match in the source if
t.valueis strictly positive, otherwise delete the target row.
More details on the API and the implementation proposal can be found in the design doc. The SQL API will be shipped with Spark 3.4, see https://github.com/apache/spark/pull/38400.
Project Plan
| Task | Description | Status | PR |
|---|---|---|---|
| Delta API Scala Support | Implement support for the clause in Delta using the Scala DeltaTable API. | Done | #1511 |
| Delta API Python Support | Implement support for the clause in Delta using the Python API. | Done | #1533 |
| SQL Support | After Spark 3.4 release / upgrading to Spark 3.4, make necessary changes to support the clause in SQL. | Done | https://github.com/delta-io/delta/pull/1740 |
solved by https://github.com/delta-io/delta/pull/1511
Just posting for visibility, from #1511: Support for WHEN NOT MATCHED BY SOURCE using SQL will be available with Spark 3.4 release and python support will follow up in a different PR.
SQL support is in https://github.com/delta-io/delta/pull/1740