clickhouse-kafka-connect

Exception when a UUID field contains an empty string

Status: Open · DmitryTuryshev opened this issue 1 year ago · 5 comments

Relevant code in `ClickHouseWriter.java`: https://github.com/ClickHouse/clickhouse-kafka-connect/blob/fef6bb714d2cca61bead75f16dd9c09ea215b9c5/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java#L611

Example Debezium record, where `trigger_id_uuid` is an empty string rather than null:

{"trigger_id_uuid":"", "more_":"...", "id":"123",
"__deleted":{"string":"true"},
"__ts_ms":{"long":111111111},
"__op":{"string":"d"}}

Error trace:

WorkerSinkTask{id=clickhouse_sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Number of records: 1000 (org.apache.kafka.connect.runtime.WorkerSinkTask:633)
log_1: java.lang.RuntimeException: Number of records: 1000
log_1:         at com.clickhouse.kafka.connect.util.Utils.handleException(Utils.java:126)
log_1:         at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:71)
log_1:         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)
log_1:         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
log_1:         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
log_1:         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
log_1:         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
log_1:         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
log_1:         at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
log_1:         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
log_1:         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
log_1:         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
log_1:         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
log_1:         at java.base/java.lang.Thread.run(Thread.java:840)
log_1: Caused by: java.lang.RuntimeException: Topic: [zoom_trigger_trigger_users], Partition: [0], MinOffset: [58743], MaxOffset: [58745], (QueryId: [uuuui-uuuu-490f-bc2c-1uuuub2])
log_1:         at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:62)
log_1:         at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:196)
log_1:         at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:99)
log_1:         at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:65)
log_1:         ... 12 more
**log_1: Caused by: java.lang.IllegalArgumentException: Invalid UUID string:
log_1:         at java.base/java.util.UUID.fromString1(UUID.java:280)
log_1:         at java.base/java.util.UUID.fromString(UUID.java:258)**
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWritePrimitive(ClickHouseWriter.java:605)
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteColValue(ClickHouseWriter.java:403)
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteCol(ClickHouseWriter.java:661)
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinaryV1(ClickHouseWriter.java:774)
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:683)
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:197)
log_1:         at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:60)
log_1:         ... 15 more
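The innermost cause in the trace is `java.util.UUID.fromString` rejecting the empty string. The JDK-only sketch below reproduces that single step outside the connector; the class `EmptyUuidRepro` and the method `parseError` are hypothetical names used only for illustration.

```java
import java.util.UUID;

public class EmptyUuidRepro {
    // Returns the exception message produced when parsing the given string as a UUID,
    // or null if parsing succeeds.
    static String parseError(String value) {
        try {
            UUID.fromString(value);
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The empty string sent for trigger_id_uuid fails to parse;
        // prints a message beginning with "Invalid UUID string".
        System.out.println(parseError(""));
        // A well-formed UUID parses without error, so this prints "null".
        System.out.println(parseError("123e4567-e89b-12d3-a456-426614174000"));
    }
}
```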

DmitryTuryshev, Nov 26 '24 14:11

case UUID:
  if (value instanceof String && ((String) value).isEmpty()) {
      // Write a zero UUID (all bits set to zero)
      UUID zeroUUID = new UUID(0L, 0L);
      BinaryStreamUtils.writeUuid(stream, zeroUUID);
      LOGGER.debug("Written zero UUID for empty string value.");
  } else {
      BinaryStreamUtils.writeUuid(stream, UUID.fromString((String) value));
  }
  break;

I replaced this case with the code above, and the rebuilt project works well: https://github.com/ClickHouse/clickhouse-kafka-connect/blob/fef6bb714d2cca61bead75f16dd9c09ea215b9c5/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java#L611

Please let me know if I've missed anything.
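The patched `case UUID:` branch above can be exercised in isolation. This is a minimal sketch, assuming the only goal is to compute the UUID that would be handed to `BinaryStreamUtils.writeUuid`; the class `UuidFallback` and the helper `toUuidOrZero` are hypothetical, and the stream write itself is left out.

```java
import java.util.UUID;

public class UuidFallback {
    // The nil UUID (all 128 bits zero), the value the proposed patch writes for "".
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical helper mirroring the patched branch:
    // an empty string becomes the zero UUID, anything else is parsed as before.
    static UUID toUuidOrZero(Object value) {
        if (value instanceof String && ((String) value).isEmpty()) {
            return ZERO_UUID;
        }
        // Non-empty strings are still parsed strictly, so malformed values still throw.
        return UUID.fromString((String) value);
    }

    public static void main(String[] args) {
        System.out.println(toUuidOrZero(""));  // 00000000-0000-0000-0000-000000000000
        System.out.println(toUuidOrZero("123e4567-e89b-12d3-a456-426614174000"));
    }
}
```

As in the patch, `null` and non-String values are not special-cased: `null` reaches `UUID.fromString` and throws `NullPointerException`, and any other type fails the `(String)` cast with `ClassCastException`. The zero UUID is also a legitimate value, so after this change an originally empty field cannot be told apart from a real nil UUID.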

DmitryTuryshev, Nov 27 '24 05:11

@DmitryTuryshev Hi! Does the UUID field in the database schema have a Default or Nullable value?

Paultagoras, Dec 04 '24 07:12

@Paultagoras Hi there! Thanks for your question!

Schema Registry definition:

"name": "trigger_id",
"type": {
  "type": "string",
  "connect.version": 1,
  "connect.name": "io.debezium.data.Uuid"
}

The source PostgreSQL table schema:

....
trigger_id uuid NOT NULL
....

Also, this record is a delete event, and for it Debezium produces just "" for this field.

DmitryTuryshev, Dec 04 '24 07:12

@DmitryTuryshev Apologies, I meant in the destination table.

Paultagoras, Dec 05 '24 22:12

No problem. In the destination table, the field is defined as: `trigger_id` Nullable(UUID) CODEC(ZSTD(1))

Also, I'm sure the problem is not in the destination table, because the error occurs while decoding "" as a UUID.
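Since the destination column is `Nullable(UUID)`, one alternative to substituting the zero UUID would be to map an empty string to NULL. Whether the connector's writer can emit NULL at that point in `doWritePrimitive` is an assumption not confirmed in this thread; the sketch below only shows the value mapping, using a hypothetical class `NullableUuidMapping` and helper `toNullableUuid`.

```java
import java.util.UUID;

public class NullableUuidMapping {
    // Hypothetical alternative for Nullable(UUID) columns: treat an empty string
    // like a missing value (null) instead of substituting the zero UUID.
    static UUID toNullableUuid(String value) {
        if (value == null || value.isEmpty()) {
            return null;
        }
        // Non-empty strings are parsed strictly; malformed input still throws.
        return UUID.fromString(value);
    }

    public static void main(String[] args) {
        System.out.println(toNullableUuid(""));    // null
        System.out.println(toNullableUuid(null));  // null
        System.out.println(toNullableUuid("123e4567-e89b-12d3-a456-426614174000"));
    }
}
```

This keeps a genuine nil UUID distinguishable from a value that was empty in the Debezium record, at the cost of relying on the column being nullable.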

DmitryTuryshev, Dec 06 '24 04:12