clickhouse-sink-connector
clickhouse-sink-connector copied to clipboard
sink connector client - show_replica_status not working when the connector is stopped.
Steps to reproduce:
- Use the sink connector CLI to stop when its processing records.
- This exception seems to cause the sink connector not to persist the offsets until its restarted.
- ./sink-connector-client show_replica_status gives empty results and the sink connector does not increment offsets.
bash-4.4# ./sink-connector-client stop_replica 2024/04/01 17:20:53 ***** Stopping replication..... ***** 2024/04/01 17:21:03 2024/04/01 17:21:03 ***** Replication stopped successfully ***** java.lang.NullPointerException: Cannot invoke "java.util.concurrent.ExecutorService.submit(java.util.concurrent.Callable)" because "this.executor" is null at io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore.set(JdbcOffsetBackingStore.java:194) at org.apache.kafka.connect.storage.OffsetStorageWriter.doFlush(OffsetStorageWriter.java:199) at io.debezium.embedded.EmbeddedEngine.commitOffsets(EmbeddedEngine.java:905) at io.debezium.embedded.EmbeddedEngine.maybeFlush(EmbeddedEngine.java:888) at io.debezium.embedded.EmbeddedEngine$4.markBatchFinished(EmbeddedEngine.java:827) at io.debezium.embedded.ConvertingEngineBuilder$1.markBatchFinished(ConvertingEngineBuilder.java:113) at com.altinity.clickhouse.sink.connector.executor.DebeziumOffsetManagement.acknowledgeRecords(DebeziumOffsetManagement.java:143) at com.altinity.clickhouse.sink.connector.executor.DebeziumOffsetManagement.checkIfBatchCanBeCommitted(DebeziumOffsetManagement.java:111) at com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable.run(ClickHouseBatchRunnable.java:159) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833)
This seems to occur because the connection is closed and the check for null is not enough.
ClickHouseConnection
API has an isClosed()
function that can be used to check if the connection is open.
workaround in SQL
select * from altinity_sink_connector.show_replica_status\G