flink-connector-clickhouse

[Bug]: Checkpointing doesn't trigger a flush of records to ClickHouse

Open · karpoftea opened this issue 4 months ago · 1 comment

What happened?

Originally I tested the connector against type-error failures (type incompatibility between the Kafka source table and the ClickHouse sink table): I selected an integer column from the Kafka table (say, a JSON number field declared as cnt INTEGER in the Kafka table) and inserted it into a ClickHouse table column of type Int64. If cnt=1, everything works as expected: the value is saved to ClickHouse. If I change the ClickHouse column type to UInt64 and send cnt=-1, an exception occurs (which is OK) and the task restarts, but after several restarts it changes state to RUNNING, silently leaving the corrupted message behind. That is not the expected behaviour, because data was lost. The expected behaviour is for the job to get stuck and wait for manual resolution (either moving the offset or changing the ClickHouse table schema).

Then I dug into the code and found that the DynamicTableSink is implemented using OutputFormatProvider/OutputFormat. My guess was that flush() is not called on the OutputFormat when a checkpoint occurs, so checkpointing always succeeds. To check this, I set the connector's sink.flush-interval to 10min and the Flink checkpoint interval to 1min, and saw that ClickHouseBatchOutputFormat.flush() is not triggered by the checkpoint. So my guess seems to be right.
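The observation can be reproduced without Flink in a small simulation. Everything in this sketch is hypothetical: SimpleOutputFormat is a local stand-in that mirrors only the open/writeRecord/close methods of Flink's OutputFormat, BatchingFormat is an invented batching format (not ClickHouseBatchOutputFormat), and the flush timer is simulated with a virtual clock. Because the interface has no checkpoint callback, a checkpoint at 1min has no way to reach the buffered record; only the 10min timer delivers it.

```java
import java.util.ArrayList;
import java.util.List;

public class OutputFormatCheckpointDemo {

    // Local stand-in mirroring the open/writeRecord/close lifecycle of Flink's
    // OutputFormat. Nothing here is called when a checkpoint is taken.
    interface SimpleOutputFormat<T> {
        void open(int taskNumber, int numTasks);
        void writeRecord(T record);
        void close();
    }

    // Hypothetical batching format: it flushes when the batch is full or when
    // its own flush timer fires (simulated here by advanceTo).
    static class BatchingFormat implements SimpleOutputFormat<Long> {
        final List<Long> buffer = new ArrayList<>();
        final List<Long> delivered = new ArrayList<>(); // stands in for ClickHouse
        final int batchSize;
        final long flushIntervalMs;
        long nowMs;
        long lastFlushMs;

        BatchingFormat(int batchSize, long flushIntervalMs) {
            this.batchSize = batchSize;
            this.flushIntervalMs = flushIntervalMs;
        }

        @Override
        public void open(int taskNumber, int numTasks) {
            lastFlushMs = nowMs;
        }

        @Override
        public void writeRecord(Long record) {
            buffer.add(record);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        // Advances the virtual clock; the flush timer fires once the interval has elapsed.
        void advanceTo(long timeMs) {
            nowMs = timeMs;
            if (nowMs - lastFlushMs >= flushIntervalMs) {
                flush();
            }
        }

        void flush() {
            delivered.addAll(buffer);
            buffer.clear();
            lastFlushMs = nowMs;
        }

        @Override
        public void close() {
            flush();
        }
    }

    public static void main(String[] args) {
        BatchingFormat format = new BatchingFormat(1000, 600_000L); // sink.flush-interval = 10min
        format.open(0, 1);
        format.advanceTo(5_000L);
        format.writeRecord(1L);
        format.advanceTo(60_000L); // first checkpoint (interval = 1min) happens here
        // The interface offers no checkpoint hook, so the record is still only in memory.
        System.out.println("at checkpoint: buffered=" + format.buffer.size()
                + " delivered=" + format.delivered.size());
        format.advanceTo(600_000L); // the flush timer fires
        System.out.println("after 10min: delivered=" + format.delivered.size());
    }
}
```

Running main prints "at checkpoint: buffered=1 delivered=0" followed by "after 10min: delivered=1".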

Could you tell me whether using OutputFormat as the SinkRuntimeProvider was a design choice? If so, what was the reason for not implementing the sink with the Sink API (org.apache.flink.api.connector.sink2.Sink)?

Affects Versions

master/1.16.0

What are you seeing the problem on?

Flink-Table-Api (SQL)

How to reproduce

  1. Create a Kafka source table and a ClickHouse sink table.
  2. Select from the Kafka source and insert the selected value into the ClickHouse sink.
  3. Set the Flink checkpoint interval to 1min.
  4. Set sink.flush-interval to 10min.
  5. Start the cluster and submit the pipeline.
  6. Push 1 message to Kafka.
  7. Wait for a checkpoint.
  8. After the checkpoint occurs, observe that (1) the offset has moved forward and (2) the message was not delivered to ClickHouse.
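To make the exposure window in these steps concrete, here is a small calculation. It assumes (hypothetically) that checkpoints and the flush timer both tick at fixed multiples of their intervals from job start and that no full batch forces an earlier flush; checkpointsBeforeFlush is an invented helper. With the reproduction settings, a record arriving at t=5s stays only in memory across 9 completed checkpoints. If the task fails inside that window and restores from one of those checkpoints, the source would resume after the record and it would never be written.

```java
public class CheckpointExposureWindow {

    // Number of checkpoints that complete while a record that arrived at arrivalMs
    // is still only in the sink's memory. Assumes checkpoints fire at multiples of
    // checkpointIntervalMs and the flush timer at multiples of flushIntervalMs, both
    // measured from job start, and that no full batch triggers an earlier flush.
    // A checkpoint at the same instant as the flush is not counted.
    static long checkpointsBeforeFlush(long arrivalMs, long checkpointIntervalMs, long flushIntervalMs) {
        // Next flush tick strictly after the record arrived.
        long flushMs = (arrivalMs / flushIntervalMs + 1) * flushIntervalMs;
        // Next checkpoint strictly after the record arrived.
        long firstCheckpointMs = (arrivalMs / checkpointIntervalMs + 1) * checkpointIntervalMs;
        if (firstCheckpointMs >= flushMs) {
            return 0;
        }
        // Checkpoints at firstCheckpointMs, firstCheckpointMs + interval, ... strictly before flushMs.
        return (flushMs - 1 - firstCheckpointMs) / checkpointIntervalMs + 1;
    }

    public static void main(String[] args) {
        // Reproduction settings: checkpoint every 1min, sink.flush-interval = 10min, record at t = 5s.
        long n = checkpointsBeforeFlush(5_000L, 60_000L, 600_000L);
        System.out.println("checkpoints completed before the flush: " + n); // prints 9
    }
}
```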

Relevant log output

No response

Anything else

The core problem is that checkpointing does not trigger a flush, so even if the sink holds a pending exception (flushException), it still looks healthy to the Flink runtime.
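A minimal sketch of the behaviour the report expects, assuming a hypothetical writer rather than the connector's actual code: before a checkpoint completes, the sink flushes synchronously and rethrows any pending flush exception, so a rejected record fails the checkpoint instead of being skipped. In Flink the corresponding hooks are SinkWriter.flush(boolean endOfInput) in the org.apache.flink.api.connector.sink2 API, which the runtime calls on checkpoint, and CheckpointedFunction.snapshotState for a SinkFunction. CheckpointAwareWriter, onCheckpoint and uint64Table are invented names, and the UInt64 check only imitates ClickHouse rejecting -1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FlushOnCheckpointDemo {

    // Hypothetical writer (not the connector's code). onCheckpoint() plays the role of
    // SinkWriter.flush(boolean) in Flink's sink2 API or CheckpointedFunction.snapshotState
    // in a SinkFunction: it runs before the checkpoint can complete.
    static class CheckpointAwareWriter {
        final List<Long> buffer = new ArrayList<>();
        final Consumer<List<Long>> client; // stands in for the ClickHouse insert
        Exception flushException;          // recorded by the background timer flush

        CheckpointAwareWriter(Consumer<List<Long>> client) {
            this.client = client;
        }

        void write(long record) throws Exception {
            checkFlushException();
            buffer.add(record);
        }

        // Timer-driven flush: a failure is only recorded, so it stays invisible
        // to the runtime unless a later write or checkpoint checks for it.
        void timerFlush() {
            try {
                flush();
            } catch (RuntimeException e) {
                flushException = e;
            }
        }

        // Flush synchronously before the checkpoint; any error fails the checkpoint.
        void onCheckpoint() throws Exception {
            checkFlushException();
            flush();
        }

        private void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            client.accept(new ArrayList<>(buffer)); // throws if the insert is rejected
            buffer.clear();                          // only reached on success
        }

        private void checkFlushException() throws Exception {
            if (flushException != null) {
                throw new Exception("previous flush failed", flushException);
            }
        }
    }

    // Hypothetical stand-in for a UInt64 column: negative values are rejected.
    static Consumer<List<Long>> uint64Table(List<Long> table) {
        return batch -> {
            for (long v : batch) {
                if (v < 0) {
                    throw new IllegalArgumentException("value out of range for UInt64: " + v);
                }
            }
            table.addAll(batch);
        };
    }

    public static void main(String[] args) throws Exception {
        List<Long> table = new ArrayList<>();
        CheckpointAwareWriter writer = new CheckpointAwareWriter(uint64Table(table));
        writer.write(-1L);
        try {
            writer.onCheckpoint();
            System.out.println("checkpoint completed");
        } catch (Exception e) {
            // The checkpoint cannot complete, and the record is still buffered.
            System.out.println("checkpoint failed: " + e.getMessage()
                    + ", still buffered: " + writer.buffer.size());
        }
    }
}
```

Because the buffer is cleared only after a successful insert, the rejected record stays pending in this sketch, which is closer to the "stuck until manual resolution" behaviour described above than silently moving on.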

Are you willing to submit a PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

karpoftea, Oct 11 '24 08:10