
Staging tables are not created when needed (DatabaseUndefinedRelation)

Open agrueneberg opened this issue 6 months ago • 10 comments

dlt version

1.13.0

Describe the problem

Hi all,

one of our jobs sometimes fails with a DatabaseUndefinedRelation error because one resource can't find its associated staging table. We request this resource incrementally with write_disposition = merge, but initially perform a historic load with write_disposition = replace. The job continues to run successfully until there is an actual change to that particular resource (data, not schema), after which all subsequent runs fail. If we perform another "full refresh" (meaning write_disposition = replace) on that particular resource, the job runs again (until the next change to the resource occurs).
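For reference, the run pattern is roughly the following (a simplified sketch; the real runs go through Dagster, and the module, source, and pipeline names here are placeholders):

import dlt
from de_erp_source import de_erp  # hypothetical module containing our source

pipeline = dlt.pipeline(
    pipeline_name="de_erp",        # placeholder name
    destination="clickhouse",
    dataset_name="de_erp",
)

# Initial historic load: override the resources' merge disposition at the run level.
pipeline.run(de_erp(), write_disposition="replace")

# Regular scheduled runs: resources load incrementally with their own
# write_disposition="merge". This is where DatabaseUndefinedRelation shows up
# once the source table has actually changed.
pipeline.run(de_erp())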

Step de_erp: dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "de_erp":

Stack Trace:
  File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/execute_plan.py", line 246, in dagster_event_sequence_for_step
    yield from check.generator(step_events)
  File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/execute_step.py", line 504, in core_dagster_event_sequence_for_step
    for user_event in _step_output_error_checked_user_event_sequence(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/execute_step.py", line 185, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
                      ^^^^^^^^^^^^^^^^^^^
  File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/execute_step.py", line 88, in _process_asset_results_to_events
    for user_event in user_event_sequence:
                      ^^^^^^^^^^^^^^^^^^^
  File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/compute.py", line 187, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn, compute_context):
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/compute.py", line 156, in _yield_compute_results
    for event in iterate_with_context(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/home/app/.local/lib/python3.12/site-packages/dagster/_utils/__init__.py", line 390, in iterate_with_context
    with context_fn():
         ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
    self.gen.throw(value)
  File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/utils.py", line 87, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at `step=load` when processing package with `load_id=1752170429.6172163` with exception:

<class 'dlt.destinations.exceptions.DatabaseUndefinedRelation'>
Code: 60.
DB::Exception: Table dwh_dlt.de_erp_staging__manufacturing__work_center_definition does not exist. Maybe you meant dwh_dlt.de_erp__manufacturing__work_center_definition?. Stack trace:

0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000fccc51b
1. DB::Exception::Exception(PreformattedMessage&&, int) @ 0x0000000009f1822c
2. DB::Exception::Exception<String, String, String, String>(int, FormatStringHelperImpl<std::type_identity<String>::type, std::type_identity<String>::type, std::type_identity<String>::type, std::type_identity<String>::type>, String&&, String&&, String&&, String&&) @ 0x0000000009fb712b
3. DB::IDatabase::getTable(String const&, std::shared_ptr<DB::Context const>) const @ 0x00000000130bd609
4. DB::DatabaseCatalog::getTableImpl(DB::StorageID const&, std::shared_ptr<DB::Context const>, std::optional<DB::Exception>*) const @ 0x0000000013ba5550
5. DB::DatabaseCatalog::getDatabaseAndTable(DB::StorageID const&, std::shared_ptr<DB::Context const>) const @ 0x0000000013baeba3
6. DB::InterpreterDropQuery::executeToTableImpl(std::shared_ptr<DB::Context const> const&, DB::ASTDropQuery&, std::shared_ptr<DB::IDatabase>&, StrongTypedef<wide::integer<128ul, unsigned int>, DB::UUIDTag>&) @ 0x0000000013cb2050
7. DB::InterpreterDropQuery::executeSingleDropQuery(std::shared_ptr<DB::IAST> const&) @ 0x0000000013cad704
8. DB::InterpreterDropQuery::execute() @ 0x0000000013cad2e3
9. DB::executeQueryImpl(char const*, char const*, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum, DB::ReadBuffer*, std::shared_ptr<DB::IAST>&) @ 0x000000001407f248
10. DB::executeQuery(String const&, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum) @ 0x0000000014078946
11. DB::TCPHandler::runImpl() @ 0x000000001540ebfe
12. DB::TCPHandler::run() @ 0x000000001542d979
13. Poco::Net::TCPServerConnection::start() @ 0x0000000018b54d07
14. Poco::Net::TCPServerDispatcher::run() @ 0x0000000018b55159
15. Poco::PooledThread::run() @ 0x0000000018b2045b
16. Poco::ThreadImpl::runnableEntry(void*) @ 0x0000000018b1e93d
17. ? @ 0x00007fa16437b1f5
18. ? @ 0x00007fa1643fab00

The code of the resource:

@dlt.resource(
    write_disposition="merge",
    columns={
        "WorkCenterDefinitionId": {
            "data_type": "text",  # UNIQUEIDENTIFIER
            "nullable": False,
        },
        "WorkCenterDefinitionNumber": {
            "data_type": "bigint",
            "precision": 32,
            "nullable": False,
        },
        "RowStatus": {
            "data_type": "bigint",
            "precision": 32,
            "nullable": False,
        },
        "ChangeDateUtc": {
            "data_type": "timestamp",
            "nullable": False,
        },
    },
    primary_key="WorkCenterDefinitionId",
)
def manufacturing__work_center_definition(
    updated_at=dlt.sources.incremental("ChangeDateUtc"),  # noqa: B008
):
    """Extracts DE_ERP.Manufacturing.WorkCenterDefinition."""
    query = """
        SELECT
            WorkCenterDefinitionId,
            WorkCenterDefinitionNumber,
            RowStatus,
            ChangeDateUtc
        FROM Manufacturing.WorkCenterDefinition
        WHERE 1 = 1
    """
    query_args = []
    if updated_at.start_value:
        query += """
            AND ChangeDateUtc >= ?
        """
        query_args.append(updated_at.start_value)
    yield from mssql.SqlServer.single_select(DATABASE, query, *query_args)

The source simply returns a list of table resources similar to the one above.
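For completeness, a minimal sketch of the source's structure (resource list abbreviated, source name assumed):

@dlt.source
def de_erp():
    return [
        manufacturing__work_center_definition(),
        # ... further table resources defined in the same way
    ]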

Expected behavior

The staging table is created automatically and the job succeeds.

Steps to reproduce

I tried to replicate this issue locally, but couldn't find the problem. If you need any more information, please let me know.

Operating system

Linux

Runtime environment

Kubernetes

Python version

3.12

dlt data source

Source: a custom resource accessing SQL Server.
Destination: ClickHouse (which is not on the list of dlt destinations).

dlt destination

No response

Other deployment details

The job is scheduled with Dagster on Kubernetes (i.e., on a clean file system on each job run with no prior pipeline state).

Additional information

No response

agrueneberg avatar Jul 11 '25 06:07 agrueneberg

Hey @agrueneberg, what do you mean by "change to this resource"? A change to the schema of the data? Or do you mean it fails on the first load that uses a merge?

Are you able to give us some code that we can just execute and see this problem occur? Without it, it will be hard to debug. Ideally you can give us some example data.

sh-rp avatar Jul 11 '25 13:07 sh-rp

Hi David,

when I say "change to this resource," I'm referring specifically to a data change within the table. This particular table is rarely updated, but every time it is changed, the bug appears to occur consistently (unfortunately, this is a production system and I cannot change the table at will).

This resource is part of a larger refresh job in Dagster that currently processes 52 assets in total—roughly half are dlt-based (from different pipelines), and the other half use dbt. Most dlt assets, including the problematic one, use incremental loading.

The data change itself seems minor:

| WorkCenterDefinitionId | WorkCenterDefinitionNumber | RowStatus | CreateUserId | CreateDateUtc | ChangeUserId | ChangeDateUtc |
|---|---|---|---|---|---|---|
| 6E060000-AD18-0242-F933-08DDBFBFC3E9 | 619 | 5 | 611 | 2025-07-10 14:40:50.3016253 | 611 | 2025-07-10 14:40:50.3016253 |
| 6B860000-AD18-0242-F296-08DDB960E874 | 244 | 5 | 501 | 2025-07-02 12:06:42.4441438 | 501 | 2025-07-02 12:06:42.4441438 |
| 6B860000-AD18-0242-834D-08DDB95E9DB1 | 223 | 1 | 501 | 2025-07-02 11:50:18.0172010 | 501 | 2025-07-02 11:50:18.0172010 |
| 61E40000-AD18-0242-8E12-08DDB23D4747 | 242 | 5 | 501 | 2025-06-23 10:04:01.5165657 | 501 | 2025-06-23 10:04:01.5165657 |
[Screenshots of the Dagster run history omitted; they show the runs starting to fail right after the table change.]

As shown in the screenshots, the error consistently occurs right after the table is modified. I then manually trigger a "full refresh" (i.e., with write_disposition="replace" at the dlt.run level) of the resource, which resolves the issue and allows the job to continue running normally (until the next change).

I’ve attempted to reproduce this behavior using a single resource, accounting for the fact that pipeline runs in Kubernetes start with a clean environment each time, but everything is working fine...

import datetime
import shutil

import dlt


t = 0


@dlt.resource(
    write_disposition="merge",
    primary_key="key",
)
def my_resource(
    updated_at=dlt.sources.incremental("ldts"),
):
    print(t, updated_at.start_value)
    if t == 0:
        # Initial pair
        yield {
            "key": 1,
            "value": 1,
            "ldts": datetime.datetime(2025, 1, 1, 0, 0),
        }
    if t == 1:
        # No change
        yield {
            "key": 1,
            "value": 1,
            "ldts": datetime.datetime(2025, 1, 1, 0, 0),
        }
    if t == 2:
        # No change
        yield {
            "key": 1,
            "value": 1,
            "ldts": datetime.datetime(2025, 1, 1, 0, 0),
        }
    if t == 3:
        # Data change: value 1 -> 2
        yield {
            "key": 1,
            "value": 2,
            "ldts": datetime.datetime(2025, 1, 2, 0, 0),
        }
    if t == 4:
        # No change
        yield {
            "key": 1,
            "value": 2,
            "ldts": datetime.datetime(2025, 1, 2, 0, 0),
        }


@dlt.source
def my_source():
    return [
        my_resource(),
    ]


# Simulate five consecutive job runs, each on a clean file system (as on k8s):
# the first is a historic load with replace, the rest are incremental merge runs.
# The global t is read by my_resource to decide which data to yield.
for t in range(5):
    pipeline = dlt.pipeline(
        pipeline_name="test",
        pipelines_dir="./dlt",
        destination="clickhouse",
        dataset_name="test",
    )
    if t == 0:
        pipeline.run(
            data=my_source(),
            write_disposition="replace",  # historic load
        )
    else:
        pipeline.run(
            data=my_source(),
        )
    shutil.rmtree("./dlt")  # start from scratch (k8s)

Additionally, I attempted to debug the issue locally but wasn’t able to pinpoint exactly when or where the staging tables are supposed to be created. If you could point me to the relevant part of the code or process responsible for that, I may be able to properly debug the issue the next time it occurs.
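In case it helps next time it occurs, this is roughly how I plan to inspect the staging tables on ClickHouse around a failing run (a sketch; the database name comes from the error message above, the pipeline name is assumed):

import dlt

pipeline = dlt.pipeline(
    pipeline_name="de_erp",        # assumed name
    destination="clickhouse",
    dataset_name="de_erp",
)

# List all de_erp tables ClickHouse actually has, main and staging alike.
with pipeline.sql_client() as client:
    rows = client.execute_sql(
        "SELECT name FROM system.tables "
        "WHERE database = 'dwh_dlt' AND name LIKE 'de_erp%'"
    )
    for (name,) in rows:
        print(name)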

If I understand correctly, dagster-dlt's design allows mixing and matching resources from different pipelines without issue (as long as the pipelines are not run concurrently), since schema detection is handled independently at the resource level?

agrueneberg avatar Jul 14 '25 07:07 agrueneberg

Hey @agrueneberg,

without some code that demonstrates the issue it will be hard to debug. One question though: are you running pipelines with the same name in parallel? Or are there any resource refreshes going on from another machine while you run this merge? A few things you can check:

  • If you run pipelines in parallel, give them separate names
  • If you use write dispositions that require staging datasets (some replace settings and merge) from multiple pipelines in parallel against the same dataset, you need a separate staging dataset name for each of these pipelines; otherwise the data you are loading will get mixed up, or tables needed by another pipeline might not be available (see the sketch below)
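For the second point, a rough sketch of how a per-pipeline staging dataset name can be configured (via the staging_dataset_name_layout option, shown here as an environment variable; config.toml works as well, and please double-check the exact key against the staging dataset docs):

import os

# Assumed config key: give this pipeline its own staging dataset,
# e.g. "de_erp_staging_erp" instead of the default "<dataset>_staging".
os.environ["DESTINATION__STAGING_DATASET_NAME_LAYOUT"] = "%s_staging_erp"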

sh-rp avatar Jul 14 '25 08:07 sh-rp

Hi David,

having heard about the concurrency problems, I tried my best to avoid running pipelines with the same name in parallel. However, in Dagster, this is not always easy, as assets can be manifested both manually and automatically (job runs requested by schedules, sensors, etc.). That said, I don't think this is the problem here.

You mention separate staging dataset names, but the problem is that there is no staging table for this resource in the first place. To my knowledge, staging tables only get truncated before the load operation, not deleted. In this particular case it appears that a staging table was not needed for the initial historic load, and then didn't get created for the merge load even though the table contents have changed.

Cheers, Alex

agrueneberg avatar Jul 17 '25 09:07 agrueneberg

Hi, I do have separate pipeline names on Dagster, but I'm still having the exact same problem on an MSSQL target: <class 'dlt.destinations.exceptions.DatabaseUndefinedRelation'> ('42S02', '[42S02] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Cannot find the object "table__sub_table" because it does not exist or you do not have permissions. (4701) (SQLExecDirectW)')

> [quoted from sh-rp's reply above: give parallel pipelines separate names and use a separate staging dataset name per pipeline]

This only happens with one of the 22 tables

axellpadilla avatar Jul 18 '25 14:07 axellpadilla

Follow-up: doing a refresh = drop_resources and/or a replace also fails (I always have root_key set to true):

This with drop_resources:

Context:
  • Dagster op: database__dbo__table__sub_Table
  • Pipeline step: load

Error chain:
  • PipelineStepFailed at step=load
  • Caused by TableChainFollowupJobCreationFailedException: "Failed creating follow-up jobs for table chain with root table table."
  • Root cause: MergeDispositionException: "Could not generate merge job. Data for tables ['database', 'table__sub_Table'] was loaded to staging. Reason: no root_key column (e.g. _dlt_root_id) found in child table table__sub_table."

Schema:

version: "0.1"
dataset_name: "mongo_db_crm_hn"
tables:
  - name: table # root table
    columns:
      _id: text                   # primary key
      id: bigint                  # business key
      # … other columns grouped by purpose (timestamps, content, metrics, DLT metadata)
      _dlt_load_id: text          # DLT load identifier
      _dlt_id: text               # DLT row identifier
    write_disposition: merge

  - name: table__sub_table  # sub-table of table
    parent: table
    columns: {}                   # no columns defined (missing root_key?)
    write_disposition: merge
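For reference, this is roughly how I enable root key propagation on the source (a sketch with placeholder names, assuming the standard root_key flag on a dlt source):

import dlt

@dlt.resource(write_disposition="merge", primary_key="_id")
def table():
    # Nested list -> child table "table__sub_table"
    yield {"_id": "1", "sub_table": [{"value": 1}]}

@dlt.source
def database__dbo():
    return [table]

source = database__dbo()
# Propagate _dlt_root_id into child tables such as table__sub_table,
# which the merge follow-up job needs to relate child rows to the root table.
source.root_key = True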

This is with replace:

Context:
  • Dagster op: database__dbo__table__sub_Table
  • Pipeline step: load

Error chain:
  • PipelineStepFailed at step=load
  • Caused by DatabaseTransientException (ODBC Driver 18 / SQL Server error 4901): "ALTER TABLE only allows columns to be added that can contain nulls, or have a DEFAULT definition specified, or the column being added is an identity or timestamp column, or alternatively if none of the previous conditions are satisfied the table must be empty to allow addition of this column. Column _dlt_root_id cannot be added to non-empty table table_sub_table because it does not satisfy these conditions."
  • Propagated as pyodbc.ProgrammingError with the same SQL-level message.

Final solution:

I manually deleted everything, and after the already applied drop_resources and the pending packages went through, it loaded.

axellpadilla avatar Jul 18 '25 14:07 axellpadilla

Hi Axell,

so, we're both using Dagster for orchestration, and the issue seems to affect only a small subset of mostly similar tables. That suggests the problem might not be related to the destination—ClickHouse in my case, SQL Server in yours. It also sounds like you're not running Dagster on Kubernetes, since you mentioned having pending load packages. Is that correct?

agrueneberg avatar Jul 21 '25 06:07 agrueneberg

Hi @agrueneberg, that's correct. I also tried using dlt directly, but the same settings and sources caused similar problems: for example, after I solved the first load of my example table, it started failing again, now with a "Could not create SQLFollowupJob" error involving table__subtable1__subtable2.

Oh... I just tried again and noticed it sometimes receives no data. I ran drop_pending_packages and it worked, so maybe it is related to a pipeline that received no data (or no data for a field that generates a sub-table) and stays bugged on subsequent runs. Still, I can't replicate it; I tried afterwards without that setting and with empty runs, and nothing. Please let me know if setting drop_pending_packages to true fixes something for you.
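For reference, the kind of one-off cleanup I'm referring to looks roughly like this (a sketch; the module, pipeline, and source names are placeholders):

import dlt
from crm_sources import database__dbo  # hypothetical module with the source

pipeline = dlt.pipeline(
    pipeline_name="crm",             # placeholder name
    destination="mssql",
    dataset_name="mongo_db_crm_hn",
)

# Discard partially processed load packages left over from failed runs...
pipeline.drop_pending_packages()

# ...then rebuild the affected resources' state and tables on the next run.
pipeline.run(database__dbo(), refresh="drop_resources")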

axellpadilla avatar Jul 22 '25 21:07 axellpadilla

> [quoting axellpadilla's comment above about runs that receive no data and drop_pending_packages]

I'm facing a similar problem with a completely different setup. If a staging table gets deleted because no data was received on one of the runs (either the first or a subsequent one), that's a bug IMO, and it's a good catch - likely the cause of the problem.

jukiewiczm-marketer avatar Nov 06 '25 13:11 jukiewiczm-marketer

I can't reproduce it using the default scenario(s), like:

  • make one of the resources return nothing at first
  • make it actually return data on a subsequent run, etc.

It all seems to work (a sketch of the kind of scenario I tried is below).
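For reference, the scenario I tested looks roughly like this (a sketch; the destination and names are placeholders):

import dlt

def make_resource(return_data: bool):
    @dlt.resource(name="maybe_empty", write_disposition="merge", primary_key="id")
    def maybe_empty():
        if return_data:
            yield {"id": 1, "value": 1}
        # otherwise: yield nothing on this run

    return maybe_empty

pipeline = dlt.pipeline(
    pipeline_name="repro",
    destination="duckdb",  # placeholder destination
    dataset_name="repro",
)

# First run: the resource yields no data, so no table is created yet.
pipeline.run(make_resource(False))

# Second run: data appears for the first time; merge now needs the staging table.
pipeline.run(make_resource(True))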

@sh-rp could you shed some light on when the following could happen:

  • table in a staging dataset getting deleted
  • dlt assuming the staging table is there while it's not

I'm asking about any potentially non-bug-related behavior; that could shed some light on where the issue is and whether there's a bug somewhere. I'm getting this error during table truncation: there's a table to load, but it doesn't exist in the staging dataset. That might be the first time data actually appears for this table (so there's a chance it was never deleted, just never created).

Can, for example, loading multiple packages at once, some of them missing data for certain resources, cause this?

jukiewiczm-marketer avatar Nov 10 '25 11:11 jukiewiczm-marketer