
`TableChainFollowupJobCreationFailedException` when loading Arrow table to ClickHouse

turtleDev opened this issue 9 months ago • 10 comments

dlt version

1.5.0

Describe the problem

When trying to load data from an Arrow source to a ClickHouse destination with the "merge" write disposition, dlt raises a dlt.load.exceptions.TableChainFollowupJobCreationFailedException.

This only happens with the arrow/pandas source and ClickHouse destination combination; other source/destination combinations work without issue.

Here's the full stack trace:

2025-01-29 20:41:08,965|[WARNING]|240986|139845918012096|dlt|__init__.py|_check_duplicate_cursor_threshold:584|Large number of records (1000) sharing the same value of cursor field 'date'. This can happen if the cursor field has a low resolution (e.g., only stores dates without times), causing many records to share the same cursor value. Consider using a cursor column with higher resolution to reduce the deduplication state size.
2025-01-29 20:41:08,974|[WARNING]|240986|139845918012096|dlt|extractors.py|_compute_table:445|In resource: arrow_mmap, when merging arrow schema with dlt schema, several column hints were different. dlt schema hints were kept and arrow schema and data were unmodified. It is up to destination to coerce the differences when loading. Change log level to INFO for more details.
Traceback (most recent call last):
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 76, in from_table_chain
    for stmt in cls.generate_sql(table_chain, sql_client, params)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 173, in generate_sql
    merge_sql = cls.gen_merge_sql(table_chain, sql_client)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 569, in gen_merge_sql
    cls._get_row_key_col(table_chain, sql_client, root_table)
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 464, in _get_row_key_col
    raise MergeDispositionException(
dlt.destinations.exceptions.MergeDispositionException: Merge sql job for dataset name `public`.`public`, staging dataset name `public`.`public_staging` COULD NOT BE GENERATED. Merge will not be performed. Data for the following tables (['testdata']) is loaded to staging dataset. You may need to write your own materialization. The reason is:
No `row_key`, `unique`, or single primary key column (e.g. `_dlt_id`) in table `testdata`.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 349, in create_followup_jobs
    if follow_up_jobs := client.create_table_chain_completed_followup_jobs(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/job_client_impl.py", line 270, in create_table_chain_completed_followup_jobs
    jobs.extend(self._create_merge_followup_jobs(table_chain))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/impl/clickhouse/clickhouse.py", line 223, in _create_merge_followup_jobs
    return [ClickHouseMergeJob.from_table_chain(table_chain, self.sql_client)]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 81, in from_table_chain
    raise SqlJobCreationException(e, table_chain) from e
dlt.destinations.sql_jobs.SqlJobCreationException: Could not create SQLFollowupJob with exception Merge sql job for dataset name `public`.`public`, staging dataset name `public`.`public_staging` COULD NOT BE GENERATED. Merge will not be performed. Data for the following tables (['testdata']) is loaded to staging dataset. You may need to write your own materialization. The reason is:
No `row_key`, `unique`, or single primary key column (e.g. `_dlt_id`) in table `testdata`.. Table chain: - columns:
    id:
      name: id
      nullable: true
      data_type: bigint
    value:
      name: value
      nullable: true
      data_type: double
    category:
      name: category
      nullable: true
      data_type: text
    nested:
      name: nested
      nullable: true
      data_type: json
    date:
      name: date
      nullable: false
      data_type: text
      merge_key: true
      incremental: true
  name: testdata
  write_disposition: merge
  resource: arrow_mmap
  x-normalizer:
    seen-data: true


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 608, in load
    runner.run_pool(load_step.config, load_step)
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 91, in run_pool
    while _run_func():
          ^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 84, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 639, in run
    self.load_single_package(load_id, schema)
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 572, in load_single_package
    running_jobs, finalized_jobs, new_pending_exception = self.complete_jobs(
                                                          ^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 447, in complete_jobs
    self.create_followup_jobs(load_id, state, job, schema)
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 354, in create_followup_jobs
    raise TableChainFollowupJobCreationFailedException(
dlt.load.exceptions.TableChainFollowupJobCreationFailedException: Failed creating table chain followup jobs for table chain with root table testdata.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/arrow-to-clickhouse.py", line 75, in <module>
    load_info = pipeline.run(
                ^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 747, in run
    return self.load(destination, dataset_name, credentials=credentials)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 166, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 615, in load
    raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package 1738163468.926135 with exception:

<class 'dlt.load.exceptions.TableChainFollowupJobCreationFailedException'>
Failed creating table chain followup jobs for table chain with root table testdata.

Expected behavior

I expected to be able to load data from Arrow to ClickHouse.

Steps to reproduce

Here's a self-contained script that reproduces the issue:

import random
import dlt
import dlt.extract
from dlt.destinations.impl.clickhouse.configuration import (
    ClickHouseCredentials   
)
import pandas as pd
import numpy as np
import pyarrow as pa

"""
clickhouse setup

$ docker run -p 8123:8123 -p 9000:9000 clickhouse/clickhouse-server:24.12
$ docker exec -i ${CONTAINER_ID} /usr/bin/env clickhouse-client < _EOF
CREATE DATABASE IF NOT EXISTS "public";
CREATE USER IF NOT EXISTS dlt IDENTIFIED WITH sha256_password BY 'Dlt*12345789234567';
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON public.* TO dlt;
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON default.* TO dlt;
GRANT SELECT ON INFORMATION_SCHEMA.COLUMNS TO dlt;
GRANT CREATE TEMPORARY TABLE, S3 ON *.* TO dlt;
_EOF
"""

@dlt.resource(
    name="arrow_mmap",
    merge_key="date", 
    columns={'date': {'merge_key': True, 'name': 'date'}}
)
def arrow_mmap(
    incremental = dlt.sources.incremental(
        "date",
        initial_value=None,
        end_value=None,
        range_end="closed",
        range_start="closed",
    )
):

    row_count = 1000
    df = pd.DataFrame(
        {
            "id": range(row_count),
            "value": np.random.rand(row_count),
            "category": np.random.choice(["A", "B", "C"], size=row_count),
            "nested": [{"a": 1, "b": 2, "c": {"d": 3}}] * row_count,
            "date": ["2024-11-05"] * row_count,
        }
    )
    yield pa.Table.from_pandas(df)


source = arrow_mmap()

credentials = ClickHouseCredentials(
    {
        "host": "localhost",
        "port": "9000",
        "username": "dlt",
        "password": "Dlt*12345789234567",
        "database": "public",
        "http_port": 8123,
        "secure": 0,
    }
)

dest = dlt.destinations.clickhouse(credentials=credentials)

pipeline = dlt.pipeline(
    "".join(random.choices("abcdefghijklmop123456", k=5)),
    destination=dest,
    dataset_name="public",
)

load_info = pipeline.run(
    source,
    table_name="testdata",
    write_disposition="merge",
)

Here's another example that works fine, using the duckdb destination:

import random
import dlt
import dlt.extract
import pandas as pd
import numpy as np
import pyarrow as pa

@dlt.resource(
    name="arrow_mmap",
    merge_key="date", 
    columns={'date': {'merge_key': True, 'name': 'date'}}
)
def arrow_mmap(
    incremental = dlt.sources.incremental(
        "date",
        initial_value=None,
        end_value=None,
        range_end="closed",
        range_start="closed",
    )
):

    row_count = 1000
    df = pd.DataFrame(
        {
            "id": range(row_count),
            "value": np.random.rand(row_count),
            "category": np.random.choice(["A", "B", "C"], size=row_count),
            "nested": [{"a": 1, "b": 2, "c": {"d": 3}}] * row_count,
            "date": ["2024-11-05"] * row_count,
        }
    )
    yield pa.Table.from_pandas(df)


source = arrow_mmap()

dest = dlt.destinations.duckdb("duckdb:///out.db")

pipeline = dlt.pipeline(
    "".join(random.choices("abcdefghijklmop123456", k=5)),
    destination=dest,
    dataset_name="dlt",
)

load_info = pipeline.run(
    source,
    table_name="testdata",
    write_disposition="merge",
)

Operating system

Linux

Runtime environment

Local

Python version

3.11

dlt data source

arrow https://dlthub.com/docs/dlt-ecosystem/verified-sources/arrow-pandas

dlt destination

Clickhouse

Additional information

clickhouse version: 24.12

turtleDev, Jan 29 '25 15:01

When you load arrow/pandas frames, dlt will not add any columns to them, including _dlt_id, unless explicitly requested. So you need to either enable _dlt_id or set a primary_key on your resource; the id column looks like a good candidate to me.
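
A minimal sketch of the primary_key option, reusing the resource from the reproduction script above (trimmed to two columns):

import dlt
import pyarrow as pa

# Declare a primary key in addition to the merge key so the merge followup job
# has a key column to deduplicate on; "id" is just the suggested candidate here.
@dlt.resource(name="arrow_mmap", primary_key="id", merge_key="date")
def arrow_mmap():
    yield pa.table({"id": [1, 2], "date": ["2024-11-05", "2024-11-05"]})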

rudolfix, Feb 01 '25 11:02

How do we enable _dlt_id for a dataframe?

Also, this behaviour seems like an inconsistency: according to the docs, setting a merge_key should be enough.

I'd be happy to contribute a fix if you can point me in the right direction.

turtleDev, Feb 01 '25 11:02

Here's how to do that: https://dlthub.com/docs/dlt-ecosystem/verified-sources/arrow-pandas#add-_dlt_load_id-and-_dlt_id-to-your-tables

Yeah, we'll unify all normalizers, but that is a huge change, so we'll need to wait for 2.0 to do it. Right now, for pipelines that pass arrow/pandas frames, we optimize to not rewrite the data, so we do not add any columns by default. Please tell me if that helps.
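
Per those docs, enabling both columns boils down to a small addition to config.toml, roughly:

[normalize.parquet_normalizer]
add_dlt_load_id = true
add_dlt_id = true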

rudolfix, Feb 10 '25 17:02

@rudolfix Thanks! I'll test this out and get back to you.

turtleDev, Feb 10 '25 17:02

Enabling both _dlt_load_id and _dlt_id results in the following error:

Traceback (most recent call last):
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/worker.py", line 235, in w_normalize_files
    partial_updates = normalizer(extracted_items_file, root_table_name)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/items_normalizers.py", line 367, in __call__
    schema_update = self._write_with_dlt_columns(
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/items_normalizers.py", line 272, in _write_with_dlt_columns
    for batch in pyarrow.pq_stream_with_new_columns(
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/common/libs/pyarrow.py", line 526, in pq_stream_with_new_columns
    tbl = tbl.append_column(field, gen_(tbl))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 2451, in pyarrow.lib._Tabular.append_column
  File "pyarrow/table.pxi", line 5361, in pyarrow.lib.Table.add_column
  File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Field type did not match data type

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 553, in normalize
    runner.run_pool(normalize_step.config, normalize_step)
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 91, in run_pool
    while _run_func():
          ^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 84, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 280, in run
    self.spool_schema_files(load_id, schema, schema_files)
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 234, in spool_schema_files
    self.spool_files(load_id, schema.clone(update_normalizers=True), map_f, files)
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 166, in spool_files
    schema_updates, writer_metrics = map_f(schema, load_id, files)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 147, in map_single
    result = w_normalize_files(
             ^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/worker.py", line 241, in w_normalize_files
    raise NormalizeJobFailed(load_id, job_id, str(exc), writer_metrics) from exc
dlt.normalize.exceptions.NormalizeJobFailed: Job for output.a46c434351.parquet failed terminally in load 1739213283.0454676 with message Field type did not match data type.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/dev/.venv/lib/python3.11/site-packages/click/testing.py", line 412, in invoke
    return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/typer/core.py", line 738, in main
    return _main(
           ^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/typer/core.py", line 197, in _main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/click/core.py", line 1697, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/click/core.py", line 1443, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/click/core.py", line 788, in invoke
    return __callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/typer/main.py", line 707, in wrapper
    return callback(**use_params)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/main.py", line 590, in ingest
    run_info: LoadInfo = pipeline.run(
                         ^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 746, in run
    self.normalize(loader_file_format=loader_file_format)
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 180, in _wrap
    rv = f(self, *args, **kwargs)
         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 557, in normalize
    raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage normalize when processing package 1739213283.0454676 with exception:

<class 'dlt.normalize.exceptions.NormalizeJobFailed'>
Job for output.a46c434351.parquet failed terminally in load 1739213283.0454676 with message Field type did not match data type.

Enabling just _dlt_load_id works, but it still causes the TableChainFollowupJobCreationFailedException. Enabling both causes the error above.

I believe the error stems from the fact that _dlt_id is written during normalization, but the field generated in pq_stream_with_new_columns() declares it as a non-nullable string, which doesn't match the data being appended.
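
For illustration only (this is plain pyarrow, not dlt internals): append_column raises that exact error when the declared field type does not match the type of the supplied data, e.g.:

import pyarrow as pa

tbl = pa.table({"id": [1, 2, 3]})

# Field declared as a non-nullable string...
field = pa.field("_dlt_id", pa.string(), nullable=False)
# ...but the supplied column has a different Arrow type (int64 here)
values = pa.array([10, 20, 30])

try:
    tbl = tbl.append_column(field, values)
except pa.ArrowInvalid as exc:
    print(exc)  # -> Field type did not match data type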

turtleDev, Feb 10 '25 18:02

@turtleDev just to confirm (before we classify this as a bug and get it fixed):

  1. you get this exception when running the original script included in the bug report (but with dlt id enabled)
  2. you run it in a clean environment (i.e. by including dev_mode=True on the pipeline, as in the sketch below); this will help us a lot to reproduce the problem locally
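
A minimal sketch of point 2, reusing dest and the imports from the reproduction script above:

pipeline = dlt.pipeline(
    "".join(random.choices("abcdefghijklmop123456", k=5)),
    destination=dest,
    dataset_name="public",
    dev_mode=True,  # start from fresh state and a date-suffixed dataset on every run
)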

rudolfix, Feb 17 '25 12:02

Yeah, I can confirm @rudolfix.

turtleDev, Feb 17 '25 13:02

I have the same issue. I'm using the pyarrow backend, going from PostgreSQL to BigQuery, and enabling this in the config

[normalize.parquet_normalizer]
add_dlt_load_id = true
add_dlt_id = true

will lead to

<class 'dlt.load.exceptions.TableChainFollowupJobCreationFailedException'>
Failed creating table chain followup jobs for table chain with root table prosumer.

I'm using the merge write disposition with the scd2 strategy.

Using merge without the scd2 strategy works.

abuchmueller, Feb 18 '25 09:02

@turtleDev the exception in normalize looks like a bug; we'll investigate. Do you have a primary key on your test data? Then please declare it in the resource (i.e. on id) and the problem will be gone. The reason it works on duckdb and not on clickhouse is that we use a temp table to delete retired records on clickhouse (AFAIK we could not do a DELETE with JOIN).

@abuchmueller your problem may be related to the fact that right now dlt does not add the row hash automatically to arrow tables / data frames; you need to add it yourself with a map function. See: https://dlthub.com/docs/general-usage/incremental-loading#-use-scd2-with-arrow-tables-and-panda-frames

rudolfix, Mar 16 '25 21:03

bug info: investigate why normalization fails here: https://github.com/dlt-hub/dlt/issues/2248#issuecomment-2648945960

rudolfix, Mar 16 '25 21:03

@rudolfix I'm having this same problem with the sqlserver destination: apparently it doesn't create the root_key on the new table of an existing incremental load (even with root_key = true).

The reason is: No root_key column (e.g. _dlt_root_id) in table

It also looks like issue #2088, related to indexes, is still present, and that could be part of the problem.

A while ago I tried to understand the code flow to help fix it, but it was a little hard. Do you have any documents or hints on how to understand the whole process, or maybe an existing test that follows it that you'd recommend analyzing?

axellpadilla, Apr 30 '25 15:04

Hi @rudolfix, I realized one important hint that could help pinpoint this, at least for my case, though I tried without luck: my problematic column (root_table__child_array in the example corrupted schema below) is an empty list. If I replace the empty list with None or just delete the column, the problem disappears, but this column can indeed have data; it just happens that whole loads came in empty:

I tried to reproduce this with tests but again had no luck; the only ways were intentionally corrupting the schema, or doing an append load first without root_key (with the empty-list problematic column) and then a merge.

{
  "name": "default",
  "version": 1,
  "engine_version": "1.0.0",
  "tables": {
    "root_table": {
      "name": "root_table",
      "write_disposition": "merge",
      "columns": {
        "id": {
          "name": "id",
          "data_type": "text",
          "nullable": false,
          "primary_key": true
        },
        "title": {
          "name": "title",
          "data_type": "text",
          "nullable": true
        },
        "_dlt_id": {
          "name": "_dlt_id",
          "data_type": "text",
          "nullable": false,
          "unique": true,
          "row_key": true
        },
        "_dlt_load_id": {
          "name": "_dlt_load_id",
          "data_type": "text",
          "nullable": false
        }
      },
      "x-normalizer": {
        "seen-data": true
      }
    },
    "root_table__child_array": {
      "name": "root_table__child_array",
      "parent": "root_table",
      "write_disposition": "merge",
      "columns": {},
      "x-normalizer": {
        "seen-data": true
      }
    }
  },
  "settings": {}
}

Note: I'm currently testing with a custom normalizer like this and it's working for now; I hope it stays that way:

from typing import Dict

def _normalize_problem_columns(doc: Dict):  # type: ignore
    # "c" is my surrounding config object; c.columns_to_normalize lists the
    # column names to clean up.
    for col in c.columns_to_normalize or ():
        if col not in doc:
            continue
        val = doc.get(col)
        # Remove empty lists entirely -> prevents creation of empty child table with no columns
        if isinstance(val, list):
            if len(val) == 0:
                del doc[col]
                continue
            # If list of dicts but all dicts empty, drop field
            if all(isinstance(it, dict) and len(it) == 0 for it in val):
                del doc[col]
                continue
        # If value is None or empty string just remove
        if val in (None, ""):
            del doc[col]
    return doc
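
A minimal sketch of how I wire it up (the resource name is a placeholder), assuming it is attached with dlt's add_map:

# Hypothetical wiring: run the cleanup on every yielded record before dlt
# normalizes it; "my_resource" stands in for the actual resource.
my_resource.add_map(_normalize_problem_columns)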

axellpadilla, Sep 03 '25 14:09