Schema inference does not respect result from query adapter
dlt version
1.10.0
Describe the problem
Background
I'm tasked with ELT-ing a table from an ERP system. The table has more than 200 columns and more than 1M records, so a full load takes several hours, and because records frequently change state or get deleted I have to reload the entire table every day. My stakeholders only care about roughly 40 of the columns, so there is little point in wasting resources on unwanted data; loading only those ~40 columns speeds up the job considerably.
The problem
I went about this by using a query_adapter_callback to cherry-pick the desired columns. The extraction works wonderfully, but the job breaks during normalization because the schema inference, even at reflection_level="minimal", does not take into account that only a subset of the columns has been loaded. Several of the skipped columns are non-nullable in the reflected schema, so the normalizer raises UnboundColumnException.
Expected behavior
I expected the pipeline to run successfully and create a table in my destination (a parquet file on an S3 interface in my case) containing only the selected subset of columns.
Steps to reproduce
Example
This problem is recreated in the following small example:
import dlt
from dlt.sources.sql_database import sql_database
import sqlalchemy as sa
dlt.secrets["sources.sql_database.rfam.credentials"] = "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
def query_adapter_callback(query, table):
    # only columns specified
    if table.name == 'family':
        columns_to_select = ['rfam_acc', 'description']
        return sa.select(*(table.c[col] for col in columns_to_select))
    # Use the original query for other tables
    return query

source = sql_database.with_args(name='rfam')(
    backend="sqlalchemy",
    reflection_level="minimal",
    table_names=["family", "genome"],
    query_adapter_callback=query_adapter_callback
)

pipeline = dlt.pipeline(
    pipeline_name="example_pipeline",
    destination="duckdb",
    dataset_name="rfam",
    dev_mode=True
    # progress='log'
)

info = pipeline.run(source, write_disposition='replace')
print(info)
Operating system
macOS, Linux
Runtime environment
Local
Python version
3.12
dlt data source
An Oracle database using the SQL database source, but the issue is reproducible with any database.
dlt destination
Filesystem & buckets
Other deployment details
No response
Additional information
Although I'm confident using Python, I'm a data engineer and not a software developer. I would like the schema inference to take into account that not the entire table has been extracted, perhaps via another reflection_level or an additional flag that drops columns from the inferred schema before it is compared with the queried data.
@jesperbagge thanks for the ticket and the repro! Some notes for the team when checking this:
- Can we get the schema for a query upfront in sqlalchemy (see the sketch below), or do we need to switch off reflecting the schema and discover it from the data in these cases?
- Why does it not work with reflection_level minimal? I think it's because we still reflect the column names together with their nullability, and the normalizer then complains that the non-nullable columns received no data.
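On the first point, a minimal sketch (an illustration only, not how the sql_database source currently works): when the adapter callback returns a sqlalchemy Select, the statement itself already exposes the columns it will return via Select.selected_columns, so a subset schema could in principle be derived from the adapted query instead of from the full reflected table. The table definition below is invented for illustration and only mirrors a few columns from the repro.

import sqlalchemy as sa

# Stand-in for the reflected `family` table from the issue
# (columns chosen for illustration, mirroring the repro below).
metadata = sa.MetaData()
family = sa.Table(
    "family",
    metadata,
    sa.Column("rfam_acc", sa.Text, primary_key=True),
    sa.Column("description", sa.Text),
    sa.Column("author", sa.Text, nullable=False),
)

# The kind of statement a query_adapter_callback would return.
query = sa.select(family.c["rfam_acc"], family.c["description"])

# Select.selected_columns (SQLAlchemy 1.4+) lists exactly the columns the
# statement returns, with type and nullability where known.
for col in query.selected_columns:
    print(col.name, col.type, getattr(col, "nullable", None))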
I also came across this issue. The workaround I used is the table_adapter_callback, which is executed after the table is reflected but before the schema is created.
The default_table_adapter already contains an implementation for removing columns that could be adapted to this use case: https://github.com/dlt-hub/dlt/blob/80ed2cd2445acbd3f2fea84b79436613861f07be/dlt/sources/sql_database/schema_types.py#L46
def default_table_adapter(table: Table, included_columns: Optional[List[str]]) -> None:
    """Default table adapter being always called before custom one"""
    if included_columns is not None:
        # Delete columns not included in the load
        for col in list(table._columns):  # type: ignore[attr-defined]
            if col.name not in included_columns:
                table._columns.remove(col)  # type: ignore[attr-defined]
    for col in table._columns:  # type: ignore[attr-defined]
        sql_t = col.type
        if hasattr(sqltypes, "Uuid") and isinstance(sql_t, sqltypes.Uuid):
            # emit uuids as string by default
            sql_t.as_uuid = False
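Applied to the original Rfam example, the workaround looks roughly like this (a sketch only; the table name and columns are taken from the issue above, and the callback mirrors the column-removal logic of default_table_adapter). Because the columns are removed from the reflected Table, both the generated SELECT and the inferred schema contain only the remaining columns.

import dlt
from dlt.sources.sql_database import sql_database

def table_adapter_callback(table):
    # drop unwanted columns from the reflected table before dlt builds
    # the schema and the SELECT statement
    if table.name == "family":
        columns_to_keep = ["rfam_acc", "description"]
        for col in list(table._columns):
            if col.name not in columns_to_keep:
                table._columns.remove(col)
    return table

# credentials are configured via dlt.secrets as in the original example
source = sql_database.with_args(name="rfam")(
    backend="sqlalchemy",
    reflection_level="minimal",
    table_names=["family", "genome"],
    table_adapter_callback=table_adapter_callback,
)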
I was able to reproduce the issue with a toy example using a similar schema, and the solution proposed by @malramsay64 (using table_adapter_callback instead of query_adapter_callback) worked.
#!/usr/bin/env python3
import sqlite3
import os
import dlt
from dlt.sources.sql_database import sql_database
import sqlalchemy as sa
import duckdb


def create_test_db():
    db_path = "test.db"
    if os.path.exists(db_path):
        os.remove(db_path)
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Create first table with nullable and non-nullable columns (like 'family' in original)
    cursor.execute('''
        CREATE TABLE family (
            rfam_acc TEXT PRIMARY KEY,
            description TEXT,
            author TEXT NOT NULL,
            seed_source TEXT NOT NULL,
            gathering_cutoff REAL,
            trusted_cutoff REAL NOT NULL,
            noise_cutoff REAL NOT NULL,
            comment TEXT
        )
    ''')
    # Create second table (like 'genome' in original)
    cursor.execute('''
        CREATE TABLE genome (
            upid TEXT PRIMARY KEY,
            assembly_acc TEXT NOT NULL,
            assembly_level TEXT NOT NULL,
            genome_acc TEXT NOT NULL,
            wgs_acc TEXT,
            length INTEGER NOT NULL,
            num_families INTEGER NOT NULL
        )
    ''')
    # Insert test data into family table
    family_data = [
        ('TEST001', 'Test RNA', 'Test Author', 'Test Source', 10.0, 10.5, 9.5, 'Test Comment'),
    ]
    cursor.executemany('''
        INSERT INTO family (rfam_acc, description, author, seed_source, gathering_cutoff, trusted_cutoff, noise_cutoff, comment)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    ''', family_data)
    # Insert test data into genome table
    genome_data = [
        ('TEST001', 'GCF_TEST001', 'Complete Genome', 'NC_TEST001', 'TEST01', 1000000, 100),
    ]
    cursor.executemany('''
        INSERT INTO genome (upid, assembly_acc, assembly_level, genome_acc, wgs_acc, length, num_families)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    ''', genome_data)
    conn.commit()
    conn.close()
    return db_path


def test_adapter_callback(test_query_adapter: bool):
    # Create test database
    db_path = create_test_db()
    try:
        credentials = f"sqlite:///{db_path}"

        def query_adapter_callback(query, table):
            # Only columns specified for family table (mimicking original issue)
            if table.name == 'family':
                columns_to_select = ['rfam_acc', 'description']
                return sa.select(*(table.c[col] for col in columns_to_select))
            return query

        def table_adapter_callback(table):
            if table.name == 'family':
                columns_to_keep = ['rfam_acc', 'description']
                for col in list(table._columns):
                    if col.name not in columns_to_keep:
                        table._columns.remove(col)
            return table

        source = sql_database(
            credentials=credentials,
            backend="sqlalchemy",
            reflection_level="minimal",
            table_names=["family", "genome"],
            query_adapter_callback=query_adapter_callback if test_query_adapter else None,
            table_adapter_callback=None if test_query_adapter else table_adapter_callback
        )
        pipeline = dlt.pipeline(
            pipeline_name="test_pipeline",
            destination="duckdb",
            dataset_name="test_data",
        )
        print("Running pipeline...")
        info = pipeline.run(source, write_disposition='replace')
        print("Pipeline completed successfully!")
        print(f"✅ Loaded {info.loads_ids} with {len(info.load_packages[0].jobs['completed_jobs'])} jobs")
        duckdb_path = "test_pipeline.duckdb"
        if os.path.exists(duckdb_path):
            print("\nChecking column counts in duckdb database:")
            conn = duckdb.connect(duckdb_path)
            # Get all tables in the test_data schema (excluding _dlt tables)
            tables_query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'test_data' AND table_name NOT LIKE '_dlt%'"
            tables = conn.execute(tables_query).fetchall()
            for (table_name,) in tables:
                # Get column names excluding _dlt columns
                columns_query = f"SELECT column_name FROM information_schema.columns WHERE table_schema = 'test_data' AND table_name = '{table_name}' AND column_name NOT LIKE '_dlt%' ORDER BY ordinal_position"
                columns = [row[0] for row in conn.execute(columns_query).fetchall()]
                print(f" Table '{table_name}': {columns}")
            conn.close()
    except Exception as e:
        print(f"❌ Error occurred: {e}")
        return False
    finally:
        # Cleanup
        if os.path.exists(db_path):
            os.remove(db_path)
        # Cleanup duckdb file
        duckdb_path = "test_pipeline.duckdb"
        if os.path.exists(duckdb_path):
            os.remove(duckdb_path)
            print(f"Cleaned up {duckdb_path}")
    return True


if __name__ == "__main__":
    print("Testing query_adapter_callback (should fail):")
    test_adapter_callback(test_query_adapter=True)
    print("\nTesting table_adapter_callback (should work):")
    test_adapter_callback(test_query_adapter=False)
Output:
Testing query_adapter_callback (should fail):
Running pipeline...
❌ Error occurred: Pipeline execution failed at `step=normalize` when processing package with `load_id=1752259734.516622` with exception:
<class 'dlt.common.schema.exceptions.UnboundColumnException'>
In schema `sql_database`: The following columns in table `family` did not receive any data during this load:
- author (marked as non-nullable and must have values)
This can happen if you specify columns manually, for example, using the `merge_key`, `primary_key` or `columns` argument but they do not exist in the data.
Cleaned up test_pipeline.duckdb
Testing table_adapter_callback (should work):
Running pipeline...
Pipeline completed successfully!
✅ Loaded ['1752259734.708155'] with 3 jobs
Checking column counts in duckdb database:
Table 'family': ['rfam_acc', 'description']
Table 'genome': ['upid', 'assembly_acc', 'assembly_level', 'genome_acc', 'wgs_acc', 'length', 'num_families']
Cleaned up test_pipeline.duckdb
I'm opening a PR to add this to the docs.