[destination-clickhouse v2] Failing With IllegalStateException
Connector Name
destination-clickhouse
Connector Version
2.1.14
What step the error happened?
During the sync
Relevant information
Syncing a large table from MSSQL to ClickHouse with the latest connector version does not finalize correctly. The table has 335,842,447 records in total. The mapper configuration does not seem to take effect at ingestion time: I don't see the filter being applied at source-query time, although I see only the filtered data in the destination.
Airbyte actually completes the sync, but it fails to return a successful status code, so it retries 5 times before giving up. All the records are extracted and written to the destination, yet the tab shows an erroneous status (0 records).
Relevant log output
2025-11-20 14:34:26 replication-orchestrator INFO Total records read: 85000 (30 MB)
2025-11-20 14:34:26 replication-orchestrator INFO Schema validation was performed to a max of 10 records with errors per stream.
2025-11-20 14:34:26 replication-orchestrator WARN Schema validation errors found for stream assessment_Question. Error messages: [$.BoxHeightInLines: null found, number expected, $.ExpectedAnswerType: null found, string expected, $.QuestionText: null found, string expected, $.MarkingCriteria: null found, string expected, $.SyllabusConstruct: null found, string expected]
2025-11-20 14:34:26 replication-orchestrator INFO readFromSource: done. (source.isFinished:true, fromSource.isClosed:false)
2025-11-20 14:34:26 replication-orchestrator INFO thread status... heartbeat thread: false , replication thread: true
2025-11-20 14:34:31 replication-orchestrator INFO processMessage: done. (fromSource.isDone:true, forDest.isClosed:false)
2025-11-20 14:34:36 replication-orchestrator INFO writeToDestination: done. (forDest.isDone:true, isDestRunning:true)
2025-11-20 14:34:36 replication-orchestrator INFO thread status... timeout thread: false , replication thread: true
2025-11-20 14:34:36 destination INFO DefaultDispatcher-worker-2 i.a.c.l.d.i.JsonDestinationMessageInputFlow$collect$4(invokeSuspend):47 Input stream reader closed.
2025-11-20 14:34:36 destination INFO pool-2-thread-1 i.a.c.l.d.i.JsonDestinationMessageInputFlow(collect):51 Finished reading input.
2025-11-20 14:34:36 destination INFO pool-2-thread-1 i.a.c.l.d.i.DataFlowPipelineInputFlow(collect):58 Finished routing input.
2025-11-20 14:34:36 destination INFO DefaultDispatcher-worker-2 i.a.c.l.d.p.PipelineCompletionHandler$apply$2(invokeSuspend):31 Flushing 1 final aggregates...
2025-11-20 14:34:36 destination INFO DefaultDispatcher-worker-2 i.a.i.d.c.w.l.BinaryRowInsertBuffer(flush):59 Beginning insert into Question
2025-11-20 14:34:36 destination INFO DefaultDispatcher-worker-2 i.a.i.d.c.w.l.BinaryRowInsertBuffer(flush):70 Finished insert of 21105 rows into Question
2025-11-20 14:34:36 destination INFO DefaultDispatcher-worker-2 i.a.c.l.d.p.PipelineCompletionHandler$apply$2(invokeSuspend):46 Final aggregates flushed.
2025-11-20 14:34:36 destination INFO main i.a.c.l.d.p.PipelineRunner(run):47 Individual pipelines complete...
2025-11-20 14:34:36 destination INFO main i.a.c.l.d.p.PipelineRunner(run):51 Disabling state reconciler...
2025-11-20 14:34:36 destination INFO main i.a.c.l.d.p.PipelineRunner(run):64 Flushing final states...
2025-11-20 14:34:36 destination INFO main i.a.c.l.d.s.StateStore(logStateInfo):180 State diagnostic information [STREAM] (by descriptor):
- descriptor=Descriptor(namespace=UAT_DB_02_Assessments_GL_TenantId_1, name=Question) first key: StateKey(id=1, partitionKeys=[PartitionKey(id=UAT_DB_02_Assessments_GL_TenantId_1-Question-1)]) (state: StreamCheckpoint(checkpoint=Checkpoint(unmappedNamespace=UAT_DB_02_Assessments_GL_TenantId_1, unmappedName=Question, state={"version":2,"state_type":"ordered_column","ordered_col":"QuestionId","ordered_col_val":"124995","incremental_state":{"state_type":"cursor_based","version":2,"stream_name":"Question","stream_namespace":"assessment","cursor_field":["ModifiedTimestamp"],"cursor":"2025-11-20T07:42:49.336906Z","cursor_record_count":1}}, additionalProperties={}), sourceStats=Stats(recordCount=10000, rejectedRecordCount=0, additionalStats={}), destinationStats=null, additionalProperties={id=2}, serializedSizeBytes=520, checkpointKey=null, totalRecords=null, totalBytes=null, totalRejectedRecords=null, additionalStats={}))
• Incomplete: expectedCount 10000.0 does not equal flushedCount 5735.0 (by partition: [5735.0])
2025-11-20 14:34:36 destination ERROR main i.a.c.AirbyteConnectorRunnable(run):38 Failed class io.airbyte.integrations.destination.clickhouse.cdk.WriteOperationV2 operation execution. java.lang.IllegalStateException: Sync completed, but unflushed states were detected.
at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner.run(PipelineRunner.kt:69) ~[bulk-cdk-core-load-0.1.78.jar:?]
at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner$run$1.invokeSuspend(PipelineRunner.kt) ~[bulk-cdk-core-load-0.1.78.jar:?]
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-2.1.10.jar:2.1.10-release-473]
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:94) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:70) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
at io.airbyte.cdk.load.dataflow.DestinationLifecycle.run(DestinationLifecycle.kt:41) ~[bulk-cdk-core-load-0.1.78.jar:?]
at io.airbyte.integrations.destination.clickhouse.cdk.WriteOperationV2.execute(WriteOperationV2.kt:24) ~[io.airbyte.airbyte-integrations.connectors-destination-clickhouse.jar:?]
at io.airbyte.cdk.AirbyteConnectorRunnable.run(AirbyteConnectorRunnable.kt:36) [bulk-cdk-core-base-0.1.78.jar:?]
at picocli.CommandLine.executeUserObject(CommandLine.java:2030) [picocli-4.7.6.jar:4.7.6]
at picocli.CommandLine.access$1500(CommandLine.java:148) [picocli-4.7.6.jar:4.7.6]
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465) [picocli-4.7.6.jar:4.7.6]
at picocli.CommandLine$RunLast.handle(CommandLine.java:2457) [picocli-4.7.6.jar:4.7.6]
at picocli.CommandLine$RunLast.handle(CommandLine.java:2419) [picocli-4.7.6.jar:4.7.6]
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277) [picocli-4.7.6.jar:4.7.6]
at picocli.CommandLine$RunLast.execute(CommandLine.java:2421) [picocli-4.7.6.jar:4.7.6]
at picocli.CommandLine.execute(CommandLine.java:2174) [picocli-4.7.6.jar:4.7.6]
at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run(AirbyteConnectorRunner.kt:289) [bulk-cdk-core-base-0.1.78.jar:?]
at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run$default(AirbyteConnectorRunner.kt:75) [bulk-cdk-core-base-0.1.78.jar:?]
at io.airbyte.integrations.destination.clickhouse.ClickhouseDestinationKt.main(ClickhouseDestination.kt:10) [io.airbyte.airbyte-integrations.connectors-destination-clickhouse.jar:?]
Stack Trace: java.lang.IllegalStateException: Sync completed, but unflushed states were detected.
at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner.run(PipelineRunner.kt:69)
at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner$run$1.invokeSuspend(PipelineRunner.kt)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:94)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:70)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at io.airbyte.cdk.load.dataflow.DestinationLifecycle.run(DestinationLifecycle.kt:41)
at io.airbyte.integrations.destination.clickhouse.cdk.WriteOperationV2.execute(WriteOperationV2.kt:24)
at io.airbyte.cdk.AirbyteConnectorRunnable.run(AirbyteConnectorRunnable.kt:36)
at picocli.CommandLine.executeUserObject(CommandLine.java:2030)
at picocli.CommandLine.access$1500(CommandLine.java:148)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
at picocli.CommandLine.execute(CommandLine.java:2174)
at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run(AirbyteConnectorRunner.kt:289)
at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run$default(AirbyteConnectorRunner.kt:75)
at io.airbyte.integrations.destination.clickhouse.ClickhouseDestinationKt.main(ClickhouseDestination.kt:10)
2025-11-20 14:34:36 destination INFO main i.a.c.AirbyteConnectorRunnable(run):47 Flushing output consumer prior to shutdown.
2025-11-20 14:34:36 destination INFO main i.a.c.AirbyteConnectorRunnable(run):49 Completed integration: airbyte/destination-clickhouse.
2025-11-20 14:34:36 destination ERROR java.lang.IllegalStateException: Sync completed, but unflushed states were detected.
2025-11-20 14:34:36 destination ERROR at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner.run(PipelineRunner.kt:69)
2025-11-20 14:34:36 destination ERROR at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner$run$1.invokeSuspend(PipelineRunner.kt)
2025-11-20 14:34:36 destination ERROR at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
2025-11-20 14:34:36 destination ERROR at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
2025-11-20 14:34:36 destination ERROR at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263)
2025-11-20 14:34:36 destination ERROR at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:94)
2025-11-20 14:34:36 destination ERROR at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:70)
2025-11-20 14:34:36 destination ERROR at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
2025-11-20 14:34:36 destination ERROR at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
2025-11-20 14:34:36 destination ERROR at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
2025-11-20 14:34:36 destination ERROR at io.airbyte.cdk.load.dataflow.DestinationLifecycle.run(DestinationLifecycle.kt:41)
2025-11-20 14:34:36 destination ERROR at io.airbyte.integrations.destination.clickhouse.cdk.WriteOperationV2.execute(WriteOperationV2.kt:24)
2025-11-20 14:34:36 destination ERROR at io.airbyte.cdk.AirbyteConnectorRunnable.run(AirbyteConnectorRunnable.kt:36)
2025-11-20 14:34:36 destination ERROR at picocli.CommandLine.executeUserObject(CommandLine.java:2030)
2025-11-20 14:34:36 destination ERROR at picocli.CommandLine.access$1500(CommandLine.java:148)
2025-11-20 14:34:36 destination ERROR at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
2025-11-20 14:34:36 destination ERROR at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
2025-11-20 14:34:36 destination ERROR at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
2025-11-20 14:34:36 destination ERROR at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
2025-11-20 14:34:36 destination ERROR at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
2025-11-20 14:34:36 destination ERROR at picocli.CommandLine.execute(CommandLine.java:2174)
2025-11-20 14:34:36 destination ERROR at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run(AirbyteConnectorRunner.kt:289)
2025-11-20 14:34:36 destination ERROR at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run$default(AirbyteConnectorRunner.kt:75)
2025-11-20 14:34:36 destination ERROR at io.airbyte.integrations.destination.clickhouse.ClickhouseDestinationKt.main(ClickhouseDestination.kt:10)
2025-11-20 14:34:36 replication-orchestrator INFO Destination finished successfully — exiting read dest...
2025-11-20 14:34:36 replication-orchestrator INFO readFromDestination: exception caught
2025-11-20 14:34:36 replication-orchestrator INFO readFromDestination: done. (writeToDestFailed:false, dest.isFinished:true)
2025-11-20 14:34:36 replication-orchestrator INFO Closing StateCheckSumCountEventHandler
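For triage, the count mismatch can be pulled out of a long log automatically. The following is a minimal sketch, assuming the diagnostic line always has the shape seen above ("Incomplete: expectedCount X does not equal flushedCount Y"); the function name `find_count_mismatches` is made up for illustration and is not part of Airbyte.

```python
import re

# Pattern based solely on the diagnostic line in the log above; other CDK
# versions may word it differently (assumption).
INCOMPLETE = re.compile(
    r"Incomplete: expectedCount (?P<expected>[\d.]+) "
    r"does not equal flushedCount (?P<flushed>[\d.]+)"
)


def find_count_mismatches(log_text):
    """Return (expected, flushed, missing) for every 'Incomplete' line found."""
    results = []
    for match in INCOMPLETE.finditer(log_text):
        expected = float(match.group("expected"))
        flushed = float(match.group("flushed"))
        results.append((expected, flushed, expected - flushed))
    return results


if __name__ == "__main__":
    line = (
        "Incomplete: expectedCount 10000.0 does not equal "
        "flushedCount 5735.0 (by partition: [5735.0])"
    )
    for expected, flushed, missing in find_count_mismatches(line):
        print(f"expected={expected} flushed={flushed} missing={missing}")
```

For the log above, this reports 4,265 records that the state expected but the destination never counted as flushed.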
Contribute
- [x] Yes, I want to contribute
Additional Report: Snowflake Destination with MS SQL Server CDC Source
This same issue is affecting the Snowflake destination (v4.0.30) with MS SQL Server source (v4.3.0-rc.8) using CDC replication.
Customer Impact
- Workspace ID: cfe2edcd-1e17-4007-bf42-0e43423da6cd
- Connection ID: a19bf30e-fbdd-4bc5-ba7b-cd647e666b14
- Job IDs: 58346167, 58362556
- Connector versions: MS SQL Server v4.3.0-rc.8, Snowflake v4.0.30
- Bulk CDK version: 0.1.78
Behavior
The sync completes its data writes to Snowflake but then fails with the same error: IllegalStateException: Sync completed, but unflushed states were detected. Because the state is not emitted, the CDC cursor does not advance, so the next sync re-reads and re-processes data from the previous checkpoint.
This results in:
- Duplicate data processing and billing for rows that were already synced
- A customer perception that "Airbyte lost its cursor state"
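The re-read behavior described above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: rows are represented by increasing integers standing in for CDC log positions, and `run_sync` is a hypothetical helper, not Airbyte code.

```python
def run_sync(source_rows, committed_cursor, state_emitted):
    """Simulate one incremental sync.

    Rows newer than the committed cursor are read and written. The cursor
    only advances if the destination emits the state at the end of the sync.
    """
    batch = [row for row in source_rows if row > committed_cursor]
    if state_emitted and batch:
        new_cursor = max(batch)
    else:
        # State was never emitted, so the previous checkpoint is kept.
        new_cursor = committed_cursor
    return batch, new_cursor


if __name__ == "__main__":
    rows = [1, 2, 3, 4, 5]
    cursor = 0

    # First sync: data is written, but the failure prevents state emission.
    first_batch, cursor = run_sync(rows, cursor, state_emitted=False)

    # Second sync: the same rows are read and written again.
    second_batch, cursor = run_sync(rows, cursor, state_emitted=True)

    print("first sync wrote:", first_batch)
    print("second sync wrote:", second_batch)
    print("cursor after second sync:", cursor)
```

In this model the second sync writes the same five rows again, which matches the duplicate processing reported for the affected connection.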
Log Evidence
State diagnostic information [GLOBAL]:
First key: StateKey(id=1, partitionKeys=[PartitionKey(id=rsHA)]) (state: GlobalCheckpoint...)
The log shows data was written to Snowflake (~5,522 rows inserted) but the global CDC state was never marked as complete.
Root Cause Analysis
This appears to be a bug in the bulk CDK's state reconciliation logic where the expected record count (from sourceStats.recordCount) does not match the flushed count tracked by StateHistogramStore. This affects multiple destinations using the bulk CDK (ClickHouse, Snowflake) and is not specific to any particular source connector.
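The check that seems to be failing can be sketched as follows. This is a simplified model built only from what the logs show, not the bulk CDK's actual implementation: `PendingState`, `FlushedHistogram`, and `finish_sync` are hypothetical names, and `FlushedHistogram` is merely a stand-in for whatever StateHistogramStore does internally.

```python
from dataclasses import dataclass, field


@dataclass
class PendingState:
    """A checkpoint waiting to be emitted (hypothetical model)."""
    state_id: int
    expected_count: int        # corresponds to sourceStats.recordCount
    partition_ids: list


@dataclass
class FlushedHistogram:
    """Rows confirmed written, counted per partition (stand-in only)."""
    counts: dict = field(default_factory=dict)

    def record_flush(self, partition_id, rows):
        self.counts[partition_id] = self.counts.get(partition_id, 0) + rows

    def flushed_for(self, state):
        return sum(self.counts.get(p, 0) for p in state.partition_ids)


def unflushed_states(states, histogram):
    """Return states whose flushed count differs from the expected count."""
    return [s for s in states if histogram.flushed_for(s) != s.expected_count]


def finish_sync(states, histogram):
    """Raise if any state is still incomplete at the end of the sync."""
    if unflushed_states(states, histogram):
        raise RuntimeError("Sync completed, but unflushed states were detected.")


if __name__ == "__main__":
    # Values taken from the ClickHouse log: expected 10000, flushed 5735.
    partition = "UAT_DB_02_Assessments_GL_TenantId_1-Question-1"
    state = PendingState(state_id=1, expected_count=10000, partition_ids=[partition])
    histogram = FlushedHistogram()
    histogram.record_flush(partition, 5735)
    try:
        finish_sync([state], histogram)
    except RuntimeError as err:
        print("failed:", err)
```

Under this model, the sync fails whenever rows counted by the source for a state are not attributed to that state's partitions on flush, even if those rows did reach the destination. Whether the real mismatch comes from miscounting on the source side or on the flush side cannot be determined from the logs alone.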
Reported on behalf of [email protected] (@Airbyte-Support) via Devin session
anyone that can help here? @airbyte-jenny