
Add `upsert` merge strategy

jorritsandbrink opened this pull request 1 year ago · 8 comments

Description

This started as building merge support for Athena Iceberg, but the SQL logic runs on some other engines out of the box too (e.g. postgres, mssql, snowflake, databricks, bigquery). It primarily uses the MERGE INTO statement to implement upsert (update + insert) logic. Since neither delete-insert nor scd2 applies here, I introduced a new merge strategy called upsert.

Characteristics:

  • needs a primary_key
  • uses hash of primary_key as deterministic row identifier (_dlt_id) in root table
  • implements "true" upsert for root table (only updates and inserts), but still needs deletes for child tables (to delete records for elements which have been removed from an updated list)
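For illustration, a minimal sketch of how a resource might opt into this strategy, assuming the dict form of `write_disposition` with a `strategy` key as discussed in this PR (resource name and data are made up):

```python
import dlt

# Hypothetical resource using the proposed upsert strategy.
# primary_key is required: it drives both the MERGE ON clause
# and the deterministic _dlt_id in the root table.
@dlt.resource(
    primary_key="id",
    write_disposition={"disposition": "merge", "strategy": "upsert"},
)
def users():
    yield [
        {"id": 1, "name": "alice", "addresses": ["home", "work"]},  # list -> child table
        {"id": 2, "name": "bob", "addresses": ["home"]},
    ]

pipeline = dlt.pipeline(destination="postgres", dataset_name="example")
pipeline.run(users())  # re-running updates rows 1/2 instead of duplicating them
```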

Related Issues

  • Closes #633

Additional Context

jorritsandbrink avatar Apr 27 '24 21:04 jorritsandbrink

Deploy Preview for dlt-hub-docs ready!

| Name | Link |
|------|------|
| Latest commit | a31fe62799f671c28355f2d1eea81ac9e43ace75 |
| Latest deploy log | https://app.netlify.com/sites/dlt-hub-docs/deploys/662d6fff585855000816ab80 |
| Deploy Preview | https://deploy-preview-1294--dlt-hub-docs.netlify.app |

netlify[bot] avatar Apr 27 '24 21:04 netlify[bot]

@rudolfix @sh-rp Before continuing development on this branch, does the direction I've taken here make sense?

jorritsandbrink avatar Apr 28 '24 09:04 jorritsandbrink

@rudolfix

First, addressing a main consideration here: I don't think upsert can (or should) be made compatible with delete-insert. upsert needs a primary_key to ensure a one-to-one relationship between records in the source and target, while delete-insert can go without a primary_key.

Then addressing your points in order:

  1. The normalizer is changed because the upsert logic relies on a deterministic _dlt_id (a hash of the primary_key; see the sketch after this list). These hashes could also be calculated on the fly in the SQL engine, but that would be inefficient.
  2. The biggest benefit will indeed be for the root table. Note though that the delete-insert logic I have here for upsert behaves differently than the delete-insert logic for delete-insert. The difference is that the former is more specific: it only deletes/inserts records for removed/added elements in the list, while the latter deletes/inserts all records for all elements, even if they didn't change.
    • merge_key might not really apply here. The composite of primary_key and merge_key would still need to be unique to ensure the one-to-one relationship, in which case it would make more sense to just include all columns in primary_key rather than splitting them across primary_key and merge_key.
    • hard_delete should be possible.
  3. Athena poses two problems for delete-insert: it supports neither temp tables nor transactions.
    • We could probably still do delete-insert and use CTEs instead of temp tables, but that would be inefficient (though no more inefficient than the Databricks implementation, which uses temporary views). Using an actual table is another approach, but it would not have the same performance as a temp table.
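A rough Python illustration of the deterministic row id from point 1 — a hypothetical helper, not the actual normalizer code:

```python
import base64
import hashlib

def deterministic_dlt_id(row: dict, primary_key: list[str]) -> str:
    # Join the primary key values with a separator unlikely to appear in
    # data, then hash: the same key always yields the same identifier.
    key_material = "\x1f".join(str(row[col]) for col in primary_key)
    digest = hashlib.sha256(key_material.encode("utf-8")).digest()
    return base64.urlsafe_b64encode(digest[:10]).decode("ascii")

# Two versions of the same logical record get the same _dlt_id,
# so a MERGE can match them in the root table:
old = {"id": 1, "att": "foo"}
new = {"id": 1, "att": "bar"}
assert deterministic_dlt_id(old, ["id"]) == deterministic_dlt_id(new, ["id"])
```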

Two options:

  1. Just make delete-insert work for Athena Iceberg and accept the drawbacks (no transactions, no temp tables).
  2. Continue with upsert and make it complete and solid. This goes beyond just Athena Iceberg and can be beneficial for more destinations.

jorritsandbrink avatar Apr 29 '24 10:04 jorritsandbrink

@jorritsandbrink Athena has some kind of concept of temp tables when you use the WITH statement. I'm not sure whether that is helpful in your case, but that is what I found when looking at your PR this morning and doing some research.

sh-rp avatar Apr 29 '24 10:04 sh-rp

Also, why do we need a primary key? MERGE INTO has an ON clause which should support AND and OR conditions for merging on composite keys. Or am I mistaken here?

Maybe we can make the Athena merge work first like the other destinations, without a new merge strategy? I am not sure why I did not do that; there was some roadblock, but it may have been that I did not fully understand our merges at the time.

sh-rp avatar Apr 29 '24 11:04 sh-rp

@sh-rp We already discussed this on Slack, but I'll add it here too for transparency and completeness.

Also, why do we need a primary key? MERGE INTO has an ON clause which should support AND and OR conditions for merging on composite keys. Or am I mistaken here?

We need primary_key to ensure a unique key for the ON clause in MERGE INTO. SQL engines raise an error when a MERGE tries to update the same row of the target table more than once. For example, mssql says:

"The MERGE statement attempted to UPDATE or DELETE the same row more than once. This happens when a target row matches more than one source row. A MERGE statement cannot UPDATE/DELETE the same row of the target table multiple times. Refine the ON clause to ensure a target row matches at most one source row, or use the GROUP BY clause to group the source rows."

postgres throws a similar error.

Consider this case to understand why these errors make sense:

staging table:

| id | att |
|----|-----|
| 1 | bar |
| 1 | baz |

destination table:

| id | att |
|----|-----|
| 1 | foo |

What should the destination table contain after upserting the staging table into it? It's not well defined.
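One way to surface this ambiguity before the MERGE even runs, sketched with a hypothetical pre-flight check on the staging rows:

```python
from collections import Counter

def assert_unique_primary_keys(rows: list[dict], primary_key: list[str]) -> None:
    # MERGE requires that each target row matches at most one source row,
    # so duplicate keys in staging must be rejected (or deduplicated first).
    counts = Counter(tuple(row[col] for col in primary_key) for row in rows)
    duplicates = [key for key, n in counts.items() if n > 1]
    if duplicates:
        raise ValueError(f"duplicate primary keys in staging data: {duplicates}")

staging = [{"id": 1, "att": "bar"}, {"id": 1, "att": "baz"}]
assert_unique_primary_keys(staging, ["id"])  # raises ValueError for key (1,)
```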

Athena has some kind of concept of temp tables when you use the WITH statement. I'm not sure whether that is helpful in your case, but that is what I found when looking at your PR this morning and doing some research.

Yes, these are the CTEs I mention in my previous comment under (3). We can definitely use them to make it work, but it would imply recalculation for each table, rather than calculating once and reusing the results as is the case with a temp table. As such, it won't be efficient (see the sketch below).
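To make the recalculation point concrete, a sketch with illustrative table and column names: with CTEs the key set is re-derived inside every statement that needs it, while a temp table derives it once:

```python
# CTE variant: the key set is recomputed in every statement.
delete_children = """
WITH root_keys AS (SELECT DISTINCT _dlt_id FROM staging.users)
DELETE FROM dataset.users__addresses
WHERE _dlt_root_id IN (SELECT _dlt_id FROM root_keys);
"""
delete_root = """
WITH root_keys AS (SELECT DISTINCT _dlt_id FROM staging.users)  -- recomputed!
DELETE FROM dataset.users
WHERE _dlt_id IN (SELECT _dlt_id FROM root_keys);
"""

# Temp table variant: compute the key set once and reuse it in both deletes.
create_keys = "CREATE TEMP TABLE root_keys AS SELECT DISTINCT _dlt_id FROM staging.users;"
```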

jorritsandbrink avatar Apr 30 '24 13:04 jorritsandbrink

@jorritsandbrink thx for the explanation. I reviewed the code and now I understand your concept better. What I think we should do (@sh-rp do you agree):

  1. implement Athena using delete-insert. I'd just create a table with a random name and then drop it (best effort; see the sketch after this list). We can also create a view (but I doubt those are cached in any way). On Databricks we use temporary views and MERGE statements to delete data; take a look.
  2. stash the existing upsert code; we have a ticket for it (#1129) where we can write a good spec
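A minimal sketch of the random-name table idea from point 1, assuming a generic DB-API style connection (all names are illustrative):

```python
import uuid
from contextlib import contextmanager

@contextmanager
def disposable_staging_table(conn, select_sql: str):
    # Athena has no temp tables, so create a uniquely named real table
    # and drop it best-effort once the merge statements have run.
    table = f"_dlt_tmp_{uuid.uuid4().hex[:8]}"
    conn.execute(f"CREATE TABLE {table} AS {select_sql}")
    try:
        yield table
    finally:
        try:
            conn.execute(f"DROP TABLE IF EXISTS {table}")
        except Exception:
            pass  # best effort: an orphaned table must not fail the load

# usage:
# with disposable_staging_table(conn, "SELECT DISTINCT _dlt_id FROM staging.users") as t:
#     conn.execute(f"DELETE FROM dataset.users WHERE _dlt_id IN (SELECT _dlt_id FROM {t})")
```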

now regarding the upsert itself:

  • OK I get why merge key is not possible
  • you still use a temp table in your code to optimize INSERT/MERGE into child tables, so it will not work for Athena
  • I get why we have the primary-key-based hash! We had it in a very early version of dlt. I think we could let people use such a hash not only for upsert, i.e. by adding another hint that would trigger it
  • for the parent table MERGE statement I'd use the primary key as it is (sketched below). Why: because arrow tables are not processed in the relational normalizer and will not work with upsert (we had the same problem with scd2 and it is only temporarily solved, which is not convenient).
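For that last point, what merging the root table directly on the primary key (rather than on a derived `_dlt_id`) might look like; table and column names are made up:

```python
# Illustrative MERGE for the root table, keyed on the primary key itself,
# so arrow tables can skip the relational normalizer entirely.
merge_root = """
MERGE INTO dataset.users AS d
USING staging.users AS s
ON d.id = s.id
WHEN MATCHED THEN
    UPDATE SET name = s.name
WHEN NOT MATCHED THEN
    INSERT (id, name) VALUES (s.id, s.name);
"""
```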

rudolfix avatar May 01 '24 19:05 rudolfix

@rudolfix Okay, I will leave this branch and PR open as input for #1129 and create a new PR that simply implements delete-insert for Athena Iceberg.

jorritsandbrink avatar May 02 '24 13:05 jorritsandbrink

Closed PR. This has moved to https://github.com/dlt-hub/dlt/pull/1466.

jorritsandbrink avatar Jul 07 '24 09:07 jorritsandbrink