
Merge Bugs - No Prevention Of Duplicates In Source - And Multi-Partition Problem (Based On Same Field)

Open · HotfixAdamV2 opened this issue 2 months ago · 4 comments

What happens?

This is to highlight two separate bugs that I have found with the merge statement.

The first is that when a table is partitioned on a TIMESTAMP field using a combination of year, month, and day partition expressions, the merge statement fails. This looks similar to issue 503: https://github.com/duckdb/ducklake/issues/503.

The second is that the merge statement does not check for duplicates in the "source" when merging into the target. This results in multiple rows being inserted rather than the transaction failing, which is what I expected.

I've attached a zip file with all of the necessary pieces (including a requirements file and a docker-compose file to spin up a Postgres image).

To Reproduce

import pandas as pd 
import duckdb 
import datetime as dt
import os

# Start connection and attach the ducklake catalog 
myduck = duckdb.connect() 

DATA_DIR = os.path.join(os.getcwd(), 'warehouse')

sql = f""" 
    -- Load Extensions
    INSTALL ducklake;
    LOAD ducklake;
    INSTALL postgres;
    LOAD postgres;
    
    -- Attach Database (w/ relative path to a directory within directory of this notebook)
    ATTACH 'ducklake:postgres:dbname=ducklake_catalog user=postgres password=postgres host=localhost port=5432' AS my_lake (
        DATA_PATH '{DATA_DIR}'
    );
    
    -- Set Default Context To DuckLake Catalog
    USE my_lake;
"""

myduck.execute(sql)

# Create schema and two tables to highlight the different issues 
create_sql = """ 
    -- Schema for both
    CREATE SCHEMA IF NOT EXISTS testing;

    -- Table for hidden partitioning issue 
    CREATE TABLE IF NOT EXISTS testing.transactions (
        TRANSACTION_TIME TIMESTAMP NOT NULL,
        TRANSACTION_ID BIGINT NOT NULL,
        TYPE VARCHAR NOT NULL,
        AMOUNT DOUBLE NOT NULL,
        ACC_ORIG VARCHAR NOT NULL,
        ACC_DEST VARCHAR NOT NULL
    );
    
    -- Setup partition for the hidden partitioning issue
    ALTER TABLE testing.transactions SET PARTITIONED BY (
        year(TRANSACTION_TIME),
        month(TRANSACTION_TIME),
        day(TRANSACTION_TIME)  
    );
    
    -- Table for merge issue (dupes)
    CREATE TABLE IF NOT EXISTS testing.transactions_two (
        TRANSACTION_TIME TIMESTAMP NOT NULL,
        TRANSACTION_ID BIGINT NOT NULL,
        TYPE VARCHAR NOT NULL,
        AMOUNT DOUBLE NOT NULL,
        ACC_ORIG VARCHAR NOT NULL,
        ACC_DEST VARCHAR NOT NULL,
        TRANSACTION_DATE DATE NOT NULL
    );
    
    -- setup partition for merge issue (dupes)
    ALTER TABLE testing.transactions_two SET PARTITIONED BY (TRANSACTION_DATE);
"""

myduck.execute(create_sql)

# Alright, let's set up the sample data we are going to use...
dt_now = dt.datetime.now()

data = [
    [dt_now, 1, 'payment', 100.0, 'A123', 'B456'],
    [dt_now + dt.timedelta(days=1), 2, 'payment', 100.0, 'A123', 'B456'],
    [dt_now + dt.timedelta(days=2), 3, 'payment', 100.0, 'A123', 'B456'],
    [dt_now + dt.timedelta(days=3), 4, 'payment', 100.0, 'A123', 'B456'],
    [dt_now + dt.timedelta(days=4), 5, 'payment', 100.0, 'A123', 'B456'],
    [dt_now + dt.timedelta(days=5), 6, 'payment', 100.0, 'A123', 'B456'],
]
columns = [
    'TRANSACTION_TIME', 
    'TRANSACTION_ID', 
    'TYPE', 
    'AMOUNT', 
    'ACC_ORIG', 
    'ACC_DEST',
]
df_data = pd.DataFrame(data = data, columns = columns)
df_data['TRANSACTION_DATE'] = df_data['TRANSACTION_TIME'].dt.date

# Now let's turn this into a DuckDB relation so we can easily write it to our tables
expr_sql = """ 
    SELECT
        TRANSACTION_TIME::TIMESTAMP AS TRANSACTION_TIME,
        TRANSACTION_ID::BIGINT AS TRANSACTION_ID,
        TYPE::VARCHAR AS TYPE,
        AMOUNT::DOUBLE AS AMOUNT,
        ACC_ORIG::VARCHAR AS ACC_ORIG,
        ACC_DEST::VARCHAR AS ACC_DEST,
        TRANSACTION_DATE::DATE AS TRANSACTION_DATE
    FROM df_data
"""
expr = myduck.sql(expr_sql)

# Alright, now we are going to highlight the first issue: with hidden partitioning (with multiple levels)
# the merge statement fails. It seems to interpret the TRANSACTION_TIME column as an integer rather than a timestamp
# when it gets to the "month" and "day" pieces of the partition.
merge_sql = """ 
    MERGE INTO testing.transactions AS target
    USING expr AS source
    ON target.TRANSACTION_ID = source.TRANSACTION_ID
    WHEN MATCHED THEN
        UPDATE SET
            TRANSACTION_TIME = source.TRANSACTION_TIME,
            TYPE = source.TYPE,
            AMOUNT = source.AMOUNT,
            ACC_ORIG = source.ACC_ORIG,
            ACC_DEST = source.ACC_DEST
    WHEN NOT MATCHED THEN
        INSERT (TRANSACTION_TIME, TRANSACTION_ID, TYPE, AMOUNT, ACC_ORIG, ACC_DEST) 
        VALUES (source.TRANSACTION_TIME, source.TRANSACTION_ID, source.TYPE, source.AMOUNT, source.ACC_ORIG, source.ACC_DEST);
"""

myduck.execute(merge_sql)

This is the first failure:


BinderException                           Traceback (most recent call last)
Cell In[16], line 20
      1 # Alright now we are going to highlight the first issue, which is that with the hidden partinioning (with multiple levels)
      2 # we will end up failing on the merge statement. It seems to interpreting the TRANSACTION_TIME column as a integer rather than a timestamp
      3 # when it gets to the "month" and "day" pieces of the partition
      4 merge_sql = """
      5     MERGE INTO testing.transactions AS target
      6     USING expr AS source
    (...)
     17         VALUES (source.TRANSACTION_TIME, source.TRANSACTION_ID, source.TYPE, source.AMOUNT, source.ACC_ORIG, source.ACC_DEST);
     18 """
---> 20 myduck.execute(merge_sql)

BinderException: Binder Error: No function matches the given name and argument types 'month(BIGINT)'. You might need to add explicit type casts.
Candidate functions:
    month(DATE) -> BIGINT
    month(INTERVAL) -> BIGINT
    month(TIMESTAMP) -> BIGINT
    month(TIMESTAMP WITH TIME ZONE) -> BIGINT
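
A workaround that I'd expect to sidestep the binder error (an unverified assumption on my part, mirroring the single-column partitioning that testing.transactions_two uses below) is to store the date explicitly and partition on that column instead of on year/month/day expressions:

# Unverified workaround sketch: swap the multi-expression hidden partition for
# an explicit DATE column, as testing.transactions_two does. The table is still
# empty at this point (the merge above failed), so the UPDATE is a no-op.
workaround_sql = """
    ALTER TABLE testing.transactions ADD COLUMN TRANSACTION_DATE DATE;
    UPDATE testing.transactions SET TRANSACTION_DATE = CAST(TRANSACTION_TIME AS DATE);
    ALTER TABLE testing.transactions SET PARTITIONED BY (TRANSACTION_DATE);
"""
myduck.execute(workaround_sql)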

Now back to the code for the second issue:

# Now we are going to highlight the second issue, which is that the merge statement does not check for duplicates within the source
merge_sql_dupes = """
    MERGE INTO testing.transactions_two AS target
    USING (
        SELECT * FROM expr 
        UNION ALL 
        SELECT * FROM expr 
    ) as source
    ON target.TRANSACTION_ID = source.TRANSACTION_ID
    WHEN MATCHED THEN
        UPDATE SET
            TRANSACTION_TIME = source.TRANSACTION_TIME,
            TYPE = source.TYPE,
            AMOUNT = source.AMOUNT,
            ACC_ORIG = source.ACC_ORIG,
            ACC_DEST = source.ACC_DEST,
            TRANSACTION_DATE = source.TRANSACTION_DATE
    WHEN NOT MATCHED THEN
        INSERT (TRANSACTION_TIME, TRANSACTION_ID, TYPE, AMOUNT, ACC_ORIG, ACC_DEST, TRANSACTION_DATE) 
        VALUES (source.TRANSACTION_TIME, source.TRANSACTION_ID, source.TYPE, source.AMOUNT, source.ACC_ORIG, source.ACC_DEST, source.TRANSACTION_DATE);
"""

myduck.execute(merge_sql_dupes)

# Now select from transactions_two table to show the duplicates
myduck.sql("SELECT * FROM testing.transactions_two")
TRANSACTION_TIME            TRANSACTION_ID  TYPE     AMOUNT  ACC_ORIG  ACC_DEST  TRANSACTION_DATE
2025-10-19 09:33:33.930532  5               payment  100     A123      B456      2025-10-19 00:00:00
2025-10-19 09:33:33.930532  5               payment  100     A123      B456      2025-10-19 00:00:00
2025-10-16 09:33:33.930532  2               payment  100     A123      B456      2025-10-16 00:00:00
2025-10-16 09:33:33.930532  2               payment  100     A123      B456      2025-10-16 00:00:00
2025-10-15 09:33:33.930532  1               payment  100     A123      B456      2025-10-15 00:00:00
2025-10-15 09:33:33.930532  1               payment  100     A123      B456      2025-10-15 00:00:00
2025-10-18 09:33:33.930532  4               payment  100     A123      B456      2025-10-18 00:00:00
2025-10-18 09:33:33.930532  4               payment  100     A123      B456      2025-10-18 00:00:00
2025-10-17 09:33:33.930532  3               payment  100     A123      B456      2025-10-17 00:00:00
2025-10-17 09:33:33.930532  3               payment  100     A123      B456      2025-10-17 00:00:00
2025-10-20 09:33:33.930532  6               payment  100     A123      B456      2025-10-20 00:00:00
2025-10-20 09:33:33.930532  6               payment  100     A123      B456      2025-10-20 00:00:00

duckdb_bug.zip

OS:

arm64

DuckDB Version:

1.4.1

DuckLake Version:

f134ad8

DuckDB Client:

Python

Hardware:

No response

Full Name:

Adam Mitchell

Affiliation:

Self

What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.

I have tested with a stable release

Did you include all relevant data sets for reproducing the issue?

Yes

Did you include all code required to reproduce the issue?

  • [x] Yes, I have

Did you include all relevant configuration (e.g., CPU architecture, Python version, Linux distribution) to reproduce the issue?

  • [x] Yes, I have

— HotfixAdamV2, Oct 15 '25

Hi @HotfixAdamV2 !

  • First issue seems legit; there's something funky going on with type casting when using generated columns on partitions.
  • Issue two is not really an issue. DuckLake doesn't have enforced primary keys (nor do many other lakehouse formats). If you have duplicates in your source data, you should deduplicate them before merging. In this case it should be pretty straightforward, since you only want to keep the latest transaction time for a given transaction id (see the sketch below).
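
For example, something along these lines (a sketch, untested against this repro) keeps only the latest row per TRANSACTION_ID before merging:

# Untested sketch: deduplicate the source on the merge key first, keeping the
# row with the latest TRANSACTION_TIME per TRANSACTION_ID via QUALIFY.
merge_sql_dedup = """
    MERGE INTO testing.transactions_two AS target
    USING (
        SELECT *
        FROM (
            SELECT * FROM expr
            UNION ALL
            SELECT * FROM expr
        )
        QUALIFY row_number() OVER (
            PARTITION BY TRANSACTION_ID
            ORDER BY TRANSACTION_TIME DESC
        ) = 1
    ) AS source
    ON target.TRANSACTION_ID = source.TRANSACTION_ID
    WHEN MATCHED THEN
        UPDATE SET
            TRANSACTION_TIME = source.TRANSACTION_TIME,
            TYPE = source.TYPE,
            AMOUNT = source.AMOUNT,
            ACC_ORIG = source.ACC_ORIG,
            ACC_DEST = source.ACC_DEST,
            TRANSACTION_DATE = source.TRANSACTION_DATE
    WHEN NOT MATCHED THEN
        INSERT (TRANSACTION_TIME, TRANSACTION_ID, TYPE, AMOUNT, ACC_ORIG, ACC_DEST, TRANSACTION_DATE)
        VALUES (source.TRANSACTION_TIME, source.TRANSACTION_ID, source.TYPE, source.AMOUNT, source.ACC_ORIG, source.ACC_DEST, source.TRANSACTION_DATE);
"""
myduck.execute(merge_sql_dedup)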

— guillesd, Oct 20 '25

@guillesd agreed on issue two that it's not necessarily a bug. Other formats such as Iceberg (with Spark as the engine) will actually catch this and error out. Would it be possible to add this as a feature request (i.e., as an optional setting)?

— HotfixAdamV2, Oct 24 '25

AFAIK, Spark on Iceberg does not support primary key constraints (https://iceberg.apache.org/docs/latest/spark-ddl/). Maybe you can point me to the right docs?

— guillesd, Oct 27 '25

@guillesd apologies, my response was unclear. Yes, Iceberg on Spark does not support primary key constraints. However, when you run a merge statement, Spark will pre-validate the view/table you are using as the source for duplicate records on the columns used in the ON clause.
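
In the meantime, a pre-merge guard along these lines (my own sketch, not an existing DuckLake feature) would emulate Spark's validation in DuckDB:

# Sketch: emulate Spark's pre-merge validation by failing fast when the source
# relation contains duplicate values of the ON key (TRANSACTION_ID here).
dupes = myduck.sql("""
    SELECT TRANSACTION_ID, count(*) AS n
    FROM expr
    GROUP BY TRANSACTION_ID
    HAVING count(*) > 1
""").fetchall()
if dupes:
    raise ValueError(f"Duplicate merge keys in source: {dupes}")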

— HotfixAdamV2, Oct 27 '25