Can't load table with nullable numeric column from Postgres to ClickHouse
I have a setup where the source Postgres table has a nullable numeric column (deposit_amount). The DDL is:
CREATE TABLE reservations.guarantee (
    id int4 GENERATED BY DEFAULT AS IDENTITY( INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1 NO CYCLE) NOT NULL,
    code public."citext" NOT NULL,
    "name" public."citext" NOT NULL,
    is_active bool NOT NULL,
    color_hex public."citext" NULL,
    guarantee_type text NOT NULL,
    guarantees_accommodation bool NOT NULL,
    auto_cancellation bool NOT NULL,
    deposit_unit text NULL,
    deposit_amount numeric NULL,
    prepayment_countdown text NOT NULL,
    prepayment_days int4 NULL,
    user_created_id int4 NULL,
    user_deleted_id int4 NULL,
    date_created timestamptz NOT NULL,
    date_modified timestamptz NOT NULL,
    date_deleted timestamptz NULL,
    CONSTRAINT pk_guarantee PRIMARY KEY (id)
);
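For context: deposit_amount is an unconstrained numeric (no precision/scale), and as far as I understand, Debezium's default decimal.handling.mode=precise represents such columns as the io.debezium.data.VariableScaleDecimal record (see the Avro schema below). If someone only needs a workaround, changing the handling mode on the source connector should avoid that struct entirely (an illustrative config fragment, not my actual setup):

decimal.handling.mode=string

With string (or double), the value arrives as a plain string (or float), at the cost of exact decimal typing.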
I've also configured the sink connector (using Avro with Schema Registry) to move the data into ClickHouse, but the ClickHouse sink (2.0.1-kafka) fails with an error while moving data:
sink | 2024-03-26 22:56:10,489 INFO || *************** EXECUTED BATCH Successfully Records: 1************** task(0) Thread ID: Sink Connector thread-pool-1 Result: [I@dab125 [com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor]
sink | 2024-03-26 22:56:10,499 INFO || ResultSetcom.clickhouse.jdbc.ClickHouseResultSet@b863e29 [com.altinity.clickhouse.sink.connector.db.DBMetadata]
sink | 2024-03-26 22:56:10,528 INFO || *** QUERY***insert into `guarantee`(`id`,`code`,`name`,`is_active`,`color_hex`,`guarantee_type`,`guarantees_accommodation`,`auto_cancellation`,`deposit_unit`,`deposit_amount`,`prepayment_countdown`,`prepayment_days`,`user_created_id`,`user_deleted_id`,`date_created`,`date_modified`,`date_deleted`,`_sign`,`_version`) select `id`,`code`,`name`,`is_active`,`color_hex`,`guarantee_type`,`guarantees_accommodation`,`auto_cancellation`,`deposit_unit`,`deposit_amount`,`prepayment_countdown`,`prepayment_days`,`user_created_id`,`user_deleted_id`,`date_created`,`date_modified`,`date_deleted`,`_sign`,`_version` from input('`id` Int32,`code` String,`name` String,`is_active` Bool,`color_hex` Nullable(String),`guarantee_type` String,`guarantees_accommodation` Bool,`auto_cancellation` Bool,`deposit_unit` Nullable(String),`deposit_amount` Nullable(Decimal(10, 2)),`prepayment_countdown` String,`prepayment_days` Nullable(Int32),`user_created_id` Nullable(Int32),`user_deleted_id` Nullable(Int32),`date_created` String,`date_modified` String,`date_deleted` Nullable(String),`_sign` Int8,`_version` UInt64') [com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor]
sink | 2024-03-26 22:56:10,536 ERROR || ******* ERROR inserting Batch ***************** [com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor]
sink | java.lang.ClassCastException: class java.nio.HeapByteBuffer cannot be cast to class [B (java.nio.HeapByteBuffer and [B are in module java.base of loader 'bootstrap')
sink | at com.altinity.clickhouse.sink.connector.converters.ClickHouseDataTypeMapper.convert(ClickHouseDataTypeMapper.java:245)
sink | at com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor.insertPreparedStatement(PreparedStatementExecutor.java:264)
sink | at com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor.lambda$executePreparedStatement$0(PreparedStatementExecutor.java:139)
sink | at java.base/java.lang.Iterable.forEach(Iterable.java:75)
sink | at com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor.executePreparedStatement(PreparedStatementExecutor.java:117)
sink | at com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor.addToPreparedStatementBatch(PreparedStatementExecutor.java:88)
sink | at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.flushRecordsToClickHouse(ClickHouseBatchRunnable.java:323)
sink | at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.processRecordsByTopic(ClickHouseBatchRunnable.java:289)
sink | at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.run(ClickHouseBatchRunnable.java:151)
sink | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
sink | at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
sink | at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
sink | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
sink | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
sink | at java.base/java.lang.Thread.run(Thread.java:829)
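If it helps triage: the cast at ClickHouseDataTypeMapper.java:245 appears to assume the decimal's value field is a byte[], but the Avro converter seems to deliver Avro bytes fields as java.nio.ByteBuffer, which matches the exception exactly. A defensive conversion along these lines would accept both representations (a minimal sketch with hypothetical names, not the connector's actual code):

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public final class DecimalBytes {
    // Hypothetical helper: accept byte[] (as the JSON converter produces)
    // or ByteBuffer (as the Avro converter produces) and rebuild the decimal
    // from Debezium's VariableScaleDecimal parts.
    public static BigDecimal toBigDecimal(Object value, int scale) {
        final byte[] bytes;
        if (value instanceof ByteBuffer) {
            ByteBuffer buf = ((ByteBuffer) value).duplicate(); // leave the caller's position intact
            bytes = new byte[buf.remaining()];
            buf.get(bytes);
        } else {
            bytes = (byte[]) value; // the cast that currently throws for HeapByteBuffer
        }
        // The unscaled value is a big-endian two's-complement integer.
        return new BigDecimal(new BigInteger(bytes), scale);
    }
}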
The Avro schema for the deposit_amount column is:
{
    "name": "deposit_amount",
    "type": [
        "null",
        {
            "type": "record",
            "name": "VariableScaleDecimal",
            "namespace": "io.debezium.data",
            "fields": [
                {
                    "name": "scale",
                    "type": "int"
                },
                {
                    "name": "value",
                    "type": "bytes"
                }
            ],
            "connect.doc": "Variable scaled decimal",
            "connect.version": 1,
            "connect.name": "io.debezium.data.VariableScaleDecimal"
        }
    ],
    "default": null
},
As I see from the stack trace, the problem is in converting the decimal-typed column. The message content is:
"deposit_amount":{
"io.debezium.data.VariableScaleDecimal":{
"scale":0
"value":"AA=="
}
}
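Decoding that payload by hand suggests the data itself is fine: AA== is base64 for a single 0x00 byte, so with scale 0 the decimal is simply 0. A quick check (assuming the unscaled value is a big-endian two's-complement integer, as in the sketch above):

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

public class DecodeDeposit {
    public static void main(String[] args) {
        byte[] unscaled = Base64.getDecoder().decode("AA=="); // one 0x00 byte
        int scale = 0; // taken from the "scale" field of the message
        System.out.println(new BigDecimal(new BigInteger(unscaled), scale)); // prints 0
    }
}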
After changing the key/value serializers to JSON, the problem went away, so as far as I can tell the cause lies somewhere in the Avro path.
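For reference, the change that made it work was switching to the JSON converters, roughly like this (illustrative settings, not my exact worker config):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

This would be consistent with the ByteBuffer theory above, if the JSON converter materializes bytes fields as byte[] so the cast in the mapper succeeds.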