Staging tables are not created when needed (DatabaseUndefinedRelation)
dlt version
1.13.0
Describe the problem
Hi all,
one of our jobs sometimes fails with a DatabaseUndefinedRelation error because one resource cannot find its associated staging table. We load this resource incrementally with write_disposition = merge, but initially performed a historic load with write_disposition = replace. The job keeps running successfully until there is an actual change to that particular resource (data, not schema), after which all subsequent runs fail. If we perform another "full refresh" (meaning write_disposition = replace) on that particular resource, the job runs again (until the next change to the resource occurs).
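Roughly, the switching between the two modes looks like this; this is only a simplified sketch with placeholder names and data (example_resource, the de_erp pipeline/dataset names), not our production code:

import dlt


@dlt.resource(write_disposition="merge", primary_key="id")
def example_resource(
    updated_at=dlt.sources.incremental("ChangeDateUtc"),
):
    # Placeholder row; in our job this comes from the SQL Server query shown further below.
    yield {"id": 1, "ChangeDateUtc": "2025-01-01T00:00:00"}


pipeline = dlt.pipeline(
    pipeline_name="de_erp",
    destination="clickhouse",  # credentials come from config/secrets
    dataset_name="de_erp",
)

# Historic load: override the resource-level merge disposition at run time.
pipeline.run(example_resource(), write_disposition="replace")

# Regular scheduled runs: keep the resource's merge disposition and let
# dlt.sources.incremental filter on the change timestamp.
pipeline.run(example_resource())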
Step de_erp: dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "de_erp":
Stack Trace:
File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/execute_plan.py", line 246, in dagster_event_sequence_for_step
yield from check.generator(step_events)
File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/execute_step.py", line 504, in core_dagster_event_sequence_for_step
for user_event in _step_output_error_checked_user_event_sequence(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/execute_step.py", line 185, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
^^^^^^^^^^^^^^^^^^^
File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/execute_step.py", line 88, in _process_asset_results_to_events
for user_event in user_event_sequence:
^^^^^^^^^^^^^^^^^^^
File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/compute.py", line 187, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn, compute_context):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/compute.py", line 156, in _yield_compute_results
for event in iterate_with_context(
^^^^^^^^^^^^^^^^^^^^^
File "/home/app/.local/lib/python3.12/site-packages/dagster/_utils/__init__.py", line 390, in iterate_with_context
with context_fn():
^^^^^^^^^^^^
File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
self.gen.throw(value)
File "/home/app/.local/lib/python3.12/site-packages/dagster/_core/execution/plan/utils.py", line 87, in op_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at `step=load` when processing package with `load_id=1752170429.6172163` with exception:
<class 'dlt.destinations.exceptions.DatabaseUndefinedRelation'>
Code: 60.
DB::Exception: Table dwh_dlt.de_erp_staging__manufacturing__work_center_definition does not exist. Maybe you meant dwh_dlt.de_erp__manufacturing__work_center_definition?. Stack trace:
0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000fccc51b
1. DB::Exception::Exception(PreformattedMessage&&, int) @ 0x0000000009f1822c
2. DB::Exception::Exception<String, String, String, String>(int, FormatStringHelperImpl<std::type_identity<String>::type, std::type_identity<String>::type, std::type_identity<String>::type, std::type_identity<String>::type>, String&&, String&&, String&&, String&&) @ 0x0000000009fb712b
3. DB::IDatabase::getTable(String const&, std::shared_ptr<DB::Context const>) const @ 0x00000000130bd609
4. DB::DatabaseCatalog::getTableImpl(DB::StorageID const&, std::shared_ptr<DB::Context const>, std::optional<DB::Exception>*) const @ 0x0000000013ba5550
5. DB::DatabaseCatalog::getDatabaseAndTable(DB::StorageID const&, std::shared_ptr<DB::Context const>) const @ 0x0000000013baeba3
6. DB::InterpreterDropQuery::executeToTableImpl(std::shared_ptr<DB::Context const> const&, DB::ASTDropQuery&, std::shared_ptr<DB::IDatabase>&, StrongTypedef<wide::integer<128ul, unsigned int>, DB::UUIDTag>&) @ 0x0000000013cb2050
7. DB::InterpreterDropQuery::executeSingleDropQuery(std::shared_ptr<DB::IAST> const&) @ 0x0000000013cad704
8. DB::InterpreterDropQuery::execute() @ 0x0000000013cad2e3
9. DB::executeQueryImpl(char const*, char const*, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum, DB::ReadBuffer*, std::shared_ptr<DB::IAST>&) @ 0x000000001407f248
10. DB::executeQuery(String const&, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum) @ 0x0000000014078946
11. DB::TCPHandler::runImpl() @ 0x000000001540ebfe
12. DB::TCPHandler::run() @ 0x000000001542d979
13. Poco::Net::TCPServerConnection::start() @ 0x0000000018b54d07
14. Poco::Net::TCPServerDispatcher::run() @ 0x0000000018b55159
15. Poco::PooledThread::run() @ 0x0000000018b2045b
16. Poco::ThreadImpl::runnableEntry(void*) @ 0x0000000018b1e93d
17. ? @ 0x00007fa16437b1f5
18. ? @ 0x00007fa1643fab00
The code of the resource:
@dlt.resource(
    write_disposition="merge",
    columns={
        "WorkCenterDefinitionId": {
            "data_type": "text",  # UNIQUEIDENTIFIER
            "nullable": False,
        },
        "WorkCenterDefinitionNumber": {
            "data_type": "bigint",
            "precision": 32,
            "nullable": False,
        },
        "RowStatus": {
            "data_type": "bigint",
            "precision": 32,
            "nullable": False,
        },
        "ChangeDateUtc": {
            "data_type": "timestamp",
            "nullable": False,
        },
    },
    primary_key="WorkCenterDefinitionId",
)
def manufacturing__work_center_definition(
    updated_at=dlt.sources.incremental("ChangeDateUtc"),  # noqa: B008
):
    """Extracts DE_ERP.Manufacturing.WorkCenterDefinition."""
    query = """
        SELECT
            WorkCenterDefinitionId,
            WorkCenterDefinitionNumber,
            RowStatus,
            ChangeDateUtc
        FROM Manufacturing.WorkCenterDefinition
        WHERE 1 = 1
    """
    query_args = []
    if updated_at.start_value:
        query += """
            AND ChangeDateUtc >= ?
        """
        query_args.append(updated_at.start_value)
    yield from mssql.SqlServer.single_select(DATABASE, query, *query_args)
The source simply returns a list of table resources similar to the one above (see the sketch below).
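As a rough sketch (the source name de_erp is illustrative, taken from the Dagster op name; the real source bundles many more resources declared the same way):

@dlt.source
def de_erp():
    return [
        manufacturing__work_center_definition(),
        # ... further table resources declared like the one above
    ]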
Expected behavior
The staging table is automatically created and the job will succeed.
Steps to reproduce
I tried to replicate this issue locally, but couldn't reproduce it. If you need any more information, please let me know.
Operating system
Linux
Runtime environment
Kubernetes
Python version
3.12
dlt data source
Source: a custom resource accessing SQL Server.
Destination: ClickHouse (which is not among the options in this template's destination list).
dlt destination
No response
Other deployment details
The job is scheduled with Dagster on Kubernetes (i.e., on a clean file system on each job run with no prior pipeline state).
Additional information
No response
Hey @agrueneberg, what do you mean by "change to this resource"? A change to the schema of the data? Or do you mean it fails on the first load that uses merge?
Are you able to give us some code that we can just execute and see this problem occur? Without it, it will be hard to debug. Ideally you can give us some example data.
Hi David,
when I say "change to this resource," I'm referring specifically to a data change within the table. This particular table is rarely updated, but every time it is changed, the bug appears to occur consistently (unfortunately, this is a production system and I cannot change the table at will).
This resource is part of a larger refresh job in Dagster that currently processes 52 assets in total—roughly half are dlt-based (from different pipelines), and the other half use dbt. Most dlt assets, including the problematic one, use incremental loading.
The data change itself seems minor:
| WorkCenterDefinitionId | WorkCenterDefinitionNumber | RowStatus | CreateUserId | CreateDateUtc | ChangeUserId | ChangeDateUtc |
| --- | --- | --- | --- | --- | --- | --- |
| 6E060000-AD18-0242-F933-08DDBFBFC3E9 | 619 | 5 | 611 | 2025-07-10 14:40:50.3016253 | 611 | 2025-07-10 14:40:50.3016253 |
| 6B860000-AD18-0242-F296-08DDB960E874 | 244 | 5 | 501 | 2025-07-02 12:06:42.4441438 | 501 | 2025-07-02 12:06:42.4441438 |
| 6B860000-AD18-0242-834D-08DDB95E9DB1 | 223 | 1 | 501 | 2025-07-02 11:50:18.0172010 | 501 | 2025-07-02 11:50:18.0172010 |
| 61E40000-AD18-0242-8E12-08DDB23D4747 | 242 | 5 | 501 | 2025-06-23 10:04:01.5165657 | 501 | 2025-06-23 10:04:01.5165657 |
...
As shown in the screenshots, the error consistently occurs right after the table is modified. I then manually trigger a "full refresh" (i.e., with write_disposition="replace" at the dlt.run level) of the resource, which resolves the issue and allows the job to continue running normally (until the next change).
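Concretely, the manual full refresh of just this resource looks roughly like this (a sketch reusing the pipeline and source names from the code above; with_resources selects the single resource):

pipeline.run(
    de_erp().with_resources("manufacturing__work_center_definition"),
    write_disposition="replace",
)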
I've attempted to reproduce this behavior using a single resource, accounting for the fact that pipeline runs on Kubernetes start with a clean environment each time, but everything works fine:
import datetime
import shutil

import dlt

t = 0


@dlt.resource(
    write_disposition="merge",
    primary_key="key",
)
def my_resource(
    updated_at=dlt.sources.incremental("ldts"),
):
    print(t, updated_at.start_value)
    if t == 0:
        # Initial pair
        yield {
            "key": 1,
            "value": 1,
            "ldts": datetime.datetime(2025, 1, 1, 0, 0),
        }
    if t == 1:
        # No change
        yield {
            "key": 1,
            "value": 1,
            "ldts": datetime.datetime(2025, 1, 1, 0, 0),
        }
    if t == 2:
        # No change
        yield {
            "key": 1,
            "value": 1,
            "ldts": datetime.datetime(2025, 1, 1, 0, 0),
        }
    if t == 3:
        # Data change: value 1 -> 2
        yield {
            "key": 1,
            "value": 2,
            "ldts": datetime.datetime(2025, 1, 2, 0, 0),
        }
    if t == 4:
        # No change
        yield {
            "key": 1,
            "value": 2,
            "ldts": datetime.datetime(2025, 1, 2, 0, 0),
        }


@dlt.source
def my_source():
    return [
        my_resource(),
    ]


pipeline = dlt.pipeline(
    pipeline_name="test",
    pipelines_dir="./dlt",
    destination="clickhouse",
    dataset_name="test",
)
pipeline.run(
    data=my_source(),
    write_disposition="replace",  # historic load
)
shutil.rmtree("./dlt")  # start from scratch (k8s)

t = 1
pipeline = dlt.pipeline(
    pipeline_name="test",
    pipelines_dir="./dlt",
    destination="clickhouse",
    dataset_name="test",
)
pipeline.run(
    data=my_source(),
)
shutil.rmtree("./dlt")

t = 2
pipeline = dlt.pipeline(
    pipeline_name="test",
    pipelines_dir="./dlt",
    destination="clickhouse",
    dataset_name="test",
)
pipeline.run(
    data=my_source(),
)
shutil.rmtree("./dlt")

t = 3
pipeline = dlt.pipeline(
    pipeline_name="test",
    pipelines_dir="./dlt",
    destination="clickhouse",
    dataset_name="test",
)
pipeline.run(
    data=my_source(),
)
shutil.rmtree("./dlt")

t = 4
pipeline = dlt.pipeline(
    pipeline_name="test",
    pipelines_dir="./dlt",
    destination="clickhouse",
    dataset_name="test",
)
pipeline.run(
    data=my_source(),
)
shutil.rmtree("./dlt")
Additionally, I attempted to debug the issue locally but wasn’t able to pinpoint exactly when or where the staging tables are supposed to be created. If you could point me to the relevant part of the code or process responsible for that, I may be able to properly debug the issue the next time it occurs.
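In the meantime, this is how I plan to check the staging tables from the pipeline side the next time it happens; it is only a sketch, assuming pipeline.sql_client() against our ClickHouse destination, where the staging tables live in the same database under the "<dataset>_staging" prefix (as in the error above), and the pipeline/dataset names are illustrative:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="de_erp",
    destination="clickhouse",
    dataset_name="de_erp",
)

# List every table the configured database currently has for the staging "dataset".
with pipeline.sql_client() as client:
    rows = client.execute_sql("SHOW TABLES LIKE '%staging%'") or []
    for row in rows:
        print(row)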
If I understand correctly, dagster-dlt's design allows mixing and matching resources from different pipelines without issue (as long as the pipelines are not run concurrently), since schema detection is handled independently at the resource level?
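For reference, this is roughly how each of our dlt pipelines is wrapped as Dagster assets; a sketch based on the dagster-embedded-elt / dagster-dlt integration with illustrative names (de_erp is the source sketched earlier), not our exact code:

import dlt
from dagster import AssetExecutionContext, Definitions
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets


@dlt_assets(
    dlt_source=de_erp(),  # the dlt source shown earlier
    dlt_pipeline=dlt.pipeline(
        pipeline_name="de_erp",
        destination="clickhouse",
        dataset_name="de_erp",
    ),
)
def de_erp_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    # DagsterDltResource.run executes the pipeline and yields one
    # materialization event per dlt resource/table.
    yield from dlt.run(context=context)


defs = Definitions(
    assets=[de_erp_assets],
    resources={"dlt": DagsterDltResource()},
)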
Hey @agrueneberg,
without some code that demonstrates the issue it will be hard to debug. One question though: are you running pipelines with the same name in parallel? Or are there any resource refreshes going on from another machine while you run this merge? A few things you can check:
- If you run pipelines in parallel, give them separate names
- If you use write dispositions that require a staging dataset (some replace settings, and merge) from multiple pipelines in parallel against the same dataset, you need a separate staging dataset name for each of these pipelines (see the sketch after this list); otherwise the data you are loading will get mixed up, or tables needed by another pipeline might not be available
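A sketch of giving one pipeline its own staging dataset name. This assumes the staging_dataset_name_layout destination setting (shown here via an environment variable; the equivalent entry in config.toml under the destination section should work the same way):

import os

import dlt

# Assumption: staging_dataset_name_layout controls how the staging dataset is
# named; "%s" is replaced with the dataset name. Using a different layout per
# pipeline keeps their staging tables apart.
os.environ["DESTINATION__STAGING_DATASET_NAME_LAYOUT"] = "%s_staging_pipeline_a"

pipeline = dlt.pipeline(
    pipeline_name="pipeline_a",
    destination="clickhouse",
    dataset_name="shared_dataset",
)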
Hi David,
having heard about the concurrency problems, I tried my best to avoid running pipelines with the same name in parallel. However, in Dagster this is not always easy, as assets can be materialized both manually and automatically (job runs requested by schedules, sensors, etc.). That said, I don't think this is the problem here.
You mention separate staging dataset names, but the problem is that there is no staging table for this resource in the first place. To my knowledge, staging tables only get truncated before the load operation, not deleted. In this particular case it appears that a staging table was not needed for the initial historic load, and then didn't get created for the merge load even though the table contents had changed.
Cheers, Alex
Hi, I do have separate pipeline names on Dagster, but I'm still having the exact same problem with an MSSQL destination: <class 'dlt.destinations.exceptions.DatabaseUndefinedRelation'> ('42S02', '[42S02] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Cannot find the object "table__sub_table" because it does not exist or you do not have permissions. (4701) (SQLExecDirectW)')
This only happens with one of the 22 tables
Follow-up: a refresh with drop_resources and/or a replace load also fails (I always have root_key set to true).
This is with drop_resources:
Context
Dagster op: database__dbo__table__sub_Table
Pipeline step: load
Error chain
PipelineStepFailed at step=load
Caused by TableChainFollowupJobCreationFailedException:
“Failed creating follow-up jobs for table chain with root table table.”
Root cause: MergeDispositionException
“Could not generate merge job. Data for tables ['database', 'table__sub_Table'] was loaded to staging.
Reason: no root_key column (e.g. _dlt_root_id) found in child table table__sub_table.”
Schema:
version: "0.1"
dataset_name: "mongo_db_crm_hn"
tables:
  - name: table  # root table
    columns:
      _id: text  # primary key
      id: bigint  # business key
      # … other columns grouped by purpose (timestamps, content, metrics, DLT metadata)
      _dlt_load_id: text  # DLT load identifier
      _dlt_id: text  # DLT row identifier
    write_disposition: merge
  - name: table__sub_table  # sub-table of table
    parent: table
    columns: {}  # no columns defined (missing root_key?)
    write_disposition: merge
This is with replace:
Context
Dagster op: database__dbo__table__sub_Table
Pipeline step: load
Error chain
PipelineStepFailed at step=load
Caused by DatabaseTransientException (ODBC Driver 18 / SQL Server error 4901):
“ALTER TABLE only allows columns to be added that can contain nulls, or have a DEFAULT definition specified, or the column being added is an identity or timestamp column, or alternatively if none of the previous conditions are satisfied the table must be empty to allow addition of this column. Column _dlt_root_id cannot be added to non-empty table table_sub_table because it does not satisfy these conditions.”
Propagated as pyodbc.ProgrammingError with the same SQL-level message.
Final solution:
I manually deleted everything, and after the already-executed drop_resources and the pending packages went through, it loaded.
Hi Axell,
so, we're both using Dagster for orchestration, and the issue seems to affect only a small subset of mostly similar tables. That suggests the problem might not be related to the destination—ClickHouse in my case, SQL Server in yours. It also sounds like you're not running Dagster on Kubernetes, since you mentioned having pending load packages. Is that correct?
Hi @agrueneberg, that's correct. I also tried to work around similar problems by using dlt directly, but the same settings and sources caused the same issues. For example, while I solved the first load of my example table, it is now failing again with "Could not create SQLFollowupJob", this time for table__subtable1__subtable2.
Oh... I just tried again and noticed it sometimes receives no data. I ran drop_pending_packages and it worked, so maybe it is related to a pipeline that received no data (or no data for a field that generates a sub-table) and then stays bugged for the next runs. Still, I can't replicate it; I tried afterwards without that setting and with empty runs, and nothing. Please let me know if setting drop_pending_packages to true fixes something.
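For clarity, this is what I mean by dropping pending packages, as an explicit call before the run (a sketch; the pipeline settings are illustrative and my_source stands in for the real source):

import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="mssql",
    dataset_name="my_dataset",
)

# Discard any pending / partially loaded packages left over from earlier runs
# so the next run starts from a clean load state.
pipeline.drop_pending_packages()

# ... then run the source as usual, e.g. pipeline.run(my_source())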
I'm facing a similar problem with a completely different setup. Deleting a staging table due to not receiving data on any of the runs (either first or subsequent) is a bug IMO, and it is a good catch - likely the cause of the problem.
I can't reproduce it using the default scenario(s) like:
- make one of the resources return nothing at first
- make it actually return data on a subsequent run, etc.
It all seems to work.
@sh-rp could you shed some light on when the following could happen:
- table in a staging dataset getting deleted
- dlt assuming the staging table is there while it's not
I'm asking about any potential non-bug behavior; that could shed some light on where the issue is and whether there's a bug somewhere. I'm getting this error during table truncation: there is a table to load, but it doesn't exist in the staging schema. That might be the first time data actually appears for this table (so there's a chance it was never deleted, just never created).
Can, for example, loading multiple packages at once, with some of them missing data for certain resources, cause this?