
Schema inference does not respect result from query adapter

Open · jesperbagge opened this issue 8 months ago

dlt version

1.10.0

Describe the problem

Background

I'm tasked with ELT-ing a table from an ERP system. The table consists of > 200 columns and > 1M records. A full load takes several hours, and because records frequently change state or get deleted, I have to reload the entire table every day. My stakeholders are only interested in ~40 of the columns, so there is little point in wasting resources on unwanted data. Loading only those ~40 columns speeds up the job considerably.

The problem

I went about this by using a query_adapter_callback to cherry-pick the desired columns. The extraction works wonderfully, but the job breaks during normalization because the schema inference, even at reflection_level="minimal", does not take into account that only a subset of the columns has been loaded. Several of the skipped columns are non-nullable in the reflected schema, causing the normalizer to raise UnboundColumnException.

Expected behavior

I expected the pipeline to run successfully, creating a table in my destination (a Parquet file on an S3 interface in my case) containing only the selected subset of columns.

Steps to reproduce

Example

The problem can be reproduced with the following small example:

import dlt
from dlt.sources.sql_database import sql_database
import sqlalchemy as sa

dlt.secrets["sources.sql_database.rfam.credentials"] = "mysql+pymysql://[email protected]:4497/Rfam"

def query_adapter_callback(query, table):
    # only columns specified
    if table.name == 'family':
        columns_to_select = ['rfam_acc', 'description']
        return sa.select(*(table.c[col] for col in columns_to_select))
    # Use the original query for other tables
    return query

source = sql_database.with_args(name='rfam')(
    backend="sqlalchemy",
    reflection_level="minimal",
    table_names=["family", "genome"],
    query_adapter_callback=query_adapter_callback
)

pipeline = dlt.pipeline(
    pipeline_name="example_pipeline",
    destination="duckdb",
    dataset_name="rfam",
    dev_mode=True
    # progress='log'
)

info = pipeline.run(source, write_disposition='replace')
print(info)

Operating system

macOS, Linux

Runtime environment

Local

Python version

3.12

dlt data source

An Oracle database accessed through the sql_database source, but the issue is reproducible with any database.

dlt destination

Filesystem & buckets

Other deployment details

No response

Additional information

Although I'm confident using Python, I'm a data engineer and not a software developer. I wish the schema inference would take into account that not the entire table has been extracted, perhaps via another reflection_level or an additional flag that drops columns from the inferred schema before it is compared with the queried data.

jesperbagge · Apr 24 '25 06:04

@jesperbagge thanks for the ticket and the repro! Some notes for the team to check:

  • Can we get the schema for a query upfront in SQLAlchemy (see the sketch below), or do we need to switch off reflecting the schema and discover it from the data in these cases?
  • Why does it not work with reflection_level "minimal"? I think it's because we still get the column names with nullable not set, and the normalizer then complains that data for these columns is missing.
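
A minimal sketch (plain SQLAlchemy, not dlt internals) of the first point: the columns a SELECT will return are available upfront via Select.selected_columns, including type and nullability when they come from reflected Table columns, so a query-level schema could in principle be derived before any rows are fetched. Table and column names below are illustrative only.

import sqlalchemy as sa

metadata = sa.MetaData()
family = sa.Table(
    "family",
    metadata,
    sa.Column("rfam_acc", sa.String, primary_key=True),
    sa.Column("description", sa.String, nullable=True),
    sa.Column("author", sa.String, nullable=False),
)

# the kind of query a query_adapter_callback would return: a subset of columns
stmt = sa.select(family.c["rfam_acc"], family.c["description"])

# selected_columns exposes only the columns the query will actually return
for col in stmt.selected_columns:
    print(col.name, col.type, getattr(col, "nullable", None))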

sh-rp · Apr 24 '25 08:04

I also came across this issue. The workaround I used is the table_adapter_callback, which is executed after the table is reflected but before the schema is created.

The default_table_adapter already contains logic for removing columns that can be adapted to this use case (a minimal sketch follows after the excerpt): https://github.com/dlt-hub/dlt/blob/80ed2cd2445acbd3f2fea84b79436613861f07be/dlt/sources/sql_database/schema_types.py#L46

# imports needed to run this excerpt
from typing import List, Optional

from sqlalchemy import Table
from sqlalchemy.sql import sqltypes

def default_table_adapter(table: Table, included_columns: Optional[List[str]]) -> None:
    """Default table adapter being always called before custom one"""
    if included_columns is not None:
        # Delete columns not included in the load
        for col in list(table._columns):  # type: ignore[attr-defined]
            if col.name not in included_columns:
                table._columns.remove(col)  # type: ignore[attr-defined]
    for col in table._columns:  # type: ignore[attr-defined]
        sql_t = col.type
        if hasattr(sqltypes, "Uuid") and isinstance(sql_t, sqltypes.Uuid):
            # emit uuids as string by default
            sql_t.as_uuid = False
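
For completeness, a minimal sketch of this workaround adapted to the case from the issue. Table and column names are taken from the repro above, and the pruning mirrors the default adapter's logic; the only dlt API relied on here is the table_adapter_callback argument itself.

import dlt
from dlt.sources.sql_database import sql_database
import sqlalchemy as sa

def keep_selected_columns(table: sa.Table) -> None:
    # prune the reflected table in place so that schema inference only sees
    # the columns that will actually be loaded
    if table.name == "family":
        keep = {"rfam_acc", "description"}
        for col in list(table._columns):
            if col.name not in keep:
                table._columns.remove(col)

source = sql_database.with_args(name="rfam")(
    backend="sqlalchemy",
    reflection_level="minimal",
    table_names=["family", "genome"],
    table_adapter_callback=keep_selected_columns,
)

Because the unwanted columns are removed before dlt builds its schema, no non-nullable columns are left unbound during normalization.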

malramsay64 · Jun 26 '25 06:06

I was able to reproduce the issue with a toy example using a similar schema, and the solution proposed by @malramsay64 (using table_adapter_callback instead of query_adapter_callback) worked.

#!/usr/bin/env python3

import sqlite3
import os
import dlt
from dlt.sources.sql_database import sql_database
import sqlalchemy as sa
import duckdb

def create_test_db():
    db_path = "test.db"
    if os.path.exists(db_path):
        os.remove(db_path)
    
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # Create first table with nullable and non-nullable columns (like 'family' in original)
    cursor.execute('''
        CREATE TABLE family (
            rfam_acc TEXT PRIMARY KEY,
            description TEXT,
            author TEXT NOT NULL,
            seed_source TEXT NOT NULL,
            gathering_cutoff REAL,
            trusted_cutoff REAL NOT NULL,
            noise_cutoff REAL NOT NULL,
            comment TEXT
        )
    ''')
    
    # Create second table (like 'genome' in original)
    cursor.execute('''
        CREATE TABLE genome (
            upid TEXT PRIMARY KEY,
            assembly_acc TEXT NOT NULL,
            assembly_level TEXT NOT NULL,
            genome_acc TEXT NOT NULL,
            wgs_acc TEXT,
            length INTEGER NOT NULL,
            num_families INTEGER NOT NULL
        )
    ''')
    
    # Insert test data into family table
    family_data = [
        ('TEST001', 'Test RNA', 'Test Author', 'Test Source', 10.0, 10.5, 9.5, 'Test Comment'),
    ]
    
    cursor.executemany('''
        INSERT INTO family (rfam_acc, description, author, seed_source, gathering_cutoff, trusted_cutoff, noise_cutoff, comment)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    ''', family_data)
    
    # Insert test data into genome table
    genome_data = [
        ('TEST001', 'GCF_TEST001', 'Complete Genome', 'NC_TEST001', 'TEST01', 1000000, 100),
    ]
    
    cursor.executemany('''
        INSERT INTO genome (upid, assembly_acc, assembly_level, genome_acc, wgs_acc, length, num_families)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    ''', genome_data)
    
    conn.commit()
    conn.close()
    return db_path

def test_adapter_callback(test_query_adapter: bool):
    
    # Create test database
    db_path = create_test_db()
    
    try:
        credentials = f"sqlite:///{db_path}"
        
        def query_adapter_callback(query, table):
            # Only columns specified for family table (mimicking original issue)
            if table.name == 'family':
                columns_to_select = ['rfam_acc', 'description']
                return sa.select(*(table.c[col] for col in columns_to_select))
            return query
        
        def table_adapter_callback(table):
            if table.name == 'family':
                columns_to_keep = ['rfam_acc', 'description']
                for col in list(table._columns):
                    if col.name not in columns_to_keep:
                        table._columns.remove(col)
            return table
        
        source = sql_database(
            credentials=credentials,
            backend="sqlalchemy",
            reflection_level="minimal",
            table_names=["family", "genome"],
            query_adapter_callback=query_adapter_callback if test_query_adapter else None,
            table_adapter_callback=None if test_query_adapter else table_adapter_callback
        )
        
        pipeline = dlt.pipeline(
            pipeline_name="test_pipeline",
            destination="duckdb",
            dataset_name="test_data",
        )
        
        print("Running pipeline...")
        info = pipeline.run(source, write_disposition='replace')
        print("Pipeline completed successfully!")
        print(f"✅ Loaded {info.loads_ids} with {len(info.load_packages[0].jobs['completed_jobs'])} jobs")
        
        duckdb_path = "test_pipeline.duckdb"
        if os.path.exists(duckdb_path):
            print("\nChecking column counts in duckdb database:")
            conn = duckdb.connect(duckdb_path)
            
            # Get all tables in the test_data schema (excluding _dlt tables)
            tables_query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'test_data' AND table_name NOT LIKE '_dlt%'"
            tables = conn.execute(tables_query).fetchall()
            
            for (table_name,) in tables:
                # Get column names excluding _dlt columns
                columns_query = f"SELECT column_name FROM information_schema.columns WHERE table_schema = 'test_data' AND table_name = '{table_name}' AND column_name NOT LIKE '_dlt%' ORDER BY ordinal_position"
                columns = [row[0] for row in conn.execute(columns_query).fetchall()]
                print(f"  Table '{table_name}': {columns}")
            
            conn.close()
        
    except Exception as e:
        print(f"❌ Error occurred: {e}")
        return False
    finally:
        # Cleanup
        if os.path.exists(db_path):
            os.remove(db_path)
        
        # Cleanup duckdb file
        duckdb_path = "test_pipeline.duckdb"
        if os.path.exists(duckdb_path):
            os.remove(duckdb_path)
            print(f"Cleaned up {duckdb_path}")
    
    return True

if __name__ == "__main__":
    print("Testing query_adapter_callback (should fail):")
    test_adapter_callback(test_query_adapter=True)
    print("\nTesting table_adapter_callback (should work):")
    test_adapter_callback(test_query_adapter=False)

Output:

Testing query_adapter_callback (should fail):
Running pipeline...
❌ Error occurred: Pipeline execution failed at `step=normalize` when processing package with `load_id=1752259734.516622` with exception:

<class 'dlt.common.schema.exceptions.UnboundColumnException'>
In schema `sql_database`: The following columns in table `family` did not receive any data during this load:
  - author (marked as non-nullable and must have values)

This can happen if you specify columns manually, for example, using the `merge_key`, `primary_key` or `columns` argument but they do not exist in the data.

Cleaned up test_pipeline.duckdb

Testing table_adapter_callback (should work):
Running pipeline...
Pipeline completed successfully!
✅ Loaded ['1752259734.708155'] with 3 jobs

Checking column counts in duckdb database:
  Table 'family': ['rfam_acc', 'description']
  Table 'genome': ['upid', 'assembly_acc', 'assembly_level', 'genome_acc', 'wgs_acc', 'length', 'num_families']
Cleaned up test_pipeline.duckdb

I'm opening a PR to add this to the docs.

franloza · Jul 11 '25 18:07