`TableChainFollowupJobCreationFailedException` when loading Arrow table to ClickHouse
dlt version
1.5.0
Describe the problem
When trying to load data from an Arrow source into a ClickHouse destination using the "merge" write disposition, dlt raises a dlt.load.exceptions.TableChainFollowupJobCreationFailedException.
This only occurs with the arrow/pandas source and ClickHouse destination combination; other source/destination combinations work without issue.
Here's the full stack trace:
2025-01-29 20:41:08,965|[WARNING]|240986|139845918012096|dlt|__init__.py|_check_duplicate_cursor_threshold:584|Large number of records (1000) sharing the same value of cursor field 'date'. This can happen if the cursor field has a low resolution (e.g., only stores dates without times), causing many records to share the same cursor value. Consider using a cursor column with higher resolution to reduce the deduplication state size.
2025-01-29 20:41:08,974|[WARNING]|240986|139845918012096|dlt|extractors.py|_compute_table:445|In resource: arrow_mmap, when merging arrow schema with dlt schema, several column hints were different. dlt schema hints were kept and arrow schema and data were unmodified. It is up to destination to coerce the differences when loading. Change log level to INFO for more details.
Traceback (most recent call last):
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 76, in from_table_chain
for stmt in cls.generate_sql(table_chain, sql_client, params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 173, in generate_sql
merge_sql = cls.gen_merge_sql(table_chain, sql_client)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 569, in gen_merge_sql
cls._get_row_key_col(table_chain, sql_client, root_table)
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 464, in _get_row_key_col
raise MergeDispositionException(
dlt.destinations.exceptions.MergeDispositionException: Merge sql job for dataset name `public`.`public`, staging dataset name `public`.`public_staging` COULD NOT BE GENERATED. Merge will not be performed. Data for the following tables (['testdata']) is loaded to staging dataset. You may need to write your own materialization. The reason is:
No `row_key`, `unique`, or single primary key column (e.g. `_dlt_id`) in table `testdata`.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 349, in create_followup_jobs
if follow_up_jobs := client.create_table_chain_completed_followup_jobs(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/job_client_impl.py", line 270, in create_table_chain_completed_followup_jobs
jobs.extend(self._create_merge_followup_jobs(table_chain))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/impl/clickhouse/clickhouse.py", line 223, in _create_merge_followup_jobs
return [ClickHouseMergeJob.from_table_chain(table_chain, self.sql_client)]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/destinations/sql_jobs.py", line 81, in from_table_chain
raise SqlJobCreationException(e, table_chain) from e
dlt.destinations.sql_jobs.SqlJobCreationException: Could not create SQLFollowupJob with exception Merge sql job for dataset name `public`.`public`, staging dataset name `public`.`public_staging` COULD NOT BE GENERATED. Merge will not be performed. Data for the following tables (['testdata']) is loaded to staging dataset. You may need to write your own materialization. The reason is:
No `row_key`, `unique`, or single primary key column (e.g. `_dlt_id`) in table `testdata`.. Table chain: - columns:
id:
name: id
nullable: true
data_type: bigint
value:
name: value
nullable: true
data_type: double
category:
name: category
nullable: true
data_type: text
nested:
name: nested
nullable: true
data_type: json
date:
name: date
nullable: false
data_type: text
merge_key: true
incremental: true
name: testdata
write_disposition: merge
resource: arrow_mmap
x-normalizer:
seen-data: true
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 608, in load
runner.run_pool(load_step.config, load_step)
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 91, in run_pool
while _run_func():
^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 84, in _run_func
run_metrics = run_f.run(cast(TExecutor, pool))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 639, in run
self.load_single_package(load_id, schema)
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 572, in load_single_package
running_jobs, finalized_jobs, new_pending_exception = self.complete_jobs(
^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 447, in complete_jobs
self.create_followup_jobs(load_id, state, job, schema)
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/load/load.py", line 354, in create_followup_jobs
raise TableChainFollowupJobCreationFailedException(
dlt.load.exceptions.TableChainFollowupJobCreationFailedException: Failed creating table chain followup jobs for table chain with root table testdata.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/arrow-to-clickhouse.py", line 75, in <module>
load_info = pipeline.run(
^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
step_info = f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
return f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 747, in run
return self.load(destination, dataset_name, credentials=credentials)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
step_info = f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 166, in _wrap
return f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
return f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/scribble/dlt-clickhouse-merge-bug-reproduction/venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 615, in load
raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package 1738163468.926135 with exception:
<class 'dlt.load.exceptions.TableChainFollowupJobCreationFailedException'>
Failed creating table chain followup jobs for table chain with root table testdata.
Expected behavior
I expected to be able to load data from Arrow to ClickHouse.
Steps to reproduce
Here's a self-contained script that reproduces the issue:
import random

import dlt
import dlt.extract
from dlt.destinations.impl.clickhouse.configuration import (
    ClickHouseCredentials
)
import pandas as pd
import numpy as np
import pyarrow as pa

"""
clickhouse setup

$ docker run -p 8123:8123 -p 9000:9000 clickhouse/clickhouse-server:24.12
$ docker exec -i ${CONTAINER_ID} /usr/bin/env clickhouse-client << _EOF
CREATE DATABASE IF NOT EXISTS "public";
CREATE USER IF NOT EXISTS dlt IDENTIFIED WITH sha256_password BY 'Dlt*12345789234567';
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON public.* TO dlt;
GRANT CREATE, ALTER, SELECT, DELETE, DROP, TRUNCATE, OPTIMIZE, SHOW, INSERT, dictGet ON default.* TO dlt;
GRANT SELECT ON INFORMATION_SCHEMA.COLUMNS TO dlt;
GRANT CREATE TEMPORARY TABLE, S3 ON *.* TO dlt;
_EOF
"""


@dlt.resource(
    name="arrow_mmap",
    merge_key="date",
    columns={'date': {'merge_key': True, 'name': 'date'}}
)
def arrow_mmap(
    incremental=dlt.sources.incremental(
        "date",
        initial_value=None,
        end_value=None,
        range_end="closed",
        range_start="closed",
    )
):
    row_count = 1000
    df = pd.DataFrame(
        {
            "id": range(row_count),
            "value": np.random.rand(row_count),
            "category": np.random.choice(["A", "B", "C"], size=row_count),
            "nested": [{"a": 1, "b": 2, "c": {"d": 3}}] * row_count,
            "date": ["2024-11-05"] * row_count,
        }
    )
    yield pa.Table.from_pandas(df)


source = arrow_mmap()

credentials = ClickHouseCredentials(
    {
        "host": "localhost",
        "port": "9000",
        "username": "dlt",
        "password": "Dlt*12345789234567",
        "database": "public",
        "http_port": 8123,
        "secure": 0,
    }
)

dest = dlt.destinations.clickhouse(credentials=credentials)

pipeline = dlt.pipeline(
    "".join(random.choices("abcdefghijklmop123456", k=5)),
    destination=dest,
    dataset_name="public",
)

load_info = pipeline.run(
    source,
    table_name="testdata",
    write_disposition="merge",
)
Here's another example that works fine, using the duckdb destination:
import random

import dlt
import dlt.extract
import pandas as pd
import numpy as np
import pyarrow as pa


@dlt.resource(
    name="arrow_mmap",
    merge_key="date",
    columns={'date': {'merge_key': True, 'name': 'date'}}
)
def arrow_mmap(
    incremental=dlt.sources.incremental(
        "date",
        initial_value=None,
        end_value=None,
        range_end="closed",
        range_start="closed",
    )
):
    row_count = 1000
    df = pd.DataFrame(
        {
            "id": range(row_count),
            "value": np.random.rand(row_count),
            "category": np.random.choice(["A", "B", "C"], size=row_count),
            "nested": [{"a": 1, "b": 2, "c": {"d": 3}}] * row_count,
            "date": ["2024-11-05"] * row_count,
        }
    )
    yield pa.Table.from_pandas(df)


source = arrow_mmap()

dest = dlt.destinations.duckdb("duckdb:///out.db")

pipeline = dlt.pipeline(
    "".join(random.choices("abcdefghijklmop123456", k=5)),
    destination=dest,
    dataset_name="dlt",
)

load_info = pipeline.run(
    source,
    table_name="testdata",
    write_disposition="merge",
)
Operating system
Linux
Runtime environment
Local
Python version
3.11
dlt data source
arrow
https://dlthub.com/docs/dlt-ecosystem/verified-sources/arrow-pandas
dlt destination
Clickhouse
Additional information
clickhouse version: 24.12
When you load Arrow/pandas frames, dlt will not add any columns to them, including _dlt_id, unless explicitly requested. So you need to enable _dlt_id or set e.g. a primary_key on your resources. It looks to me like the id column is a good candidate.
How do we enable _dlt_id for dataframes?
Also, this behaviour seems like an inconsistency: according to the docs, setting a merge_key should be enough.
I'd be happy to contribute a fix if you can point me in the right direction.
here's how to do that: https://dlthub.com/docs/dlt-ecosystem/verified-sources/arrow-pandas#add-_dlt_load_id-and-_dlt_id-to-your-tables
Yeah, we'll unify all normalizers, but that is a huge change, so we'll need to wait for 2.0 to do it. Right now, for pipelines that pass Arrow/pandas frames, we optimize to not rewrite the data, so we do not add any columns by default. Please tell me if that helps.
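For a quick test of the reproduction script without touching config.toml, the same switches can presumably be set via environment variables (a sketch, assuming dlt's usual SECTION__KEY naming for config values):

import os

# Assumed equivalent of [normalize.parquet_normalizer] add_dlt_id / add_dlt_load_id in config.toml;
# must be set before pipeline.run() so the normalize step picks them up.
os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_ID"] = "true"
os.environ["NORMALIZE__PARQUET_NORMALIZER__ADD_DLT_LOAD_ID"] = "true"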
@rudolfix Thanks! I'll test this out and get back to you.
Enabling both add_dlt_load_id and add_dlt_id results in the following error:
Traceback (most recent call last):
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/worker.py", line 235, in w_normalize_files
partial_updates = normalizer(extracted_items_file, root_table_name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/items_normalizers.py", line 367, in __call__
schema_update = self._write_with_dlt_columns(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/items_normalizers.py", line 272, in _write_with_dlt_columns
for batch in pyarrow.pq_stream_with_new_columns(
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/common/libs/pyarrow.py", line 526, in pq_stream_with_new_columns
tbl = tbl.append_column(field, gen_(tbl))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "pyarrow/table.pxi", line 2451, in pyarrow.lib._Tabular.append_column
File "pyarrow/table.pxi", line 5361, in pyarrow.lib.Table.add_column
File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Field type did not match data type
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 553, in normalize
runner.run_pool(normalize_step.config, normalize_step)
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 91, in run_pool
while _run_func():
^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/common/runners/pool_runner.py", line 84, in _run_func
run_metrics = run_f.run(cast(TExecutor, pool))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 280, in run
self.spool_schema_files(load_id, schema, schema_files)
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 234, in spool_schema_files
self.spool_files(load_id, schema.clone(update_normalizers=True), map_f, files)
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 166, in spool_files
schema_updates, writer_metrics = map_f(schema, load_id, files)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/normalize.py", line 147, in map_single
result = w_normalize_files(
^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/normalize/worker.py", line 241, in w_normalize_files
raise NormalizeJobFailed(load_id, job_id, str(exc), writer_metrics) from exc
dlt.normalize.exceptions.NormalizeJobFailed: Job for output.a46c434351.parquet failed terminally in load 1739213283.0454676 with message Field type did not match data type.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/user/dev/.venv/lib/python3.11/site-packages/click/testing.py", line 412, in invoke
return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/typer/core.py", line 738, in main
return _main(
^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/typer/core.py", line 197, in _main
rv = self.invoke(ctx)
^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/click/core.py", line 1697, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/click/core.py", line 1443, in invoke
return ctx.invoke(self.callback, **ctx.params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/click/core.py", line 788, in invoke
return __callback(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/typer/main.py", line 707, in wrapper
return callback(**use_params)
^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/main.py", line 590, in ingest
run_info: LoadInfo = pipeline.run(
^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
step_info = f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
return f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 746, in run
self.normalize(loader_file_format=loader_file_format)
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
step_info = f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 180, in _wrap
rv = f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
return f(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/dev/.venv/lib/python3.11/site-packages/dlt/pipeline/pipeline.py", line 557, in normalize
raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage normalize when processing package 1739213283.0454676 with exception:
<class 'dlt.normalize.exceptions.NormalizeJobFailed'>
Job for output.a46c434351.parquet failed terminally in load 1739213283.0454676 with message Field type did not match data type.
Enabling just add_dlt_load_id works, but still causes the TableChainFollowupJobCreationFailedException. Enabling both causes the error above.
I believe the error stems from the fact that _dlt_id is written during normalization, but the schema generated in pq_stream_with_new_columns() specifies it as a non-nullable string.
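For illustration only (this is not the dlt code path, just a guess at the failure mode): pyarrow raises this exact error whenever the declared field type does not match the column data handed to append_column, e.g.:

import pyarrow as pa

tbl = pa.table({"id": [1, 2, 3]})
# Field declared as string, data built as large_string -> the types disagree
field = pa.field("_dlt_id", pa.string(), nullable=False)
data = pa.array(["a", "b", "c"], type=pa.large_string())
# Raises pyarrow.lib.ArrowInvalid: Field type did not match data type
tbl = tbl.append_column(field, data)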
@turtleDev just to confirm (before we classify this as a bug and get it fixed):
- you get this exception when running the original script included in the bug report (but with _dlt_id enabled)?
- you ran it in a clean environment (i.e., by including dev_mode=True on the pipeline)? This will help us a lot to reproduce the problem locally.
Yeah, I can confirm @rudolfix.
I have the same issue. Using the pyarrow backend going from PGSQL -> BQ and enabling this in the config
[normalize.parquet_normalizer]
add_dlt_load_id = true
add_dlt_id = true
will lead to
<class 'dlt.load.exceptions.TableChainFollowupJobCreationFailedException'>
Failed creating table chain followup jobs for table chain with root table prosumer.
I'm using the merge/scd2 strategy.
Using the merge strategy without scd2 works.
@turtleDev the exception in normalize looks like a bug; we'll investigate.
Do you have a primary key on your test data? Then please declare it in the resource (i.e., on id) and the problem will be gone. The reason it works on duckdb and not on ClickHouse: on ClickHouse we use a temp table to delete retired records (AFAIK we could not do DELETE with JOIN).
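In the reproduction script above, that would look something like the following sketch (only the decorator changes; the body of the resource stays as posted):

@dlt.resource(
    name="arrow_mmap",
    primary_key="id",  # give merge a stable row identity instead of relying on _dlt_id
    merge_key="date",
    columns={"date": {"merge_key": True, "name": "date"}},
)
def arrow_mmap(
    incremental=dlt.sources.incremental(
        "date",
        range_end="closed",
        range_start="closed",
    )
):
    ...  # unchanged body from the script in the bug report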
@abuchmueller your problem may be related to this: https://dlthub.com/docs/general-usage/incremental-loading#-use-scd2-with-arrow-tables-and-panda-frames
Right now dlt does not add the row hash automatically to Arrow tables / data frames; you need to use a map function yourself.
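A rough sketch of such a map function for Arrow tables (illustrative only; the column name row_hash and the hashing scheme are placeholders, and the supported approach is the one described in the linked docs):

import hashlib

import pyarrow as pa


def add_row_hash(table: pa.Table) -> pa.Table:
    # Hash all column values of each row into a single deterministic string column.
    rows = zip(*(table.column(name).to_pylist() for name in table.column_names))
    hashes = [hashlib.md5("|".join(map(str, row)).encode()).hexdigest() for row in rows]
    return table.append_column("row_hash", pa.array(hashes, type=pa.string()))


# resource.add_map(add_row_hash)  # attach to the resource that yields Arrow tables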
bug info: investigate why normalization fails here: https://github.com/dlt-hub/dlt/issues/2248#issuecomment-2648945960
@rudolfix I'm having the same problem with the sqlserver destination: apparently it doesn't create the root_key on the new table of an existing incremental load (even with root_key=true).
The reason is: No `root_key` column (e.g. `_dlt_root_id`) in table.
Also, it looks like issue #2088, related to indexes, is still present, and that could be part of the problem.
A while ago I tried to understand the code flow to help fix it, but it was a little hard. Do you have any documents or hints on how to understand the whole process, or maybe an existing test that follows it that you'd recommend analyzing?
Hi @rudolfix, I realized one important hint that could help pinpoint this, at least for this case. My problematic column, for example root_table__child_array in the example corrupted schema below, is an empty list. If I replace the empty list with None or just delete the column, the problem disappears, but this column can indeed have data; it's just that whole loads came in empty.
I tried to reproduce this with tests but had no luck; the only way was intentionally corrupting the schema, or doing an append load first without root_key (with the empty-list problematic column) and then a merge:
{
  "name": "default",
  "version": 1,
  "engine_version": "1.0.0",
  "tables": {
    "root_table": {
      "name": "root_table",
      "write_disposition": "merge",
      "columns": {
        "id": {
          "name": "id",
          "data_type": "text",
          "nullable": false,
          "primary_key": true
        },
        "title": {
          "name": "title",
          "data_type": "text",
          "nullable": true
        },
        "_dlt_id": {
          "name": "_dlt_id",
          "data_type": "text",
          "nullable": false,
          "unique": true,
          "row_key": true
        },
        "_dlt_load_id": {
          "name": "_dlt_load_id",
          "data_type": "text",
          "nullable": false
        }
      },
      "x-normalizer": {
        "seen-data": true
      }
    },
    "root_table__child_array": {
      "name": "root_table__child_array",
      "parent": "root_table",
      "write_disposition": "merge",
      "columns": {},
      "x-normalizer": {
        "seen-data": true
      }
    }
  },
  "settings": {}
}
Note: I'm currently testing with a custom normalizer like this, and it's working for now; I hope it stays that way:
from typing import Dict


def _normalize_problem_columns(doc: Dict):  # type: ignore
    for col in c.columns_to_normalize or ():
        if col not in doc:
            continue
        val = doc.get(col)
        # Remove empty lists entirely -> prevents creation of empty child table with no columns
        if isinstance(val, list):
            if len(val) == 0:
                del doc[col]
                continue
            # If list of dicts but all dicts empty, drop field
            if all(isinstance(it, dict) and len(it) == 0 for it in val):
                del doc[col]
                continue
        # If value is None or empty string just remove
        if val in (None, ""):
            del doc[col]
    return doc
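Presumably the function is then attached to the resource so each yielded document is cleaned before normalization. A minimal sketch of that wiring, assuming c is the commenter's own config object holding columns_to_normalize (not a dlt API) and my_resource stands in for the actual resource:

# Hypothetical wiring: run the cleanup on every item before dlt normalizes it.
resource = my_resource()  # placeholder for the real resource yielding documents
resource.add_map(_normalize_problem_columns)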