
[CT-137] [Bug] Snapshot job creates duplicate valid records for unique_key

Open rdeese opened this issue 3 years ago • 13 comments

Is there an existing issue for this?

  • [X] I have searched the existing issues

Current Behavior

One of my running snapshots is amassing multiple valid records per unique_key. The competing valid records are produced regardless of whether a value in one of the check_cols has changed. Read on for details.

I have this snapshot:

{% snapshot billing_totals_snapshot %}

{{
  config(
    tags="billing",
    unique_key="org_id || '-' || billed_month",
    strategy='check',
    check_cols=[
      'source',
      'normalized_billing_type',
      'total_foo_charges',
      'total_bar_charges',
      'total_baz_charges',
      'total_other_charges',
      'total_charges'
    ]
  )
}}

SELECT * FROM {{ ref('billing_totals') }}

{% endsnapshot %}

The underlying table, billing_totals, is unique for the unique key. I wrote a query to check whether any unique keys have more than one valid record in the snapshot table:

	select 
		org_id,
		billed_month,
		count(1) as competing_valid_records
	from dbt_snapshots.billing_totals_snapshot
	where dbt_valid_to is null
	group by 1, 2
	having competing_valid_records > 1

Running it, I get many results (336,755 to be precise). I would expect zero.

My first guess was that the snapshot was recording real changes to the data, but not setting dbt_valid_to on the invalidated records. So I wrote a query to group competing valid records by the combined values of their check columns.

with non_unique_records as (
	select 
		org_id,
		billed_month,
		count(1) as competing_valid_records
	from dbt_snapshots.billing_totals_snapshot
	where dbt_valid_to is null
	group by 1, 2
	having competing_valid_records > 1
)
select 
	non_unique_records.org_id,
	non_unique_records.billed_month,
	non_unique_records.competing_valid_records,
	(snap."source" || '-' ||
	snap.normalized_billing_type || '-' ||
	snap.total_foo_charges || '-' ||
	snap.total_bar_charges || '-' ||
	snap.total_baz_charges || '-' ||
	snap.total_other_charges || '-' ||
	snap.total_charges) as snapshot_check_cols,
	count(1) as identical_valid_records
from non_unique_records
left join dbt_snapshots.billing_totals_snapshot as snap
	on non_unique_records.org_id = snap.org_id
	and non_unique_records.billed_month = snap.billed_month
where snap.dbt_valid_to is null
group by 1, 2, 3, 4

If every competing record represented an actual change, we should get 1 for identical_valid_records in all cases. I found that in most cases (330K out of 336K), identical_valid_records = competing_valid_records, i.e. all of the competing valid records are identical in the check columns.

Another thing that I find perplexing about this is that the number of competing valid records is not the same for all affected records. For a snapshot that has been run ~50 times (twice daily), the largest numbers of competing valid records are 21, 20, and 2, but 3-19 are also represented.

Expected Behavior

I expect that a snapshot table will have a single valid (i.e. dbt_valid_to is NULL) record for each unique_key.

Steps To Reproduce

Unfortunately I don't have steps to reproduce this, yet.

Relevant log output

No response

Environment

- OS: Docker, python:3.7.9-slim image
- Python: 3.7.9
- dbt: 0.21.0

Link to python:3.7.9-slim image.

What database are you using dbt with?

redshift

Additional Context

I have other snapshots running that are unaffected by this issue.

There is a previous issue that appears to be similar to this one, #2607 . I decided to make a new issue because that one is fairly stale (last substantive discussion was in 2020), has a confusing history (the ticket at first claimed the opposite problem, but was later renamed), and a lot of the discussion involves post-hooks which don't apply to my case.

Hopefully this new issue is an opportunity to have a more focused discussion of the problem -- but I'm happy to move conversation over to that ticket if the maintainers prefer.

rdeese avatar Feb 02 '22 08:02 rdeese

Do you have any automated tests to catch this? We've added some tests on some of our snapshots just to catch odd behavior or weirdness in our source data or human error. Something like this:

      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - account_id
            - dbt_valid_from
      - dbt_utils.mutually_exclusive_ranges:
          lower_bound_column: dbt_valid_from
          upper_bound_column: dbt_valid_to
          partition_by: account_id
          gaps: not_allowed

We also add an is_current flag on a layer of models that we have on top of our snapshots (we call them "filters") and the logic for is_current is this:

WITH SUBQUERY AS (
    SELECT
        *,
        CAST(COALESCE(dbt_valid_to, {{ var('future_eternity_ts') }}) AS TIMESTAMP) AS dbt_valid_to
    FROM {{ ref('my_snapshot') }}
)
SELECT
    *,
    (
        CASE WHEN
            CURRENT_TIMESTAMP() BETWEEN dbt_valid_from AND dbt_valid_to
            THEN TRUE
            ELSE FALSE
        END
    ) AS is_current
FROM SUBQUERY

Then we have this test on top of the filter to make sure that every entry has one and only one is_current row:

      - name: is_current
        tests:
          - not_null
          - count_value_by_id:
              id: vendor_id
              value: TRUE
              operand: "="
              count: 1

And then this is the macro for our custom test (there's probably something better now in dbt_expectations; we wrote this about 18 months ago):

/*
Custom schema test that checks a column to test for the count of a particular value.
Example usage:
count_value_by_id:
  id: id
  value: NULL
  operand: <
  count: 25
The test will pass if the count of NULL values is less than 25 for any given id, and will fail if the count of NULL values is greater than or equal to 25.
*/

{% macro test_count_value_by_id(model, column_name, id, value, operand, count) %}

with all_ids_values as (

    select
        {{ id }} as id,
        {% if value|lower == "null" -%}
        if({{ column_name }} is null,TRUE,FALSE) as field_value
        {%- else -%}
        if({{ column_name }} = {{ value }},TRUE,FALSE) as field_value
        {%- endif %}
        from {{ model }}

), distinct_ids as (

    select distinct
        id
        from all_ids_values

), ids_with_value as (

    select
        id,
        count(*) as count
        from all_ids_values
        where field_value = true
        group by id

), all_ids_with_counts as (

    select 
        a.id,
        coalesce(count,0) as count 
        from distinct_ids a
        left join ids_with_value b
            on a.id = b.id

)
select 
    *
    from all_ids_with_counts 
    where not count {{ operand }} {{ count }}

{% endmacro %}

codigo-ergo-sum avatar Feb 09 '22 01:02 codigo-ergo-sum

I know the above won't solve your problem but maybe it helps to sleep a little bit better knowing that you'll be immediately warned if the problem does happen again...

codigo-ergo-sum avatar Feb 09 '22 01:02 codigo-ergo-sum

Thanks for sharing your approach. We don't currently have anything like this in place, but I think we'll certainly add something using your tests as a jumping off point.

rdeese avatar Feb 09 '22 01:02 rdeese

The way I've patched this up in the meantime is to create a shim table which imputes what dbt_valid_to should be by:

  1. ranking rows according to dbt_valid_from
  2. joining consecutive ranks
  3. using the subsequent rank's dbt_valid_from as the synthetic_dbt_valid_to for each rank.

The SQL is below. I've validated it against a properly functioning snapshot, and it produces results identical to correct dbt behavior.

with
    ranked_entries as (
        select
            org_id,
            billed_month,
            dbt_valid_from,
            rank() over (
                partition by org_id, billed_month order by dbt_valid_from
            ) as valid_from_rank,
            dbt_valid_to
        from {{ ref('billing_totals_snapshot') }}
    ),
    synthetic_valid_to as (
        select
            ranked_entries.org_id,
            ranked_entries.billed_month,
            ranked_entries.dbt_valid_from,
            next_entries.dbt_valid_from as synthetic_dbt_valid_to
        from ranked_entries
        left join
            ranked_entries as next_entries
            on ranked_entries.org_id = next_entries.org_id
            and ranked_entries.billed_month = next_entries.billed_month
            and ranked_entries.valid_from_rank + 1 = next_entries.valid_from_rank
    )
select
    billing_totals_snapshot.dbt_valid_from,
    synthetic_valid_to.synthetic_dbt_valid_to,
    billing_totals_snapshot.org_id,
    billing_totals_snapshot.billed_month,
    ...
from {{ ref('billing_totals_snapshot') }}
inner join
    synthetic_valid_to
    on billing_totals_snapshot.org_id = synthetic_valid_to.org_id
    and billing_totals_snapshot.billed_month = synthetic_valid_to.billed_month
    and billing_totals_snapshot.dbt_valid_from = synthetic_valid_to.dbt_valid_from

rdeese avatar Feb 09 '22 03:02 rdeese

Hey @rdeese, thanks for opening this. If this is a critical part of the data pipeline, we highly recommend trying to get the real event stream from some kind of change data capture system, since that's going to be the most accurate. But if that's not the case, a few areas you can look into:

  • Are there multiple snapshots running at the same time?
  • Is the unique key in the table truly unique all the time? Under the hood, dbt compares your table between the current snapshot run and the previous snapshot run, so there may also be missing states between snapshots depending on how frequently you run the snapshot. A quick check for the second point is sketched below.
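
A minimal sketch of that uniqueness check, reusing the table and key columns from the original example (the schema name is illustrative):

select
    org_id,
    billed_month,
    count(*) as n_rows
from analytics.billing_totals  -- hypothetical location of the snapshot's source relation
group by 1, 2
having count(*) > 1  -- any rows returned mean the key is not actually unique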

ChenyuLInx avatar Feb 14 '22 19:02 ChenyuLInx

I am having the same issue: almost daily, some (not all) of the snapshots contain duplicate rows per unique key for the valid record (i.e. dbt_valid_to is NULL).

  • there is only one simultaneous snapshot run
  • the unique key in the table is truly unique and the valid record is a true duplicate (all fields in the record are exactly the same, including dbt_scd_id)
  • it is happening almost every day

We are also using the check strategy.

alicefinidori avatar Apr 07 '22 05:04 alicefinidori

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

github-actions[bot] avatar Oct 05 '22 02:10 github-actions[bot]

  • OS: Windows 10 Pro 19043.2130
  • Python:3.8
  • dbt:1.0.1

I ran into this as well, and it was happening for me when I had two merge statements that tried to run at the same time. The sinister part is that, for me, the duplicate-record bug didn't occur until at least 2 snapshots past the async run. Steps to reproduce:

1. Table created with one value:

create table sandbox.public.table1(pk int,new_value int);
insert into sandbox.public.table1 values(1,1001);

2. Snapshotting code: DBT_TEST_SNAP.sql

{% snapshot DBT_TEST_SNAP %}

{{
    config(
      unique_key='PK',
      strategy='check',
      check_cols = 'all',
      invalidate_hard_deletes=True
    )
}}

select * from SANDBOX.PUBLIC.TABLE1

{% endsnapshot %}

3. Update value: update sandbox.public.table1 set new_value = 1002 where pk = 1;

4. Run the snapshot twice at the same time: I did this in PowerShell... dealer's choice. Resulting table: two active records created, near-duplicates: After_async_snap.csv

5. Update Record again: update sandbox.public.table1 set new_value = 1003 where pk = 1; This closes both of the existing records and creates two exact duplicates. After_Update.csv

6. Update Record again: update sandbox.public.table1 set new_value = 1004 where pk = 1;

20:06:25  1 of 1 START snapshot ....DBT_TEST_SNAP........................... [RUN]
20:06:30  1 of 1 ERROR snapshotting ....DBT_TEST_SNAP....................... [ERROR in 5.77s]        
20:06:30
20:06:30  Finished running 1 snapshot in 13.47s.
20:06:30
20:06:30  Completed with 1 error and 0 warnings:
20:06:30
20:06:30  Database Error in snapshot DBT_TEST_SNAP (snapshots\DBT_TEST_SNAP.sql)
20:06:30    100090 (42P18): Duplicate row detected during DML action
20:06:30    Row Values: [1, 1003, "a98b516393854d620735c2e1b9e1a8dc", 1666037081091000000, 1666037081091000000, NULL]
20:06:30    compiled SQL at target\run\groups_dbt\snapshots\DBT_TEST_SNAP.sql
20:06:30
20:06:30  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

When they don't run at the same time, everything is fine, but ideally there'd be some parameterizable logic to put a lock on a table when a snapshot is already running.

kjstultz avatar Oct 17 '22 21:10 kjstultz

Digging into the Snowflake documentation a bit, it sounds like it should block these simultaneous merge statements from occurring by having the table locked by a transaction.

But it does sound like, when the statements use the same connection, people have seen some wonky behavior. The session IDs of the two simultaneous merge statements were different, but it's a bit unclear to me whether that would make a difference and whether these transactions could bump into each other.

One thought I had for testing the theory is to name the "begin" transaction block when the snowflake_dml_explicit_transaction macro is called from the Snowflake adapter.

If the transactions are named, I imagine it should eliminate any transaction-melding that might be happening, if that is what's causing the issue. But at this point, it's just a theory; a rough sketch is below.
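
A rough sketch of that idea, assuming dbt-snowflake's snowflake_dml_explicit_transaction(dml) macro can be overridden from the root project and that Snowflake's BEGIN ... NAME syntax fits here; the transaction name is illustrative and this is untested:

{% macro snowflake_dml_explicit_transaction(dml) %}
  {# Name the transaction so concurrent snapshot runs are kept distinct and easier to trace. #}
  {% set dml_transaction -%}
    begin name dbt_txn_{{ invocation_id | replace('-', '_') }};
    {{ dml }};
    commit;
  {%- endset %}
  {% do return(dml_transaction) %}
{% endmacro %}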

kjstultz avatar Oct 18 '22 16:10 kjstultz

Faced the same original issue. It seems dbt snapshotting doesn't read data types consistently: it sometimes reads the same column interchangeably as float and numeric, which makes the check strategy see spurious changes. The issue can be resolved by explicitly casting every column to its intended type (e.g., cast all numeric columns to numeric) when you read from the source.
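
For example, a minimal sketch of pinning types inside the snapshot select, borrowing column names from the original example (the chosen types and precisions are illustrative):

{% snapshot billing_totals_snapshot %}

{{
  config(
    unique_key="org_id || '-' || billed_month",
    strategy='check',
    check_cols=['normalized_billing_type', 'total_charges']
  )
}}

select
    org_id,
    billed_month,
    cast(normalized_billing_type as varchar) as normalized_billing_type,
    cast(total_charges as numeric(18, 2)) as total_charges  -- pin the type so the check strategy compares like with like
from {{ ref('billing_totals') }}

{% endsnapshot %}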

razmik avatar Mar 10 '23 21:03 razmik

  • dbt cloud 1.3
  • snowflake

We are encountering this issue as well, although for each affected unique key I am only seeing exactly 2 records with a null dbt_valid_to.

Having looked through debug logs, I believe this is the result of 2 colliding jobs running snapshot commands for the same snapshot at (nearly) the precise same time. I'm now trying to figure out a way to avoid these collisions without having to manually adjust schedules or --exclude flags on all of our prod jobs.

aaronleming avatar Jun 01 '23 17:06 aaronleming

Another user running into this issue: duplicate records are created when the snapshot job has 2 colliding runs. Please find the details and debug logs of both runs below:

Sample data, where unique_key='cv_transaction_sid', check_cols=['md_version_hash'] Duplicate_snapshot_records.csv


 

| CV_TRANSACTION_SID | MD_VERSION_HASH | DBT_SCD_ID | DBT_UPDATED_AT | DBT_VALID_FROM | DBT_VALID_TO | MD_ADF_LOG_ENTRY_TIMESTAMP | MD_LOG_ID | MD_STAGE_LOG_ID |
| -- | -- | -- | -- | -- | -- | -- | -- | -- |
| 368af339ace909b00d6a7fdeaed17017 | 8025b86b4de9237c32cdb807252b87f9 | 0621901c6d6ff749b883732d994d077c | 2023-10-19 07:11:19.905 | 2023-10-19 07:11:19.905 | 2023-10-25 03:35:38.917 | 2023-10-19 08:18:55.472 +0200 | ce23820b-8f20-4015-bab9-0da6d0d7c09a | dbdb1655-f987-4a53-a864-e9f1d38d6583\|\|\|2023/10/19/03/00 |
| 368af339ace909b00d6a7fdeaed17017 | e905fc8af2fd3c424d1da79cf048eb1e | cc948d7ae2e177a306e2670baeb3538f | 2023-10-25 03:35:38.917 | 2023-10-25 03:35:38.917 | 2023-10-27 03:43:34.504 | 2023-10-25 05:16:04.732 +0200 | 39ff636b-0aa4-4fa6-8f73-891081ed44d6 | f754cc5a-4c7a-4f8c-b78d-df2ce9483d73\|\|\|2023/10/25/03/00 |
| 368af339ace909b00d6a7fdeaed17017 | e905fc8af2fd3c424d1da79cf048eb1e | 6662200c24fa4ae4f358fff1a684d380 | 2023-10-25 07:22:33.821 | 2023-10-25 07:22:33.821 | 2023-10-27 03:43:34.504 | 2023-10-25 05:16:04.732 +0200 | 92d4f6ee-2fde-42a2-91ed-5e52446b13c6 | f754cc5a-4c7a-4f8c-b78d-df2ce9483d73\|\|\|2023/10/25/03/00 |
| 368af339ace909b00d6a7fdeaed17017 | 3c67fa1acf619610b8183a635abb1513 | dbbd26c4f526ca742c44ffde7bcf763c | 2023-10-27 03:43:34.504 | 2023-10-27 03:43:34.504 |  | 2023-10-27 05:27:56.232 +0200 | 875e371e-153d-42a6-98d6-ef6e1f103643 | 43ea984f-9614-46b7-a8e4-65d2e50e2a29\|\|\|2023/10/27/03/00 |
| 368af339ace909b00d6a7fdeaed17017 | 3c67fa1acf619610b8183a635abb1513 | dbbd26c4f526ca742c44ffde7bcf763c | 2023-10-27 03:43:34.504 | 2023-10-27 03:43:34.504 |  | 2023-10-27 05:27:56.232 +0200 | 875e371e-153d-42a6-98d6-ef6e1f103643 | 43ea984f-9614-46b7-a8e4-65d2e50e2a29\|\|\|2023/10/27/03/00 |

 

We can see that row 1 is OK, rows 2 and 3 have the same MD_VERSION_HASH but different DBT_SCD_ID, while rows 4 and 5 have the same MD_VERSION_HASH and the same DBT_SCD_ID.

How it happened:

  1. At 2023-10-25 03:35:38.917, first create or replace temporary table "PROD"."PBW"."PBW_CV_TRANSACTION_VER__dbt_tmp" statement with query_id = 01afe037-0302-ef55-0000-815d22cc0912 was triggered.
  2. At 2023-10-25 07:14:54.863, first merge statement with query_id = 01afe112-0302-ee58-0000-815d22cfcaa2 was triggered.
  3. At 2023-10-25 07:22:33.821, second create or replace temporary table "PROD"."PBW"."PBW_CV_TRANSACTION_VER__dbt_tmp" statement with query_id = 01afe11a-0302-ee58-0000-815d22d00422 was triggered, while first merge was still running.
  4. At 2023-10-25 07:30:38.033, first merge statement finished, inserting row 2.
  5. At 2023-10-25 11:15:41.083, second merge is finished, inserting row 3.
  6. At 2023-10-27 03:43:34.504, the third create or replace temporary table "PROD"."PBW"."PBW_CV_TRANSACTION_VER__dbt_tmp" statement produced duplicated rows 4 and 5.

Attached are the debug logs for colliding runs of the same job on dbt cloud. Deploy - Run #2035649 Deploy - Run #2037538 image (1) debug (50).log debug (51).log

gunnava avatar Nov 15 '23 14:11 gunnava

I also experienced this issue. Steps to replicate:

  1. Create a table with no duplicates on the unique key (which we'll call unique_id)
  2. Snapshot the table with check_cols='all'
  3. Manually insert two rows into the table that share a new unique_id value (one not yet present in the snapshot)
  4. Snapshot the table again.

What I have observed is that if a duplicate appears for a unique_id value that is already in the snapshot, the snapshot will error as expected. However, if a duplicate appears for a unique_id value which is NOT already in the snapshot, the snapshot will add the duplicate rows without error, and will error in all subsequent snapshots due to having non-unique values in the snapshot itself.
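
A minimal sketch of those steps as SQL, in the same Snowflake-sandbox style as the earlier repro; table and column names are illustrative:

create table sandbox.public.table2 (unique_id int, val int);
insert into sandbox.public.table2 values (1, 10);
-- dbt snapshot  (first run: one row captured for unique_id = 1)
insert into sandbox.public.table2 values (2, 20), (2, 21);  -- two rows for a NEW unique_id
-- dbt snapshot  (second run: both rows for unique_id = 2 are added without error;
--                every subsequent run then errors on the non-unique key)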

elsander avatar May 03 '24 15:05 elsander

I've also encountered this behavior occurring intermittently, and I suspect that it's caused by some sort of network hiccup in our environment (AWS Airflow, EKS, Snowflake). We have a number of tables using the check all cols approach since the source doesn't have timestamps we can use, and for now this is the only way we can get CDC. Both our dev and prod environments are pointing to the same source, but occasionally one environment will produce duplicates while the other does not.

I confirmed that Airflow only triggered the snapshots once, then got digging through the logs on Snowflake. What I eventually found was that the temp table was created for the merge only once - but the merge statement was executed twice for some reason, about 2 hours apart. As a workaround I'm thinking about creating a post hook for these snapshots to de-dupe them.

(Edit - and I should mention, we're not using dbt retry on our snapshot runs)

krspinney avatar Sep 18 '24 14:09 krspinney

  1. Run snapshot twice at the same time:

Given that we document that only one dbt command should be running at any given time - see here and here - I'm going to close this issue as "not planned"

graciegoheen avatar Oct 30 '24 17:10 graciegoheen

@graciegoheen This issue happens with only one dbt command running at a time. I've confirmed this in both our Airflow and Snowflake logs. If the command ran twice, I would see dbt's temp table being created twice and the merge statement being run twice. Instead what I see is this:

  1. Temp table created
  2. Merge
  3. Merge

This is the underlying cause of the behavior @elsander noted before me. Something is causing the merge statement to be triggered twice, which results in brand new rows being inserted twice in close succession.

The workaround I have for the moment is running a post-hook on the largish snapshots that seem to get affected by this - it's always snapshots that take multiple hours to run. It's not very efficient, but basically I run this afterwards:

CREATE OR REPLACE TABLE {{ this }} AS SELECT DISTINCT * FROM {{ this }}
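
A sketch of wiring that up as a post_hook on the snapshot itself; the snapshot name and source relation are illustrative, and the SELECT DISTINCT rewrite assumes the duplicates are exact copies:

{% snapshot my_large_snapshot %}

{{
  config(
    unique_key='pk',
    strategy='check',
    check_cols='all',
    post_hook="create or replace table {{ this }} as select distinct * from {{ this }}"
  )
}}

select * from source_db.source_schema.source_table

{% endsnapshot %}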

krspinney avatar Oct 30 '24 18:10 krspinney

I've confirmed this in both our Airflow and Snowflake logs.

Would you be able to provide a reproducible example? Are there specific circumstances / code that cause this to happen (besides the snapshot table being large)?

graciegoheen avatar Oct 30 '24 18:10 graciegoheen

Unfortunately not, I spent a fair bit of time trying to find a way to reproduce it but was unsuccessful. There does seem to be an element of randomness to it - eg: for a while this was happening to us in our dev environment but not in prod, despite both being identical in every way including the data being transformed.

krspinney avatar Oct 30 '24 19:10 krspinney

From my perspective, these snapshots/merge statements should be wrapped in a transaction if possible. If these are big queries, depending on configuration, you could have two different snapshot commands executing at the same time especially if it's long-running, and that's when this behavior has occurred for me.

If each command was wrapped in a transaction, we wouldn't end up with both queries performing async updates to the same table...I think, haven't tested.

My steps above still work for me to replicate this issue.

kjstultz avatar Oct 31 '24 11:10 kjstultz

The only thing I'll add here is that I was able to replicate this issue (though intermittently) with a small test table. So it may be easier to see this behavior with a larger table, but it's not strictly necessary.

elsander avatar Oct 31 '24 12:10 elsander

Sharing our understanding of the problem and two solutions. Just one caveat: this thread has mixed together cases of single and multiple snapshot invocations, and different underlying databases. We focus solely on parallel snapshots that create duplicate entries in Snowflake.

Context

A snapshot comprises two individual statements: a CTAS statement followed by a MERGE. The MERGE uses the data that the CTAS wrote to the temporary table to insert new entries and update existing ones in the snapshot.

DBT internally constructs a column, dbt_scd_id, to track changes recorded by a snapshot. This column is used when the MERGE statement looks up data in the CTAS table and applies changes to the snapshot. By default, dbt_scd_id is calculated as a hash of the primary key and the snapshot's invocation timestamp.
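
As a rough illustration (not DBT's exact macro output), the id is effectively a hash of the key plus the run's start time, so the same row hashed by two overlapping runs gets two different ids; the key and timestamps below are borrowed from the sample data earlier in this thread:

select
    md5('368af339ace909b00d6a7fdeaed17017' || '|' || '2023-10-25 07:14:54.863') as scd_id_first_run,
    md5('368af339ace909b00d6a7fdeaed17017' || '|' || '2023-10-25 07:22:33.821') as scd_id_second_run;
-- two different hashes for the same primary key, purely because the runs started at different times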

Problem statement

Two parallel snapshots will always have slightly different start times, so their CTAS statements will create different dbt_scd_id values for the same primary key values. Even though Snowflake's MERGE statement generally guarantees conflict avoidance, the MERGE in a DBT snapshot matches on dbt_scd_id, which we know differs between invocations, so Snowflake does not consider these statements conflicting and both merges apply their changes*.

* To be precise, only the insertions get applied by both merges. Because the updates re-use already existing dbt_scd_id values that are independent of the snapshot's invocation time, they get applied only once, by the snapshot that commits first.

Potential but currently impossible solution

Having both statements happen within a single transaction, under an isolation level that applied them serially, would prevent the CTAS statement of whichever parallel snapshot starts later from recording any changes, avoiding the conflict altogether. Unfortunately, Snowflake supports READ COMMITTED only, so the CTAS statements in two parallel transactions will each see the snapshot table as it was before the other transaction's MERGE statement and will work independently of the other's changes.

Actual solution # 1, clean

Use https://docs.getdbt.com/reference/resource-configs/updated_at. This means the snapshot no longer relies on the invocation timestamp to calculate dbt_scd_id; it uses values from this column instead. Some datasets have natural columns that make good candidates for this. For ones that don't, consider enriching them with something like ARRIVAL_TIMESTAMP/REPORT_DATE. When you have no influence on the incoming dataset, see the next solution.
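
A minimal sketch of this, assuming the source has been enriched with an arrival timestamp; the snapshot and column names are illustrative:

{% snapshot billing_totals_snapshot %}

{{
  config(
    unique_key="org_id || '-' || billed_month",
    strategy='check',
    check_cols='all',
    updated_at='arrival_timestamp'
  )
}}

select * from {{ ref('billing_totals') }}

{% endsnapshot %}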

Actual solution # 2, stopgap

DBT for Snowflake does not support controlling whether pre/post-hooks happen within the same transaction as the main query or a separate one. However, one can override the built-in transaction handling for snapshot MERGEs specifically by redefining the adapter's snowflake__snapshot_merge_sql macro as:

{% macro snowflake__snapshot_merge_sql(target, source, insert_cols) -%}
    {{ default__snapshot_merge_sql(target, source, insert_cols) }};
{% endmacro %}

With that, you can define the following pre- and post-hooks. Depending on your needs, do it at the project, directory or individual snapshot level.

pre_hook: "ALTER SESSION SET AUTOCOMMIT = FALSE;
           ALTER SESSION SET TRANSACTION_ABORT_ON_ERROR = TRUE;
           SET SNAPSHOT_PK = 'YOUR_PRIMARY_KEY'"
post_hook: "{{ error_on_snapshot_conflict() }}; COMMIT"

where error_on_snapshot_conflict could look like this:

{% macro error_on_snapshot_conflict() -%}
SELECT 1/0
FROM {{ this }}
WHERE DBT_VALID_TO IS NULL
GROUP BY IDENTIFIER($SNAPSHOT_PK)
HAVING COUNT(*) > 1
{% endmacro %}

All in all, this means that a MERGE statement gets rolled back if it would insert duplicate entries. Because it relies on transactions, which Snowflake natively supports, the rollback is instantaneous; the check is the type of query that's always quick in Snowflake, so any added cost is negligible. The obvious downside is overriding a built-in DBT macro.

Note to the DBT team # 1 Can we expect support for controlling the transactionality of pre/post-hooks in the Snowflake adapter? If not, it would be useful to understand what it is about this platform that makes it particularly difficult to add.

Note to the DBT team # 2 Could you please revise the technical docs about parallel execution. The way they are phrased at the moment, they put executing in a single process (OS process presumably?) in the spotlight which is misleading - it's easy to incorrectly infer that executing the same node (e.g. snapshot) in parallel but from different processes (machines) is supported. https://docs.getdbt.com/reference/programmatic-invocations#parallel-execution-not-supported

Note to the DBT team # 3 Added this here as this issue seems to have the most traction on this topic and because we're guessing the Snowflake adapter is not the only one that exhibits this behaviour. Trusting you can moderate it such that it ends up in the place where it's most visible and useful to the users.

acl-oss avatar Feb 04 '25 14:02 acl-oss

I've implemented @acl-oss 's stopgap solution in our project (our data source lacks any sort of timestamp we could use for our updated_at unfortunately). This solves the problem very neatly and cleanly!

krspinney avatar Feb 06 '25 16:02 krspinney