debezium-server-iceberg
Upsert doesn't work when the destination iceberg table is partitioned
I'm trying to replicate data (CDC) from Postgres in upsert mode. I noticed that when I partition the Iceberg table by a column present in the source table, changed records are appended as new rows instead of being upserted.
Here is the source table's data initially:
The data is replicated normally to Iceberg:
However, when I update the source table to:
Instead of updating the existing record, a new record is added to the Iceberg table:
The table definitions are as follows:
-- src table
CREATE TABLE test (
id int4 NOT NULL,
"name" varchar(50) NULL,
CONSTRAINT test_pkey PRIMARY KEY (id)
);
-- dest table
CREATE TABLE iceberg_test (
id integer NOT NULL,
name varchar,
__op varchar,
__source_ts_ms timestamp(6) with time zone,
__deleted varchar
)
WITH (
format = 'PARQUET',
format_version = 2,
partitioning = array['name'],
sorted_by = ARRAY['id ASC NULLS FIRST']
);
This is my application.properties file:
# Use iceberg sink
debezium.sink.type=iceberg
# Iceberg sink config
debezium.sink.iceberg.table-prefix=postgres121
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=false
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=iceberg
# hive metastore catalogs
debezium.sink.iceberg.type=hive
debezium.sink.iceberg.uri=thrift://SOME_URI
debezium.sink.iceberg.clients=5
debezium.sink.iceberg.warehouse=s3a://datalake
debezium.sink.iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.engine.hive.enabled=true
debezium.sink.iceberg.iceberg.engine.hive.enabled=true
debezium.sink.iceberg.hive.metastore.table.owner=admin
debezium.sink.iceberg.hive.other.configs=admin
# S3 config
debezium.sink.iceberg.fs.defaultFS=s3a://datalake
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
debezium.sink.iceberg.fs.s3a.access.key=minioadmin
debezium.sink.iceberg.fs.s3a.secret.key=minioadmin
debezium.sink.iceberg.fs.s3a.endpoint=http://SOME_ENDPOINT
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json
# postgres
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=/tmp/offset
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=SOME_HOSTNAME
debezium.source.database.port=5431
debezium.source.database.user=postgres
debezium.source.database.password=changeme
debezium.source.database.dbname=test
debezium.source.database.server.name=postgres_test
debezium.source.schema.include.list=kafka_hudi_test
#debezium.source.database.server.name=postgres_test1
#debezium.source.table.include.list=public.test_user1
debezium.source.topic.prefix=vietpq_
debezium.source.plugin.name=pgoutput
debezium.source.slot.name=ducdn1
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,source.ts_ms
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
# ############ SET INTERVAL TIME ############
debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.max.batch.size=100000
debezium.source.max.queue.size=1000000
debezium.sink.batch.batch-size-wait.max-wait-ms=6000
debezium.sink.batch.batch-size-wait.wait-interval-ms=1000
# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.console.json=false
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
quarkus.log.category."io.debezium.server.iceberg.batchsizewait".level=DEBUG
The expected behaviour is that there should be only one record in the destination table, containing the updated data. When the destination table is not partitioned by the "name" column, upsert works fine.
@vietcheems Thank you for reporting this.
When you created the destination table, could it be that it was created without an "identifier field"? The application falls back to append mode when the target table doesn't have an "identifier field" defined.
If that's not the case, could it be that the Debezium application still thinks the table is not partitioned? To test it:
- replicate the first row
- stop the replication
- convert the table into a partitioned table
- start the replication
- do an upsert
- check the result
Could you try adding an identifier field after creating the table?
ALTER TABLE iceberg_test SET IDENTIFIER FIELDS id
https://iceberg.apache.org/docs/latest/spark-ddl/#alter-table--set-identifier-fields
I'm using Trino and, as far as I know, it has no option to set an identifier field. However, when I create the table in Trino without the partitioning spec (removing partitioning = array['name']), upsert works as usual. So I'm not sure the "identifier field" is the issue.
Could you try with adding identifier field after creating the table
ALTER TABLE iceberg_test SET IDENTIFIER FIELDS id
https://iceberg.apache.org/docs/latest/spark-ddl/#alter-table--set-identifier-fields
I switched to Spark to try this and the problem still persists.
@vietcheems Thank you for checking. The problem is caused by the previous row (name=1) and the new row (name=deleted) landing in two different partitions. This means the partitioning field must be immutable: its value should not change between the old row and the new (upserted) row.
DOC: https://iceberg.apache.org/spec/#scan-planning
An equality delete file must be applied to a data file when all of the following are true:
The data file’s partition (both spec and partition values) is equal to the delete file’s partition or the delete file’s partition spec is unpartitioned
Currently, an upsert that does not change the name field (the partition field) should work for partitioned tables.
We could try changing the deletion to a global delete, i.e. writing the equality delete with an unpartitioned spec:
In general, deletes are applied only to data files that are older and in the same partition, except for two special cases: Equality delete files stored with an unpartitioned spec are applied as global deletes. Otherwise, delete files do not apply to files in other partitions.
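To make the two quoted rules concrete, here is a minimal Python sketch that simulates how a reader decides whether an equality delete applies to a data file. It is not the sink's or Iceberg's actual code: `DataFile`, `EqualityDelete`, `applies` and `scan` are hypothetical names, `partition=None` is a simplification standing in for "written with an unpartitioned spec", and it assumes that the upsert writes its equality delete into the partition of the new row.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DataFile:
    seq: int                  # data sequence number of the commit
    partition: Optional[str]  # value of the partition column `name`
    rows: tuple               # rows stored as (id, name) tuples


@dataclass(frozen=True)
class EqualityDelete:
    seq: int
    partition: Optional[str]  # None models an unpartitioned spec (global delete)
    ids: frozenset            # values of the identifier field `id` to delete


def applies(delete: EqualityDelete, data_file: DataFile) -> bool:
    """Simplified scan-planning rule for equality deletes."""
    older = data_file.seq < delete.seq
    same_or_global = delete.partition is None or delete.partition == data_file.partition
    return older and same_or_global


def scan(data_files, deletes):
    """Return the rows a reader would see after applying the deletes."""
    live = []
    for f in data_files:
        deleted_ids = set()
        for d in deletes:
            if applies(d, f):
                deleted_ids |= d.ids
        live.extend(r for r in f.rows if r[0] not in deleted_ids)
    return sorted(live)


# Commit 1: insert id=1 with name='a' (lands in partition 'a').
old = DataFile(seq=1, partition="a", rows=((1, "a"),))
# Commit 2: the update changes name to 'b', so the new row lands in partition 'b'.
new = DataFile(seq=2, partition="b", rows=((1, "b"),))

# Assumed current behaviour: the delete is stored in the new row's partition.
partitioned_delete = EqualityDelete(seq=2, partition="b", ids=frozenset({1}))
# Proposed behaviour: the delete is stored with an unpartitioned spec.
global_delete = EqualityDelete(seq=2, partition=None, ids=frozenset({1}))

partitioned_result = scan([old, new], [partitioned_delete])
global_result = scan([old, new], [global_delete])

print(partitioned_result)  # [(1, 'a'), (1, 'b')] -> old row survives (the reported bug)
print(global_result)       # [(1, 'b')] -> only the updated row remains
```

In this model the partition-scoped delete never reaches the old data file in partition 'a', which matches the duplicate row reported above, while the global delete removes it regardless of partition.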