delta icon indicating copy to clipboard operation
delta copied to clipboard

[Feature Request] add WHEN NOT MATCHED BY SOURCE/TARGET clause suppoort

Open geoHeil opened this issue 3 years ago • 5 comments

Feature request

https://delta-users.slack.com/archives/CJ70UCSHM/p1661955032288519

Overview

WHEN NOT MATCHED BY SOURCE/TARGET clause support

Motivation

feature parity with popular other SQL databases, ease of use

Further details

Each day I get a full dump of a table. However, this data needs to be cleaned and in particular, compressed using the SCD2 style approach to be easily consumable downstream. Unfortunately, I do not get changesets or a NULL value for a key in case of deletions. I only receive NO LONGER a row (including the key). The links:

  • https://docs.databricks.com/_static/notebooks/merge-in-scd-type-2.html
  • https://docs.delta.io/latest/delta-update.html#write-change-data-into-a-delta-table
  • https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-merge-into.html show me that something along these lines (https://www.mssqltips.com/sqlservertip/1704/using-merge-in-sql-server-to-insert-update-and-delete-at-the-same-time):
MERGE <target_table> [AS TARGET]
USING <table_source> [AS SOURCE]
ON <search_condition>
[WHEN MATCHED 
   THEN <merge_matched> ]
[WHEN NOT MATCHED [BY TARGET]
   THEN <merge_not_matched> ]
[WHEN NOT MATCHED BY SOURCE
   THEN <merge_matched> ];

will not work with Delta/Spark as the WHEN NOT MATCHED clause does not seem to support the BY SOURCE | TARGET extension. How can I still calculate the SCD2 representation?

  1. If the table is empty simply take all the data (for the initial load)
  2. When a new day/full copy of the data arrives:
  • INSERT any new keys into the table
  • For EXISTING keys perform an update (set the OLD value to be no longer valid (set end-date) and produce a new row in SCD2 with the contents of the new row and validity until infinity (end-date null))
  • In case a previously present key Is no longer present close the SCD2 valid_from/valid_to interval by setting end-date
    • In case a new record arrives for this key in the future start a new fresh SCD2 row valid until infinity for this new row/values.

An example case/dataset:

import pandas as pd
import numpy as np
# assumes a running spark session including support for deltalake to be available

d1 = spark.createDataFrame(pd.DataFrame({'key':[1,2,3], 'value':[4,5,6],'value2':["a", "b", "c"], 'date':[1,1,1]}))
#d1.show()

# notice one entry is MISSING (it should be deleted) or rather SCD2 invalidated
d2 = spark.createDataFrame(pd.DataFrame({'key':[1,2], 'value':[4,5], 'date':[2,2],'value2':["a", "b"]}))

# d2 had (3) as missing - this entry is back now (and should start a new SCD2 row
d3 = spark.createDataFrame(pd.DataFrame({'key':[1,2,3], 'value':[4,5, 66], 'date':[3,3,3], 'value2':["a", "b", "c"]}))

# a new record is added
d4 = spark.createDataFrame(pd.DataFrame({'key':[1,2,3, 4], 'value':[4,5, 66, 44], 'date':[4,4,4,4], 'value2':["a", "b", "c", "d"]}))

# a new record is added, one removed and one updated
d5 = spark.createDataFrame(pd.DataFrame({'key':[2,3, 4, 5], 'value':[5, 67, 44, 55], 'date':[5,5,5,5], 'value2':["b", "c", "d", "e"]}))

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

  • [ ] Yes. I can contribute this feature independently.
  • [ ] Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
  • [x] No. I cannot contribute this feature at this time.

geoHeil avatar Aug 31 '22 20:08 geoHeil

Is there any workaround / current solution (perhaps not in the SQL but scala/Python API?

geoHeil avatar Aug 31 '22 20:08 geoHeil

Thanks for creating this issue @geoHeil! Copying over a suggested approach from the Slack thread:

  • No out-of-the-box way to do this in a single atomic command today
  • You can LEFT join the target to the source and derive a new column for rows to DELETE, then run a MERGE with multiple WHEN MATCHED clauses to cover all the cases

Otherwise, the feature request is a good one, so we'll leave it open here if anyone from the community would like to work on it.

nkarpov avatar Sep 01 '22 16:09 nkarpov

A small example:

step 1: initial write

# state cleanup
!rm -rf dummy_delta

# initial first write of data (first load of data)"
d1.withColumn("is_current", F.lit(True)).withColumn("valid_to", F.lit(None).astype("int")).write.mode("overwrite").format("delta").save("dummy_delta")
#.option("readChangeFeed", "true")
delta_state = spark.read.format("delta").load("dummy_delta")
delta_state.printSchema()
delta_state.show()

First UPSERT (including SCD2 soft delete):

# JOIN the existing state with the data

# DELETE
current_dt = 1.0
print(delta_state.count())
r = delta_state.join(d2.withColumnRenamed("valid_to", "valid_to_c").withColumnRenamed("value", "value_c").withColumnRenamed("date", "date_c"), on=['key'], how='left').withColumn("is_current", F.when(F.col("value_c").isNull(), F.lit(False)).otherwise(F.col("is_current"))).withColumn("valid_to", F.when(F.col("value_c").isNull(), F.lit(current_dt)).otherwise(F.col("valid_to")))
# shrink back (i.e. manual MERGE INTO)
r = r.select("key", "value", "date", "valid_to", "is_current")
print(r.count())
r.show()

# SCD2 not relevant: in this example no additional rows/update rows are present 


# overwrite the latest state
r.write.mode("overwrite").format("delta").save("dummy_delta")

Next (2nd) UPSERT (including SCD2 soft delete):

d2 had (3) as missing - this entry is back now (and should start a new SCD2 row with fresh valid_from valid until infinity (null for valid_to)

delta_state = spark.read.format("delta").load("dummy_delta")
current_dt = 2.0

print(delta_state.count())
# WARNING: must do outer join (and filter accordingly) otherwise it is unneccessarily many joins later
r = delta_state.join(d3.withColumnRenamed("valid_to", "valid_to_c").withColumnRenamed("value", "value_c").withColumnRenamed("date", "date_c"), on=['key'], how='left').withColumn("is_current", F.when(F.col("value_c").isNull(), F.lit(False)).otherwise(F.col("is_current"))).withColumn("valid_to", F.when(F.col("value_c").isNull(), F.lit(current_dt)).otherwise(F.col("valid_to")))

# shrink back (i.e. manual MERGE INTO)
r = r.select("key", "value", "date", "valid_to", "is_current")
print(r.count())
r.show()

# store deletions
r.write.mode("overwrite").format("delta").save("dummy_delta")
# how to do the SCD2 (add row or insert) in a nice way? Perhaps using MERGE INTO?

now try to achieve the SCD2 (closing old row inserting new row) via MERGE INTO:

# state:
+---+-----+----+--------+----------+
|key|value|date|valid_to|is_current|
+---+-----+----+--------+----------+
|  1|    4|   1|    null|      true|
|  3|    6|   1|     1.0|     false|
|  2|    5|   1|    null|      true|
+---+-----+----+--------+----------+

# current update:
+---+-----+----+
|key|value|date|
+---+-----+----+
|  1|   40|   3|
|  2|   50|   3|
|  3|   66|   3|
+---+-----+----+

NOTICE: d2 had (3) as missing - this entry is back now (and should start a new SCD2 row

delta_state = DeltaTable.forPath(spark, 'dummy_delta')
stagedUpdates = d3.drop("valid_to")

delta_state.toDF().filter(F.col("is_current") == True).join(stagedUpdates.withColumnRenamed("value", "value_c").withColumnRenamed("date", "date_c"), on=['key'], how='outer').show()

+---+-----+----+--------+----------+-------+------+
|key|value|date|valid_to|is_current|value_c|date_c|
+---+-----+----+--------+----------+-------+------+
|  1|    4|   1|    null|      true|      4|     3|
|  2|    5|   1|    null|      true|      5|     3|
|  3| null|null|    null|      null|     66|     3|
+---+-----+----+--------+----------+-------+------+

now trying the merge into:

# Apply SCD Type 2 operation using merge
delta_state.alias("s").merge(stagedUpdates.withColumnRenamed("key", "mergeKey").alias("c"),
  "s.key = mergeKey and s.is_current = true") \
.whenMatchedUpdate(
  condition = "s.is_current = true AND s.value <> c.value",
  set = {                                      # Set current to false and endDate to source's effective date.
    "is_current": "false",
    "valid_to": "c.date"
  }
).whenNotMatchedInsert(
  values = {
    "key": "c.mergeKey",
    "value": "c.value",
    "is_current": "true",
    "date": "c.date",  # Set current to true along with the new address and its effective date.
    "valid_to": "null"
  }
).execute()

delta_state = spark.read.format("delta").load("dummy_delta")
delta_state.show(50)

+---+-----+----+--------+----------+
|key|value|date|valid_to|is_current|
+---+-----+----+--------+----------+
|  1|    4|   1|    null|      true|
|  2|    5|   1|    null|      true|
|  3|   66|   3|    null|      true|
|  3|    6|   1|     1.0|     false|
+---+-----+----+--------+----------+

questions:

  • how to handle multiple columns more efficiently this is super clumsy.
  • in particular in the above workaround for the deletion
  • performance: I am using one additional join (to handle the deletion). Couldn`t this be simplified with an OUTER join (to only join once?

geoHeil avatar Sep 02 '22 19:09 geoHeil

Some of the questions (around easing the handling of deletions) can be partially handled with:

def perform_rename(df, mapping):
    for old_value, new_value in mapping.items():
        df = df.withColumnRenamed(old_value, new_value)
    return df


def handle_deletions(updates_df, keys, current_dt, date_col='date',columns_to_drop=[], is_current_col='is_current', valid_to_col='valid_to'):
    def _(df_current_state):
        original_columns = delta_state.columns        
        mapping = dict([(c, f'{c}_c') for c in updates_df.drop(*keys).drop(*columns_to_drop).columns])
        value_cols = updates_df.drop(*keys).drop(*columns_to_drop).columns
        r = df_current_state.join(perform_rename(updates_df, mapping), on=keys, how='left')
        
        # it is good enough to look only at a single value (all wil be NULL anyways)
        v = f"{value_cols[0]}_c"
        r = r.withColumn(is_current_col, F.when(F.col(v).isNull(), F.lit(False)).otherwise(F.col(is_current_col))).withColumn(valid_to_col, F.when(F.col(v).isNull(), F.lit(current_dt)).otherwise(F.col(valid_to_col)))
        return r.select(original_columns)

    return _

def handle_merge(path, updates, current_dt):
    delta_state = spark.read.format("delta").load(path)
    deletions_handled = delta_state.transform(handle_deletions(updates, keys=['key'], current_dt=current_dt))
    deletions_handled.write.mode("overwrite").format("delta").save("dummy_delta")
    
    #-------------------------------------------------
    # Apply SCD Type 2 operation using merge
    delta_state = DeltaTable.forPath(spark, path)
    delta_state.alias("s").merge(updates.alias("c"),
      "s.key = c.key and s.is_current = true") \
    .whenMatchedUpdate(
      condition = "s.is_current = true AND s.value <> c.value",
      set = {                                      # Set current to false and endDate to source's effective date.
        "is_current": "false",
        "valid_to": "c.date"
      }
    ).whenNotMatchedInsert(
      values = {
        "key": "c.key",
        "value": "c.value",
        "value2": "c.value2",
        "is_current": "true",
        "date": "c.date",  # Set current to true along with the new address and its effective date.
        "valid_to": "null"
      }
    ).execute()

    delta_state = spark.read.format("delta").load("dummy_delta")
    delta_state.show(50)

now when applying this function:

# notice one entry is MISSING (it should be deleted) or rather SCD2 invalidated
handle_merge('dummy_delta', d2, current_dt=2)

+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
|  1|    4|     a|   1|      true|    null|
|  2|    5|     b|   1|      true|    null|
|  3|    6|     c|   1|     false|       2|
+---+-----+------+----+----------+--------+

Works as expected

# d2 had (3) as missing - this entry is back now (and should start a new SCD2 row
handle_merge('dummy_delta', d3, current_dt=3)

+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
|  1|    4|     a|   1|      true|    null|
|  2|    5|     b|   1|      true|    null|
|  3|   66|     c|   3|      true|    null|
|  3|    6|     c|   1|     false|       2|
+---+-----+------+----+----------+--------+

works as expected

# a new record is added
handle_merge('dummy_delta', d4, current_dt=4)
+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
|  1|    4|     a|   1|      true|    null|
|  2|    5|     b|   1|      true|    null|
|  3|   66|     c|   3|      true|    null|
|  3|    6|     c|   1|     false|       2|
|  4|   44|     d|   4|      true|    null|
+---+-----+------+----+----------+--------+

works as expected

d5.show()
+---+-----+----+------+
|key|value|date|value2|
+---+-----+----+------+
|  2|    5|   5|     b|
|  3|   67|   5|     c|
|  4|   44|   5|     d|
|  5|   55|   5|     e|
+---+-----+----+------+


# a new record is added, one removed and one updated
handle_merge('dummy_delta', d5, current_dt=5)

+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
|  1|    4|     a|   1|     false|       5|
|  2|    5|     b|   1|      true|    null|
|  3|   66|     c|   3|     false|       5|
|  3|    6|     c|   1|     false|       2|
|  4|   44|     d|   4|      true|    null|
|  5|   55|     e|   5|      true|    null|
+---+-----+------+----+----------+--------+

this is not working as expected. The old record is Invalidated. Notice however, that the new record is not written (opening a new fresh row starting from that date)

When re-executing for a 2nd time:

handle_merge('dummy_delta', d5, current_dt=5)

+---+-----+------+----+----------+--------+
|key|value|value2|date|is_current|valid_to|
+---+-----+------+----+----------+--------+
|  1|    4|     a|   1|     false|       5|
|  2|    5|     b|   1|      true|    null|
|  3|   67|     c|   5|      true|    null|
|  3|   66|     c|   3|     false|       5|
|  3|    6|     c|   1|     false|       2|
|  4|   44|     d|   4|      true|    null|
|  5|   55|     e|   5|      true|    null|
+---+-----+------+----+----------+--------+

the 67 appears. How can I fix the MERGE INTO conditions so the value appears straight away? Is there perhaps some additional MATCH condition required?

How can I specify a whenNotMatchedInsert for whenMatchedUpdate to insert the fresh row after invalidating the current?

geoHeil avatar Sep 03 '22 09:09 geoHeil

This feature seems neat! Unfortunately, we are already fully booked in our H2 roadmap, and since you've indicated you are unable to contribute a solution, there's not much we can do right now.

When we begin our 2023 roadmap planning, please bring up this issue again so that we can get the community's input on how desired it is!

Cheers.

scottsand-db avatar Sep 08 '22 19:09 scottsand-db

FYI I actually did all the work a couple of years ago and have a branch with this implemented for the Scala API only here: https://github.com/tripl-ai/delta

At the time the PR was rejected to this repo but if you are motivated the code could be updated for latest Delta (not by me).

seddonm1 avatar Sep 30 '22 22:09 seddonm1

I created a design doc to implement support for WHEN NOT MATCHED BY SOURCE clauses: [Design Doc] WHEN NOT MATCHED BY SOURCE. This enables selectively updating or deleting target rows that have no matches in the source table based on the merge condition.

API

A new whenNotMatchedBySource(condition) method is added to the Delta Table API, similar to the existing whenMatched(condition) and whenNotMatched(condition) methods. It returns a builder that allows specifying the action to apply using update(set) or delete(). whenNotMatchedBySource(condition) accepts an optional condition that needs to be satisfied for the corresponding action to be applied.

Usage example:

targetDeltaTable.as(“t”)
  .merge(sourceTable.as(“s”), condition = “t.key = s.key”)
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .whenNotMatchedBySource(condition = “t.value > 0”).update(set = “t.value = 0”)
  .whenNotMatchedBySource().delete()

This merge invocation will:

  • update all target rows that have a match in the source table using the source value.
  • Insert all source rows that have no match in the target into the target table.
  • Update all target rows that have no match in the source if t.value is strictly positive, otherwise delete the target row.

More details on the API and the implementation proposal can be found in the design doc. The SQL API will be shipped with Spark 3.4, see https://github.com/apache/spark/pull/38400.

Project Plan

Task Description Status PR
Delta API Scala Support Implement support for the clause in Delta using the Scala DeltaTable API. Done #1511
Delta API Python Support Implement support for the clause in Delta using the Python API. Done #1533
SQL Support After Spark 3.4 release / upgrading to Spark 3.4, make necessary changes to support the clause in SQL. Done https://github.com/delta-io/delta/pull/1740

johanl-db avatar Dec 06 '22 17:12 johanl-db

solved by https://github.com/delta-io/delta/pull/1511

geoHeil avatar Mar 29 '23 11:03 geoHeil

Just posting for visibility, from #1511: Support for WHEN NOT MATCHED BY SOURCE using SQL will be available with Spark 3.4 release and python support will follow up in a different PR.

scottsand-db avatar Apr 06 '23 18:04 scottsand-db

SQL support is in https://github.com/delta-io/delta/pull/1740

allisonport-db avatar May 24 '23 23:05 allisonport-db