SQL database extraction not working when using backend=`pyarrow` and `scd2`
dlt version
1.16
Describe the problem
It is not possible to load data from a SQL database when using the `scd2` merge strategy together with the `pyarrow` backend.
Expected behavior
Data should simply move from PostgreSQL to BigQuery.
Steps to reproduce
Create a mock database:
import subprocess

# Start PostgreSQL container (container_name and init_sql_path below are
# example values; init_sql_path must be the absolute path to the init.sql shown next)
container_name = "scd2_test_postgres"
init_sql_path = "/absolute/path/to/init.sql"

subprocess.run(
    [
        "docker",
        "run",
        "-d",
        "--name",
        container_name,
        "-e",
        "POSTGRES_DB=test_db",
        "-e",
        "POSTGRES_USER=test_user",
        "-e",
        "POSTGRES_PASSWORD=test_password",
        "-p",
        "5433:5432",
        "-v",
        f"{init_sql_path}:/docker-entrypoint-initdb.d/init.sql",
        "postgres:15",
    ],
    check=True,
)
init.sql
CREATE SCHEMA test_schema;

-- Simple SCD test table with only 4 columns
CREATE TABLE test_schema.scd_test_table (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(255),
    updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

-- Insert initial SCD test data
INSERT INTO test_schema.scd_test_table (name, email, updated_at) VALUES
    ('John Doe', '[email protected]', '2024-01-01 10:00:00+00'),
    ('Jane Smith', '[email protected]', '2024-01-01 11:00:00+00');
Ingest the data into BigQuery:
import dlt
from dlt.sources.credentials import ConnectionStringCredentials
from dlt.sources.sql_database import sql_database, sql_table

# Note: 5433 is the host port mapped in the docker run command above
credentials = ConnectionStringCredentials(
    "postgresql+psycopg2://test_user:test_password@localhost:5433/test_db"
)

pipeline = dlt.pipeline(
    pipeline_name="test_scd2_pipeline",
    destination="bigquery",
    dataset_name="test_dataset",
    progress="log",
)

database = sql_database(
    credentials=credentials,
    table_names=["scd_test_table"],
    schema="test_schema",
    include_views=True,
    backend="pyarrow",
)

table = database.resources["scd_test_table"]
table.apply_hints(
    write_disposition={"disposition": "merge", "strategy": "scd2"},
    primary_key=["id"],
)

pipeline.run(database)
Operating system
Linux
Runtime environment
Local
Python version
3.11
dlt data source
sql_table and sql_database
dlt destination
No response
Other deployment details
bigquery
Additional information
The error
extraction_dlt git:(feat/tests_dag_factory) ✗ uv run dlt_scd2_simple.py
----------------------------- Extract sql_database -----------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 200.79 MB (49.10%) | CPU usage: 0.00%
----------------------------- Extract sql_database -----------------------------
Resources: 0/1 (0.0%) | Time: 0.01s | Rate: 0.00/s
scd_test_table: 4 | Time: 0.00s | Rate: 838860.80/s
Memory usage: 201.66 MB (49.10%) | CPU usage: 0.00%
----------------------------- Extract sql_database -----------------------------
Resources: 1/1 (100.0%) | Time: 0.03s | Rate: 35.37/s
scd_test_table: 4 | Time: 0.02s | Rate: 258.17/s
Memory usage: 204.20 MB (49.10%) | CPU usage: 0.00%
----------------------------- Extract sql_database -----------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 204.32 MB (49.10%) | CPU usage: 0.00%
----------------------------- Extract sql_database -----------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
_dlt_pipeline_state: 1 | Time: 0.00s | Rate: 349525.33/s
Memory usage: 204.45 MB (49.00%) | CPU usage: 0.00%
----------------------------- Extract sql_database -----------------------------
Resources: 1/1 (100.0%) | Time: 0.01s | Rate: 68.93/s
_dlt_pipeline_state: 1 | Time: 0.01s | Rate: 81.63/s
Memory usage: 204.45 MB (49.00%) | CPU usage: 0.00%
----------------- Normalize sql_database in 1758132339.1547894 -----------------
Files: 0/2 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 204.70 MB (49.00%) | CPU usage: 0.00%
----------------- Normalize sql_database in 1758132339.1547894 -----------------
Files: 0/2 (0.0%) | Time: 0.00s | Rate: 0.00/s
Items: 0 | Time: 0.00s | Rate: 0.00/s
Memory usage: 204.70 MB (49.00%) | CPU usage: 0.00%
----------------- Normalize sql_database in 1758132339.1547894 -----------------
Files: 2/2 (100.0%) | Time: 0.01s | Rate: 276.51/s
Items: 5 | Time: 0.01s | Rate: 717.93/s
Memory usage: 204.82 MB (49.00%) | CPU usage: 0.00%
Traceback (most recent call last):
File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 539, in normalize
runner.run_pool(normalize_step.config, normalize_step)
File "/.venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 91, in run_pool
while _run_func():
^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 84, in _run_func
run_metrics = run_f.run(cast(TExecutor, pool))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 292, in run
self.spool_schema_files(load_id, schema, schema_files)
File "/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 246, in spool_schema_files
self.spool_files(load_id, schema.clone(update_normalizers=True), map_f, files)
File "/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 198, in spool_files
verify_normalized_table(schema, table, self.config.destination_capabilities)
File "/.venv/lib/python3.11/site-packages/dlt/normalize/validate.py", line 54, in verify_normalized_table
raise UnboundColumnException(schema.name, table["name"], [column])
dlt.common.schema.exceptions.UnboundColumnException: In schema `sql_database`: The following columns in table `scd_test_table` did not receive any data during this load:
- _dlt_id (marked as non-nullable and must have values)
This can happen if you specify columns manually, for example, using the `merge_key`, `primary_key` or `columns` argument but they do not exist in the data.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/homdlt_scd2_simple.py", line 32, in <module>
pipeline.run(database)
File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 227, in _wrap
step_info = f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 276, in _wrap
return f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 737, in run
self.normalize()
File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 227, in _wrap
step_info = f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 181, in _wrap
rv = f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 276, in _wrap
return f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 543, in normalize
raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at `step=normalize` when processing package with `load_id=1758132339.1547894` with exception:
<class 'dlt.common.schema.exceptions.UnboundColumnException'>
In schema `sql_database`: The following columns in table `scd_test_table` did not receive any data during this load:
- _dlt_id (marked as non-nullable and must have values)
This can happen if you specify columns manually, for example, using the `merge_key`, `primary_key` or `columns` argument but they do not exist in the data.
Luiz is asking for pointers on where to start solving this in the community Slack: https://dlthub-community.slack.com/archives/C05CJEDHG2F/p1758268282688359
Right now the row hash must be added manually to Arrow tables, as described in the docs: https://dlthub.com/docs/general-usage/merge-loading#-use-scd2-with-arrow-tables-and-panda-frames
We should detect this and warn the user right away in the extract phase if the strategy is scd2 and the data is Arrow/pandas. Alternatively, we can add a conditional warning to the existing message in the normalize step.
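For this repro, the documented workaround would look roughly like the sketch below (assuming the `add_row_hash_to_table` helper from `dlt.sources.helpers.transform` and the `row_version_column_name` option referenced on that docs page):

from dlt.sources.helpers.transform import add_row_hash_to_table

table = database.resources["scd_test_table"]
# Compute the row hash as a column on the Arrow table itself and tell the
# scd2 strategy which column holds it (the column name "row_hash" is arbitrary)
table.add_map(add_row_hash_to_table("row_hash"))
table.apply_hints(
    write_disposition={
        "disposition": "merge",
        "strategy": "scd2",
        "row_version_column_name": "row_hash",
    },
    primary_key=["id"],
)
pipeline.run(database)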
@rudolfix Ah, thanks, I hadn't seen that. I had a problem because this was a bit hidden in the logs.
So, if I understood correctly, this doesn't work for any scd2 load over Arrow tables. Is there a reason why the row hash is not added directly when using scd2? I imagine you serialize to Arrow for most of the connections unless there is a direct Arrow connector, correct?
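For context, a quick check (an illustrative sketch reusing the database resource from the repro above) that with backend="pyarrow" the resource yields Arrow tables rather than Python rows, which is why the row hash has to exist as a column on the table itself instead of being injected per row during normalization:

import pyarrow as pa

# Each item yielded by the resource is a pyarrow.Table, not a dict row,
# so the row hash column must be added to the table (see the sketch above)
for item in database.resources["scd_test_table"]:
    print(type(item), isinstance(item, pa.Table))
    break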