Redshift: improve error message when schema ownership differs
Feature description
If the owner of the relations changes, loading jobs and pipelines start failing with the error "schema already exists".
A better error message would be nice.
https://dlthub-community.slack.com/archives/C04DQA7JJN6/p1744995341412969
Are you a dlt user?
None
Use case
No response
Proposed solution
No response
Related issues
No response
@adrianbr this is probably what happens:
- the pipeline starts
- dlt checks whether the schema exists - Redshift says no
- dlt creates the schema - Redshift says the schema already exists
We have no meaningful way to detect this case. What we could do is add a warning to all "schema exists" errors. Are you able to ask the user for a full stack trace?
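One possible shape for such a warning - a minimal sketch, not dlt's actual code; the helper name and hint text are assumptions, and the grant statement in the hint uses placeholder names:

```python
# Sketch: append an ownership/grants hint to any "already exists"
# database error message. Hypothetical helper, not dlt internals.

SCHEMA_EXISTS_HINT = (
    "The schema exists but may be invisible to the current user. "
    "On Redshift this typically means the schema is owned by another "
    "user and the current user lacks USAGE on it. Ask the schema owner "
    "to run: GRANT USAGE, CREATE ON SCHEMA <schema> TO <etl_user>;"
)

def enrich_schema_exists_error(message: str) -> str:
    """Return the message unchanged unless it looks like a
    'schema already exists' failure, in which case append the hint."""
    if "already exists" in message.lower():
        return f"{message}\nHint: {SCHEMA_EXISTS_HINT}"
    return message
```

Applied to the error below, `enrich_schema_exists_error('Schema "klaviyo_cdp_source" already exists')` would surface the grants hint instead of the bare duplicate-schema message.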
Bumped into the scenario again and remembered this conversation. The stack trace from the error:
[2025-06-18, 15:53:12 PDT] {{task_log_fetcher.py:63}} INFO - [2025-06-18 22:52:59,690] --------------------- Load rest_api in 1750287050.4699779 ----------------------
Jobs: 0/4 (0.0%) | Time: 0.06s | Rate: 0.00/s
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/dlt/destinations/sql_client.py", line 435, in _wrap_gen
    return (yield from f(self, *args, **kwargs))
  File "/usr/local/lib/python3.10/site-packages/dlt/destinations/impl/postgres/sql_client.py", line 112, in execute_query
    raise outer
  File "/usr/local/lib/python3.10/site-packages/dlt/destinations/impl/postgres/sql_client.py", line 104, in execute_query
    curr.execute(query, db_args)
psycopg2.errors.DuplicateSchema: Schema "klaviyo_cdp_source" already exists

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 594, in load
    runner.run_pool(load_step.config, load_step)
  File "/usr/local/lib/python3.10/site-packages/dlt/common/runners/pool_runner.py", line 91, in run_pool
    while _run_func():
  File "/usr/local/lib/python3.10/site-packages/dlt/common/runners/pool_runner.py", line 84, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
  File "/usr/local/lib/python3.10/site-packages/dlt/load/load.py", line 638, in run
    self.load_single_package(load_id, schema)
  File "/usr/local/lib/python3.10/site-packages/dlt/load/load.py", line 527, in load_single_package
    applied_update = init_client(
  File "/usr/local/lib/python3.10/site-packages/dlt/load/utils.py", line 117, in init_client
    applied_update = _init_dataset_and_update_schema(
  File "/usr/local/lib/python3.10/site-packages/dlt/load/utils.py", line 175, in _init_dataset_and_update_schema
    job_client.initialize_storage()
  File "/usr/local/lib/python3.10/site-packages/dlt/destinations/job_client_impl.py", line 289, in initialize_storage
    self.sql_client.create_dataset()
  File "/usr/local/lib/python3.10/site-packages/dlt/destinations/sql_client.py", line 126, in create_dataset
    self.execute_sql("CREATE SCHEMA %s" % self.fully_qualified_dataset_name())
  File "/usr/local/lib/python3.10/site-packages/dlt/destinations/impl/postgres/sql_client.py", line 90, in execute_sql
    with self.execute_query(sql, *args, **kwargs) as curr:
  File "/usr/local/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.10/site-packages/dlt/destinations/sql_client.py", line 437, in _wrap_gen
    raise self._make_database_exception(ex)
dlt.destinations.exceptions.DatabaseTerminalException: Schema "klaviyo_cdp_source" already exists

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.10/site-packages/dsc/klaviyo_ingest/app.py", line 7, in <module>
    main()
  File "/usr/local/lib/python3.10/site-packages/dsc/klaviyo_ingest/core.py", line 72, in main
    load_info = pipeline.run(source)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 227, in _wrap
    step_info = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 276, in _wrap
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 738, in run
    return self.load(destination, dataset_name, credentials=credentials)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 227, in _wrap
    step_info = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 167, in _wrap
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 276, in _wrap
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 601, in load
    raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at `step=load` when processing package with `load_id=1750287050.4699779` with exception:

<class 'dlt.destinations.exceptions.DatabaseTerminalException'>
Schema "klaviyo_cdp_source" already exists
Edit: I found a query that returns all schema details regardless of access; TL;DR below.
What I believe happened: I created the schema manually with a superuser account, like admin_jon, and the first ETL run in prod then failed during the load operation because the ETL user likely lacked either USAGE or CREATE grants on the schema.
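If that diagnosis is right, the fix is for the schema owner to grant the ETL user access - a sketch, where `klaviyo_cdp_source` and `etl_user` stand in for the real schema and role:

```sql
-- Run as the schema owner / a superuser; names are placeholders.
grant usage, create on schema klaviyo_cdp_source to etl_user;
-- Or hand over ownership entirely so future DDL by the ETL user works too:
alter schema klaviyo_cdp_source owner to etl_user;
```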
I ran an experiment: if I create a schema with create schema no_one_can_use and do not grant usage on it, most system tables do not include the schema:

show schemas; -- does not include the schema without 'usage'

select distinct table_schema -- does not include the schema without 'usage'
from information_schema.columns;

select schema_name from SVV_REDSHIFT_SCHEMAS; -- nope, does not include the schema without 'usage'

select distinct table_schema from SVV_TABLES; -- does return it even for a non-superuser, but a table must exist in the schema for this to be useful for this issue
Strangely enough, JetBrains' IDE is able to show table and schema details regardless of access. I watched Redshift's sys_query_history after having the IDE introspect the database, and this query (reformatted for readability)
select N.oid::bigint as id,
       N.xmin as state_number,
       N.nspname as name,
       D.description,
       pg_catalog.pg_get_userbyid(N.nspowner) as "owner"
from pg_catalog.pg_namespace N
left outer join pg_catalog.pg_external_schema ES on N.oid = ES.esoid
left join pg_catalog.pg_description D on N.oid = D.objoid
where ES.esoid is null
order by case when N.nspname = pg_catalog.current_schema() then -1::bigint else N.oid::bigint end
Returns all schema information regardless of access. The IDE executes at least two other variations to check for shared/external schemas and databases.
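Building on that catalog query, here is a possible diagnostic to run when dlt reports "schema already exists" but the schema is invisible to the ETL user - a sketch; `has_schema_privilege` is a real Redshift/Postgres function, while the schema name is a placeholder:

```sql
-- Does the schema exist at all, who owns it, and does the current
-- user have USAGE/CREATE on it? (schema name is a placeholder)
select n.nspname as schema_name,
       pg_catalog.pg_get_userbyid(n.nspowner) as owner,
       has_schema_privilege(current_user, n.nspname, 'usage') as has_usage,
       has_schema_privilege(current_user, n.nspname, 'create') as has_create
from pg_catalog.pg_namespace n
where n.nspname = 'klaviyo_cdp_source';
```

A row with has_usage = false would confirm the ownership/grants explanation above, since pg_namespace is visible regardless of USAGE.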