
SQL database extraction not working when using backend=`pyarrow` and `scd2`

Open · lfpll opened this issue 3 months ago · 3 comments

dlt version

1.16

Describe the problem

It is not possible to load data from a SQL database using the `scd2` merge strategy together with `backend="pyarrow"`.

Expected behavior

Data should simply move from PostgreSQL to BigQuery.

Steps to reproduce

Create a mock database:

    import os
    import subprocess

    container_name = "scd2_test_postgres"  # illustrative; any free container name works
    init_sql_path = os.path.abspath("init.sql")  # path to the init.sql shown below

    # Start PostgreSQL container
    subprocess.run(
        [
            "docker",
            "run",
            "-d",
            "--name",
            container_name,
            "-e",
            "POSTGRES_DB=test_db",
            "-e",
            "POSTGRES_USER=test_user",
            "-e",
            "POSTGRES_PASSWORD=test_password",
            "-p",
            "5433:5432",
            "-v",
            f"{init_sql_path}:/docker-entrypoint-initdb.d/init.sql",
            "postgres:15",
        ],
        check=True,
    )

init.sql

create schema test_schema;
-- Simple SCD test table with only 4 columns
CREATE TABLE test_schema.scd_test_table (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(255),
    updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);


-- Insert initial SCD test data
INSERT INTO test_schema.scd_test_table (name, email, updated_at) VALUES
('John Doe', '[email protected]', '2024-01-01 10:00:00+00'),
('Jane Smith', '[email protected]', '2024-01-01 11:00:00+00');

Ingest the data into BigQuery:

import dlt
from dlt.sources.credentials import ConnectionStringCredentials
from dlt.sources.sql_database import sql_database, sql_table

credentials = ConnectionStringCredentials(
    # host port 5433, matching the port mapping of the container started above
    "postgresql+psycopg2://test_user:test_password@localhost:5433/test_db"
)


pipeline = dlt.pipeline(
    pipeline_name="test_scd2_pipeline",
    destination="bigquery",
    dataset_name="test_dataset",
    progress="log",
)

database = sql_database(
    credentials=credentials,
    table_names=['scd_test_table'],
    schema="test_schema",
    include_views=True,
    backend="pyarrow",
)

    
table = database.resources['scd_test_table']
table.apply_hints(
    write_disposition={"disposition": "merge", "strategy": "scd2"},
    primary_key=["id"],
)

pipeline.run(database)


Operating system

Linux

Runtime environment

Local

Python version

3.11

dlt data source

sql_table and sql_database

dlt destination

No response

Other deployment details

bigquery

Additional information

The error

extraction_dlt git:(feat/tests_dag_factory) ✗ uv run dlt_scd2_simple.py
----------------------------- Extract sql_database -----------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 200.79 MB (49.10%) | CPU usage: 0.00%

----------------------------- Extract sql_database -----------------------------
Resources: 0/1 (0.0%) | Time: 0.01s | Rate: 0.00/s
scd_test_table: 4  | Time: 0.00s | Rate: 838860.80/s
Memory usage: 201.66 MB (49.10%) | CPU usage: 0.00%

----------------------------- Extract sql_database -----------------------------
Resources: 1/1 (100.0%) | Time: 0.03s | Rate: 35.37/s
scd_test_table: 4  | Time: 0.02s | Rate: 258.17/s
Memory usage: 204.20 MB (49.10%) | CPU usage: 0.00%

----------------------------- Extract sql_database -----------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 204.32 MB (49.10%) | CPU usage: 0.00%

----------------------------- Extract sql_database -----------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
_dlt_pipeline_state: 1  | Time: 0.00s | Rate: 349525.33/s
Memory usage: 204.45 MB (49.00%) | CPU usage: 0.00%

----------------------------- Extract sql_database -----------------------------
Resources: 1/1 (100.0%) | Time: 0.01s | Rate: 68.93/s
_dlt_pipeline_state: 1  | Time: 0.01s | Rate: 81.63/s
Memory usage: 204.45 MB (49.00%) | CPU usage: 0.00%

----------------- Normalize sql_database in 1758132339.1547894 -----------------
Files: 0/2 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 204.70 MB (49.00%) | CPU usage: 0.00%

----------------- Normalize sql_database in 1758132339.1547894 -----------------
Files: 0/2 (0.0%) | Time: 0.00s | Rate: 0.00/s
Items: 0  | Time: 0.00s | Rate: 0.00/s
Memory usage: 204.70 MB (49.00%) | CPU usage: 0.00%

----------------- Normalize sql_database in 1758132339.1547894 -----------------
Files: 2/2 (100.0%) | Time: 0.01s | Rate: 276.51/s
Items: 5  | Time: 0.01s | Rate: 717.93/s
Memory usage: 204.82 MB (49.00%) | CPU usage: 0.00%

Traceback (most recent call last):
  File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 539, in normalize
    runner.run_pool(normalize_step.config, normalize_step)
  File "/.venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 91, in run_pool
    while _run_func():
          ^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 84, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 292, in run
    self.spool_schema_files(load_id, schema, schema_files)
  File "/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 246, in spool_schema_files
    self.spool_files(load_id, schema.clone(update_normalizers=True), map_f, files)
  File "/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 198, in spool_files
    verify_normalized_table(schema, table, self.config.destination_capabilities)
  File "/.venv/lib/python3.11/site-packages/dlt/normalize/validate.py", line 54, in verify_normalized_table
    raise UnboundColumnException(schema.name, table["name"], [column])
dlt.common.schema.exceptions.UnboundColumnException: In schema `sql_database`: The following columns in table `scd_test_table` did not receive any data during this load:
  - _dlt_id (marked as non-nullable and must have values)

This can happen if you specify columns manually, for example, using the `merge_key`, `primary_key` or `columns` argument but they do not exist in the data.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/homdlt_scd2_simple.py", line 32, in <module>
    pipeline.run(database)
  File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 227, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 276, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 737, in run
    self.normalize()
  File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 227, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 181, in _wrap
    rv = f(self, *args, **kwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 276, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 543, in normalize
    raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at `step=normalize` when processing package with `load_id=1758132339.1547894` with exception:

<class 'dlt.common.schema.exceptions.UnboundColumnException'>
In schema `sql_database`: The following columns in table `scd_test_table` did not receive any data during this load:
  - _dlt_id (marked as non-nullable and must have values)

This can happen if you specify columns manually, for example, using the `merge_key`, `primary_key` or `columns` argument but they do not exist in the data.

lfpll · Sep 17 '25, 16:09

Luiz is asking for pointers on how to start solving it in the community Slack: https://dlthub-community.slack.com/archives/C05CJEDHG2F/p1758268282688359

adrianbr · Sep 19 '25, 07:09

Right now the row hash must be added manually to Arrow tables, as described here: https://dlthub.com/docs/general-usage/merge-loading#-use-scd2-with-arrow-tables-and-panda-frames
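
For the reproduction above, a rough sketch of that manual workaround could look like the following (the hash computation, the `row_hash` column name, and the `row_version_column_name` hint are illustrative assumptions based on the linked docs, assuming the `pyarrow` backend yields `pyarrow.Table` items):

    import hashlib

    import pyarrow as pa

    def add_row_hash(table: pa.Table) -> pa.Table:
        # compute a deterministic hash over the business columns of every row
        business_cols = ["id", "name", "email"]
        rows = zip(*(table[col].to_pylist() for col in business_cols))
        hashes = [
            hashlib.md5("|".join(str(value) for value in row).encode()).hexdigest()
            for row in rows
        ]
        return table.append_column("row_hash", pa.array(hashes, pa.string()))

    table = database.resources["scd_test_table"]
    table.add_map(add_row_hash)  # add the hash column to every extracted Arrow table
    table.apply_hints(
        write_disposition={
            "disposition": "merge",
            "strategy": "scd2",
            # assumption: point scd2 at the custom row-version column instead of `_dlt_id`
            "row_version_column_name": "row_hash",
        },
        primary_key=["id"],
    )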

We should detect that and warn the user right away in the extract phase if the strategy is scd2 and the data is Arrow/pandas. Alternatively, we could add a conditional warning to the existing message in the normalize step.
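
For reference, a hypothetical shape of such an early check (the function name and hook point are illustrative, not actual dlt internals):

    import logging

    import pyarrow as pa

    def warn_if_scd2_with_arrow(item, write_disposition) -> None:
        # warn during extract when the scd2 strategy meets Arrow/pandas items,
        # since the `_dlt_id` row hash will not be populated for those formats
        is_arrow_or_pandas = isinstance(item, (pa.Table, pa.RecordBatch)) or hasattr(item, "to_records")
        is_scd2 = isinstance(write_disposition, dict) and write_disposition.get("strategy") == "scd2"
        if is_scd2 and is_arrow_or_pandas:
            logging.getLogger("dlt").warning(
                "scd2 with Arrow/pandas items requires adding a row hash column manually; "
                "see https://dlthub.com/docs/general-usage/merge-loading"
            )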

rudolfix · Sep 19 '25, 08:09

@rudolfix Ah, thanks, I hadn't seen that. It was a problem for me because this was a bit hidden in the logs.

So if I understood correctly, this doesn't work for any scd2 load on Arrow data. Is there a reason why the row hash is not added automatically when scd2 is used? I imagine you serialize most connections to Arrow unless there is a direct Arrow connector, correct?

lfpll · Sep 22 '25, 11:09