
[destination-clickhouse v2] Failing With IllegalStateException

Open babaMar opened this issue 1 month ago • 2 comments

Connector Name

destination-clickhouse

Connector Version

2.1.14

What step the error happened?

During the sync

Relevant information

Syncing a large table (335,842,447 records in total) from MSSQL to ClickHouse with the latest connector version does not finalize correctly. In addition, the mapper configuration does not seem to take effect at ingestion time: I don't see the filter being applied in the source query, yet only the filtered records show up in the destination.
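To make that distinction concrete, here is a minimal Python sketch of a row filter that runs in flight instead of being pushed down into the source query. The function name `sync_with_inflight_filter` and the `TenantId` column are hypothetical, and whether Airbyte's mapper works exactly this way is an assumption; the sketch only shows that such a filter leaves the source query (and the number of rows read) unchanged while reducing what reaches the destination.

```python
from typing import Callable, Iterable

Row = dict  # hypothetical: one source row as a column -> value mapping


def sync_with_inflight_filter(
    source_rows: Iterable[Row], keep: Callable[[Row], bool]
) -> tuple[int, int]:
    """Return (rows read from the source, rows written to the destination)."""
    read = 0
    written: list[Row] = []
    for row in source_rows:  # the source query itself has no filter pushed down
        read += 1
        if keep(row):  # the filter is applied only after the row was read
            written.append(row)
    return read, len(written)


rows = [{"QuestionId": i, "TenantId": i % 3} for i in range(9)]
print(sync_with_inflight_filter(rows, lambda r: r["TenantId"] == 1))  # (9, 3)
```

Every row is still extracted; only the write side sees the reduced set, which matches what the reporter observes.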

Airbyte actually completes the sync, but it does not return a successful status code, so it retries 5 times before giving up. All the records are extracted and written to the destination, yet the tab shows an erroneous status (0 records).

Relevant log output

2025-11-20 14:34:26 replication-orchestrator INFO Total records read: 85000 (30 MB)
2025-11-20 14:34:26 replication-orchestrator INFO Schema validation was performed to a max of 10 records with errors per stream.
2025-11-20 14:34:26 replication-orchestrator WARN Schema validation errors found for stream assessment_Question. Error messages: [$.BoxHeightInLines: null found, number expected, $.ExpectedAnswerType: null found, string expected, $.QuestionText: null found, string expected, $.MarkingCriteria: null found, string expected, $.SyllabusConstruct: null found, string expected]
2025-11-20 14:34:26 replication-orchestrator INFO readFromSource: done. (source.isFinished:true, fromSource.isClosed:false)
2025-11-20 14:34:26 replication-orchestrator INFO thread status... heartbeat thread: false , replication thread: true
2025-11-20 14:34:31 replication-orchestrator INFO processMessage: done. (fromSource.isDone:true, forDest.isClosed:false)
2025-11-20 14:34:36 replication-orchestrator INFO writeToDestination: done. (forDest.isDone:true, isDestRunning:true)
2025-11-20 14:34:36 replication-orchestrator INFO thread status... timeout thread: false , replication thread: true
2025-11-20 14:34:36 destination INFO DefaultDispatcher-worker-2 i.a.c.l.d.i.JsonDestinationMessageInputFlow$collect$4(invokeSuspend):47 Input stream reader closed.
2025-11-20 14:34:36 destination INFO pool-2-thread-1 i.a.c.l.d.i.JsonDestinationMessageInputFlow(collect):51 Finished reading input.
2025-11-20 14:34:36 destination INFO pool-2-thread-1 i.a.c.l.d.i.DataFlowPipelineInputFlow(collect):58 Finished routing input.
2025-11-20 14:34:36 destination INFO DefaultDispatcher-worker-2 i.a.c.l.d.p.PipelineCompletionHandler$apply$2(invokeSuspend):31 Flushing 1 final aggregates...
2025-11-20 14:34:36 destination INFO DefaultDispatcher-worker-2 i.a.i.d.c.w.l.BinaryRowInsertBuffer(flush):59 Beginning insert into Question
2025-11-20 14:34:36 destination INFO DefaultDispatcher-worker-2 i.a.i.d.c.w.l.BinaryRowInsertBuffer(flush):70 Finished insert of 21105 rows into Question
2025-11-20 14:34:36 destination INFO DefaultDispatcher-worker-2 i.a.c.l.d.p.PipelineCompletionHandler$apply$2(invokeSuspend):46 Final aggregates flushed.
2025-11-20 14:34:36 destination INFO main i.a.c.l.d.p.PipelineRunner(run):47 Individual pipelines complete...
2025-11-20 14:34:36 destination INFO main i.a.c.l.d.p.PipelineRunner(run):51 Disabling state reconciler...
2025-11-20 14:34:36 destination INFO main i.a.c.l.d.p.PipelineRunner(run):64 Flushing final states...
2025-11-20 14:34:36 destination INFO main i.a.c.l.d.s.StateStore(logStateInfo):180 State diagnostic information [STREAM] (by descriptor):
- descriptor=Descriptor(namespace=UAT_DB_02_Assessments_GL_TenantId_1, name=Question) first key: StateKey(id=1, partitionKeys=[PartitionKey(id=UAT_DB_02_Assessments_GL_TenantId_1-Question-1)]) (state: StreamCheckpoint(checkpoint=Checkpoint(unmappedNamespace=UAT_DB_02_Assessments_GL_TenantId_1, unmappedName=Question, state={"version":2,"state_type":"ordered_column","ordered_col":"QuestionId","ordered_col_val":"124995","incremental_state":{"state_type":"cursor_based","version":2,"stream_name":"Question","stream_namespace":"assessment","cursor_field":["ModifiedTimestamp"],"cursor":"2025-11-20T07:42:49.336906Z","cursor_record_count":1}}, additionalProperties={}), sourceStats=Stats(recordCount=10000, rejectedRecordCount=0, additionalStats={}), destinationStats=null, additionalProperties={id=2}, serializedSizeBytes=520, checkpointKey=null, totalRecords=null, totalBytes=null, totalRejectedRecords=null, additionalStats={}))
  • Incomplete: expectedCount 10000.0 does not equal flushedCount 5735.0 (by partition: [5735.0])
2025-11-20 14:34:36 destination ERROR main i.a.c.AirbyteConnectorRunnable(run):38 Failed class io.airbyte.integrations.destination.clickhouse.cdk.WriteOperationV2 operation execution. java.lang.IllegalStateException: Sync completed, but unflushed states were detected.
	at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner.run(PipelineRunner.kt:69) ~[bulk-cdk-core-load-0.1.78.jar:?]
	at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner$run$1.invokeSuspend(PipelineRunner.kt) ~[bulk-cdk-core-load-0.1.78.jar:?]
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-2.1.10.jar:2.1.10-release-473]
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:94) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:70) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source) ~[kotlinx-coroutines-core-jvm-1.10.2.jar:?]
	at io.airbyte.cdk.load.dataflow.DestinationLifecycle.run(DestinationLifecycle.kt:41) ~[bulk-cdk-core-load-0.1.78.jar:?]
	at io.airbyte.integrations.destination.clickhouse.cdk.WriteOperationV2.execute(WriteOperationV2.kt:24) ~[io.airbyte.airbyte-integrations.connectors-destination-clickhouse.jar:?]
	at io.airbyte.cdk.AirbyteConnectorRunnable.run(AirbyteConnectorRunnable.kt:36) [bulk-cdk-core-base-0.1.78.jar:?]
	at picocli.CommandLine.executeUserObject(CommandLine.java:2030) [picocli-4.7.6.jar:4.7.6]
	at picocli.CommandLine.access$1500(CommandLine.java:148) [picocli-4.7.6.jar:4.7.6]
	at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465) [picocli-4.7.6.jar:4.7.6]
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2457) [picocli-4.7.6.jar:4.7.6]
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2419) [picocli-4.7.6.jar:4.7.6]
	at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277) [picocli-4.7.6.jar:4.7.6]
	at picocli.CommandLine$RunLast.execute(CommandLine.java:2421) [picocli-4.7.6.jar:4.7.6]
	at picocli.CommandLine.execute(CommandLine.java:2174) [picocli-4.7.6.jar:4.7.6]
	at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run(AirbyteConnectorRunner.kt:289) [bulk-cdk-core-base-0.1.78.jar:?]
	at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run$default(AirbyteConnectorRunner.kt:75) [bulk-cdk-core-base-0.1.78.jar:?]
	at io.airbyte.integrations.destination.clickhouse.ClickhouseDestinationKt.main(ClickhouseDestination.kt:10) [io.airbyte.airbyte-integrations.connectors-destination-clickhouse.jar:?]

Stack Trace: java.lang.IllegalStateException: Sync completed, but unflushed states were detected.
	at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner.run(PipelineRunner.kt:69)
	at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner$run$1.invokeSuspend(PipelineRunner.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:94)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:70)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at io.airbyte.cdk.load.dataflow.DestinationLifecycle.run(DestinationLifecycle.kt:41)
	at io.airbyte.integrations.destination.clickhouse.cdk.WriteOperationV2.execute(WriteOperationV2.kt:24)
	at io.airbyte.cdk.AirbyteConnectorRunnable.run(AirbyteConnectorRunnable.kt:36)
	at picocli.CommandLine.executeUserObject(CommandLine.java:2030)
	at picocli.CommandLine.access$1500(CommandLine.java:148)
	at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
	at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
	at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
	at picocli.CommandLine.execute(CommandLine.java:2174)
	at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run(AirbyteConnectorRunner.kt:289)
	at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run$default(AirbyteConnectorRunner.kt:75)
	at io.airbyte.integrations.destination.clickhouse.ClickhouseDestinationKt.main(ClickhouseDestination.kt:10)
2025-11-20 14:34:36 destination INFO main i.a.c.AirbyteConnectorRunnable(run):47 Flushing output consumer prior to shutdown.
2025-11-20 14:34:36 destination INFO main i.a.c.AirbyteConnectorRunnable(run):49 Completed integration: airbyte/destination-clickhouse.
2025-11-20 14:34:36 destination ERROR java.lang.IllegalStateException: Sync completed, but unflushed states were detected.
2025-11-20 14:34:36 destination ERROR 	at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner.run(PipelineRunner.kt:69)
2025-11-20 14:34:36 destination ERROR 	at io.airbyte.cdk.load.dataflow.pipeline.PipelineRunner$run$1.invokeSuspend(PipelineRunner.kt)
2025-11-20 14:34:36 destination ERROR 	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
2025-11-20 14:34:36 destination ERROR 	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:100)
2025-11-20 14:34:36 destination ERROR 	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:263)
2025-11-20 14:34:36 destination ERROR 	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:94)
2025-11-20 14:34:36 destination ERROR 	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:70)
2025-11-20 14:34:36 destination ERROR 	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
2025-11-20 14:34:36 destination ERROR 	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
2025-11-20 14:34:36 destination ERROR 	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
2025-11-20 14:34:36 destination ERROR 	at io.airbyte.cdk.load.dataflow.DestinationLifecycle.run(DestinationLifecycle.kt:41)
2025-11-20 14:34:36 destination ERROR 	at io.airbyte.integrations.destination.clickhouse.cdk.WriteOperationV2.execute(WriteOperationV2.kt:24)
2025-11-20 14:34:36 destination ERROR 	at io.airbyte.cdk.AirbyteConnectorRunnable.run(AirbyteConnectorRunnable.kt:36)
2025-11-20 14:34:36 destination ERROR 	at picocli.CommandLine.executeUserObject(CommandLine.java:2030)
2025-11-20 14:34:36 destination ERROR 	at picocli.CommandLine.access$1500(CommandLine.java:148)
2025-11-20 14:34:36 destination ERROR 	at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
2025-11-20 14:34:36 destination ERROR 	at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
2025-11-20 14:34:36 destination ERROR 	at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
2025-11-20 14:34:36 destination ERROR 	at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
2025-11-20 14:34:36 destination ERROR 	at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
2025-11-20 14:34:36 destination ERROR 	at picocli.CommandLine.execute(CommandLine.java:2174)
2025-11-20 14:34:36 destination ERROR 	at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run(AirbyteConnectorRunner.kt:289)
2025-11-20 14:34:36 destination ERROR 	at io.airbyte.cdk.AirbyteDestinationRunner$Companion.run$default(AirbyteConnectorRunner.kt:75)
2025-11-20 14:34:36 destination ERROR 	at io.airbyte.integrations.destination.clickhouse.ClickhouseDestinationKt.main(ClickhouseDestination.kt:10)
2025-11-20 14:34:36 replication-orchestrator INFO Destination finished successfully — exiting read dest...
2025-11-20 14:34:36 replication-orchestrator INFO readFromDestination: exception caught
2025-11-20 14:34:36 replication-orchestrator INFO readFromDestination: done. (writeToDestFailed:false, dest.isFinished:true)
2025-11-20 14:34:36 replication-orchestrator INFO Closing StateCheckSumCountEventHandler
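The most informative line in this log is the "Incomplete:" entry in the state diagnostics. The helper below is a hypothetical sketch (not part of Airbyte) that extracts the counts from that line, assuming the message keeps exactly the format shown in the log above, so the gap between expected and flushed records can be checked quickly across attempts.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Assumption: the diagnostic line keeps the format printed by StateStore.logStateInfo in this log.
INCOMPLETE_RE = re.compile(
    r"expectedCount (?P<expected>[\d.]+) does not equal flushedCount (?P<flushed>[\d.]+)"
    r" \(by partition: \[(?P<parts>[^\]]*)\]\)"
)


@dataclass
class IncompleteState:
    expected: float
    flushed: float
    by_partition: list[float]

    @property
    def missing(self) -> float:
        """Records the source reported for this state that were never counted as flushed."""
        return self.expected - self.flushed


def parse_incomplete(line: str) -> Optional[IncompleteState]:
    m = INCOMPLETE_RE.search(line)
    if m is None:
        return None
    parts = [float(p) for p in m.group("parts").split(",") if p.strip()]
    return IncompleteState(float(m.group("expected")), float(m.group("flushed")), parts)


line = "  • Incomplete: expectedCount 10000.0 does not equal flushedCount 5735.0 (by partition: [5735.0])"
state = parse_incomplete(line)
print(state.expected, state.flushed, state.missing)  # 10000.0 5735.0 4265.0
```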

Contribute

  • [x] Yes, I want to contribute

babaMar, Nov 19 '25 09:11

Additional Report: Snowflake Destination with MS SQL Server CDC Source

This same issue is affecting the Snowflake destination (v4.0.30) with MS SQL Server source (v4.3.0-rc.8) using CDC replication.

Customer Impact

  • Workspace ID: cfe2edcd-1e17-4007-bf42-0e43423da6cd
  • Connection ID: a19bf30e-fbdd-4bc5-ba7b-cd647e666b14
  • Job IDs: 58346167, 58362556
  • Connector versions: MS SQL Server v4.3.0-rc.8, Snowflake v4.0.30
  • Bulk CDK version: 0.1.78

Behavior

The sync completes its data writes to Snowflake but then fails with the same error, "IllegalStateException: Sync completed, but unflushed states were detected". Because the state is never emitted, the CDC cursor does not advance, so the next sync re-reads and re-processes data from the previous checkpoint.

This results in:

  1. Duplicate data processing and billing for rows that were already synced
  2. The customer perceives this as "Airbyte lost its cursor state"
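The replay effect can be illustrated with a simplified model, assuming the platform only commits a cursor once the destination has emitted the corresponding state. The `Platform` class and the integer positions standing in for CDC LSNs are hypothetical; this is a sketch of the behavior described above, not Airbyte's implementation.

```python
from dataclasses import dataclass


@dataclass
class Platform:
    """Hypothetical model: only a state emitted by the destination gets committed."""

    committed_cursor: int = 0  # last CDC position confirmed by the destination

    def run_sync(self, change_log: list[int], state_emitted: bool) -> int:
        """Read every change after the committed cursor and return how many were read."""
        batch = [pos for pos in change_log if pos > self.committed_cursor]
        if batch and state_emitted:
            self.committed_cursor = max(batch)  # the cursor advances only on an emitted state
        return len(batch)


changes = list(range(1, 101))  # 100 hypothetical CDC positions
p = Platform()
print(p.run_sync(changes, state_emitted=False))  # 100, but nothing is committed
print(p.run_sync(changes, state_emitted=True))   # 100 again: the same rows are re-read
print(p.run_sync(changes + [101, 102], state_emitted=True))  # 2: only the new changes
```

Every failed finalization therefore costs a full re-read of everything since the last committed checkpoint, which is where the duplicate processing and billing come from.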

Log Evidence

State diagnostic information [GLOBAL]:
First key: StateKey(id=1, partitionKeys=[PartitionKey(id=rsHA)]) (state: GlobalCheckpoint...)

The log shows data was written to Snowflake (~5,522 rows inserted) but the global CDC state was never marked as complete.

Root Cause Analysis

This appears to be a bug in the bulk CDK's state reconciliation logic where the expected record count (from sourceStats.recordCount) does not match the flushed count tracked by StateHistogramStore. This affects multiple destinations using the bulk CDK (ClickHouse, Snowflake) and is not specific to any particular source connector.
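A toy model of the reconciliation check might look like the following. It is a hedged sketch, not the bulk CDK's code: the class, its method names, and the in-order emission rule are assumptions. Each state carries the record count reported by the source (as with sourceStats.recordCount), flushes are tallied per partition, and any state whose flushed total still differs from its expected count when the sync ends raises an error analogous to the IllegalStateException in the logs.

```python
class UnflushedStatesError(RuntimeError):
    """Hypothetical stand-in for the IllegalStateException seen in the logs."""


class StateReconcilerSketch:
    """Hypothetical model: a checkpoint is emitted only once the destination has
    flushed as many records as the source reported for it."""

    def __init__(self) -> None:
        self.expected: dict[int, int] = {}  # state id -> source-reported record count
        self.flushed: dict[int, dict[str, int]] = {}  # state id -> partition -> flushed rows
        self.emitted: list[int] = []

    def accept_state(self, state_id: int, expected_count: int) -> None:
        self.expected[state_id] = expected_count
        self.flushed.setdefault(state_id, {})

    def record_flush(self, state_id: int, partition: str, count: int) -> None:
        parts = self.flushed.setdefault(state_id, {})
        parts[partition] = parts.get(partition, 0) + count

    def reconcile(self) -> None:
        # Assumption: states are emitted strictly in id order, so an incomplete
        # state blocks every later one.
        for state_id in sorted(self.expected):
            if sum(self.flushed[state_id].values()) != self.expected[state_id]:
                break
            self.emitted.append(state_id)
            del self.expected[state_id]

    def finish(self) -> None:
        self.reconcile()
        if self.expected:  # at least one state was never fully flushed
            raise UnflushedStatesError("Sync completed, but unflushed states were detected.")


# Counts taken from the ClickHouse log: expected 10000, flushed 5735 on one partition.
r = StateReconcilerSketch()
r.accept_state(1, 10000)
r.record_flush(1, "Question-1", 5735)
try:
    r.finish()
except UnflushedStatesError as err:
    print(err)
```

In this model the error fires whenever the two counters disagree, regardless of whether the rows actually reached the destination, which is consistent with the data landing while the sync still reports failure.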


Reported on behalf of [email protected] (@Airbyte-Support) via Devin session

Can anyone help here? @airbyte-jenny

babaMar, Dec 02 '25 16:12