clickhouse-sink-connector

Can't load a table containing a nullable numeric column from Postgres to ClickHouse

Open • ZlobnyiSerg opened this issue 10 months ago • 1 comment

I have a setup where the source Postgres table has a nullable numeric column (deposit_amount). The DDL is:

CREATE TABLE reservations.guarantee (
	id int4 GENERATED BY DEFAULT AS IDENTITY( INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START 1 CACHE 1 NO CYCLE) NOT NULL,
	code public."citext" NOT NULL,
	"name" public."citext" NOT NULL,
	is_active bool NOT NULL,
	color_hex public."citext" NULL,
	guarantee_type text NOT NULL,
	guarantees_accommodation bool NOT NULL,
	auto_cancellation bool NOT NULL,
	deposit_unit text NULL,
	deposit_amount numeric NULL,
	prepayment_countdown text NOT NULL,
	prepayment_days int4 NULL,
	user_created_id int4 NULL,
	user_deleted_id int4 NULL,
	date_created timestamptz NOT NULL,
	date_modified timestamptz NOT NULL,
	date_deleted timestamptz NULL,
	CONSTRAINT pk_guarantee PRIMARY KEY (id)
);

I've also configured the sink connector to move data into ClickHouse via Avro with schemas. However, the ClickHouse sink (2.0.1-kafka) fails with the following error when moving data:

sink        | 2024-03-26 22:56:10,489 INFO   ||  *************** EXECUTED BATCH Successfully Records: 1************** task(0) Thread ID: Sink Connector thread-pool-1 Result: [I@dab125   [com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor]
sink        | 2024-03-26 22:56:10,499 INFO   ||  ResultSetcom.clickhouse.jdbc.ClickHouseResultSet@b863e29   [com.altinity.clickhouse.sink.connector.db.DBMetadata]
sink        | 2024-03-26 22:56:10,528 INFO   ||  *** QUERY***insert into `guarantee`(`id`,`code`,`name`,`is_active`,`color_hex`,`guarantee_type`,`guarantees_accommodation`,`auto_cancellation`,`deposit_unit`,`deposit_amount`,`prepayment_countdown`,`prepayment_days`,`user_created_id`,`user_deleted_id`,`date_created`,`date_modified`,`date_deleted`,`_sign`,`_version`) select `id`,`code`,`name`,`is_active`,`color_hex`,`guarantee_type`,`guarantees_accommodation`,`auto_cancellation`,`deposit_unit`,`deposit_amount`,`prepayment_countdown`,`prepayment_days`,`user_created_id`,`user_deleted_id`,`date_created`,`date_modified`,`date_deleted`,`_sign`,`_version` from input('`id` Int32,`code` String,`name` String,`is_active` Bool,`color_hex` Nullable(String),`guarantee_type` String,`guarantees_accommodation` Bool,`auto_cancellation` Bool,`deposit_unit` Nullable(String),`deposit_amount` Nullable(Decimal(10, 2)),`prepayment_countdown` String,`prepayment_days` Nullable(Int32),`user_created_id` Nullable(Int32),`user_deleted_id` Nullable(Int32),`date_created` String,`date_modified` String,`date_deleted` Nullable(String),`_sign` Int8,`_version` UInt64')   [com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor]
sink        | 2024-03-26 22:56:10,536 ERROR  ||  ******* ERROR inserting Batch *****************   [com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor]
sink        | java.lang.ClassCastException: class java.nio.HeapByteBuffer cannot be cast to class [B (java.nio.HeapByteBuffer and [B are in module java.base of loader 'bootstrap')
sink        |   at com.altinity.clickhouse.sink.connector.converters.ClickHouseDataTypeMapper.convert(ClickHouseDataTypeMapper.java:245)
sink        |   at com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor.insertPreparedStatement(PreparedStatementExecutor.java:264)
sink        |   at com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor.lambda$executePreparedStatement$0(PreparedStatementExecutor.java:139)
sink        |   at java.base/java.lang.Iterable.forEach(Iterable.java:75)
sink        |   at com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor.executePreparedStatement(PreparedStatementExecutor.java:117)
sink        |   at com.altinity.clickhouse.sink.connector.db.batch.PreparedStatementExecutor.addToPreparedStatementBatch(PreparedStatementExecutor.java:88)
sink        |   at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.flushRecordsToClickHouse(ClickHouseBatchRunnable.java:323)
sink        |   at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.processRecordsByTopic(ClickHouseBatchRunnable.java:289)
sink        |   at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.run(ClickHouseBatchRunnable.java:151)
sink        |   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
sink        |   at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
sink        |   at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
sink        |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
sink        |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
sink        |   at java.base/java.lang.Thread.run(Thread.java:829)
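The `ClassCastException` says the value passed to `ClickHouseDataTypeMapper.convert` was a `java.nio.HeapByteBuffer`, while the code casts it to `byte[]` (`[B`). Kafka Connect accepts a `BYTES` field as either `byte[]` or `ByteBuffer`, so code that expects only one form can break depending on which converter produced the record. A minimal defensive sketch, assuming a hypothetical helper `BytesNormalizer.toByteArray` (not part of the connector), that accepts both forms:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class BytesNormalizer {

    // Hypothetical helper: returns the raw bytes of a Kafka Connect BYTES value,
    // which may arrive either as byte[] or as java.nio.ByteBuffer.
    public static byte[] toByteArray(Object value) {
        if (value == null) {
            return null; // nullable column: keep SQL NULL
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        if (value instanceof ByteBuffer) {
            // Read from a duplicate so the caller's buffer position is untouched,
            // and copy only the remaining bytes (array() would ignore position/limit
            // and throws on read-only buffers).
            ByteBuffer view = ((ByteBuffer) value).duplicate();
            byte[] out = new byte[view.remaining()];
            view.get(out);
            return out;
        }
        throw new IllegalArgumentException(
                "Unsupported BYTES representation: " + value.getClass().getName());
    }

    public static void main(String[] args) {
        // Simulated inputs: the same two bytes in both representations.
        byte[] asArray = toByteArray(new byte[] {0x30, 0x39});
        byte[] asBuffer = toByteArray(ByteBuffer.wrap(new byte[] {0x30, 0x39}));
        System.out.println(Arrays.equals(asArray, asBuffer));
    }
}
```

Copying from a `duplicate()` rather than calling `array()` keeps the helper correct for sliced or read-only buffers, at the cost of one small allocation per value.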

The Avro schema for the deposit_amount column is:

{
    "name": "deposit_amount",
    "type": [
        "null",
        {
            "type": "record",
            "name": "VariableScaleDecimal",
            "namespace": "io.debezium.data",
            "fields": [
                {
                    "name": "scale",
                    "type": "int"
                },
                {
                    "name": "value",
                    "type": "bytes"
                }
            ],
            "connect.doc": "Variable scaled decimal",
            "connect.version": 1,
            "connect.name": "io.debezium.data.VariableScaleDecimal"
        }
    ],
    "default": null
}
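In this schema, `VariableScaleDecimal` carries every number as a pair: `scale` (`int`) and `value` (`bytes`). The scale travels with each value because a Postgres `numeric` declared without precision or scale can have a different scale per row. A minimal round-trip sketch, assuming `value` holds the unscaled integer as big-endian two's-complement bytes (what `BigInteger.toByteArray()` produces); the class and method names are hypothetical:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

public class VariableScaleDecimalSketch {

    // Hypothetical mirror of the Avro record {scale: int, value: bytes}.
    public static final class Encoded {
        public final int scale;
        public final byte[] value;

        public Encoded(int scale, byte[] value) {
            this.scale = scale;
            this.value = value;
        }
    }

    // Assumed encoding: the unscaled value as big-endian two's-complement bytes,
    // plus the scale of this particular value.
    public static Encoded encode(BigDecimal d) {
        return new Encoded(d.scale(), d.unscaledValue().toByteArray());
    }

    // Inverse: rebuild the BigDecimal from the unscaled bytes and the scale.
    public static BigDecimal decode(Encoded e) {
        return new BigDecimal(new BigInteger(e.value), e.scale);
    }

    public static void main(String[] args) {
        for (String s : new String[] {"1234.50", "-0.5", "7"}) {
            Encoded e = encode(new BigDecimal(s));
            System.out.println(s + " -> scale=" + e.scale
                    + " value=" + Arrays.toString(e.value)
                    + " -> " + decode(e));
        }
    }
}
```

For 1234.50 this gives scale=2 and value=[1, -30, 58], i.e. 0x01E23A = 123450; for -0.5 it gives scale=1 and the single byte 0xFB (-5).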

As far as I can see from the stack trace, the problem occurs when converting the decimal column.

The message content is:

"deposit_amount": {
    "io.debezium.data.VariableScaleDecimal": {
        "scale": 0,
        "value": "AA=="
    }
}
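As a worked check of this particular message: "AA==" is the base64 form of a single 0x00 byte, so with scale 0 the decimal is 0. The insert query above declares the target as `Nullable(Decimal(10, 2))`, so the value would be written as 0.00. A minimal sketch, assuming the hypothetical helper `toDecimal10x2` and HALF_UP rounding (the rounding mode is my assumption and only matters when the source scale exceeds 2):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.Base64;

public class DepositAmountDecode {

    // Hypothetical helper: decode a VariableScaleDecimal whose value is shown
    // base64-encoded (as in the message dump) and fit it to Decimal(10, 2).
    public static BigDecimal toDecimal10x2(Integer scale, String base64Value) {
        if (scale == null || base64Value == null) {
            return null; // deposit_amount is nullable
        }
        byte[] unscaled = Base64.getDecoder().decode(base64Value);
        BigDecimal exact = new BigDecimal(new BigInteger(unscaled), scale);
        // Assumed rounding mode; raising the scale from 0 to 2 never rounds.
        return exact.setScale(2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        System.out.println(toDecimal10x2(0, "AA=="));
    }
}
```

Running `main` prints 0.00 for the message above, so the payload itself looks well-formed; the failure is in how the bytes are handed to the mapper, not in the number.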

ZlobnyiSerg, Mar 26 '24 22:03

After switching the key/value serializers to JSON, the problem went away. So, as far as I can understand, the cause is somewhere in the Avro handling.

ZlobnyiSerg, Mar 27 '24 09:03