[Bug] A single bad message can cause the Flink CDC writer to get stuck in an infinite loop waiting for a schema change
Search before asking
- [X] I searched in the issues and found nothing similar.
Paimon version
0.9.0
Compute Engine
Flink 1.18.1
Minimal reproduce step
You can replicate this in a local environment with any schema by using the Paimon Flink action jar to sink Kafka CDC data into Paimon. An example:
Schema: {'update_time': DATE}
Produce a record like: {'update_time': -2954}
This will cause the Flink ingestion job to run into an infinite loop waiting for the schema to update.
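For illustration, here is a minimal, self-contained sketch using only java.time (no Paimon internals) of why the cast fails: the value is likely an epoch-day encoding of a date (as Debezium-style CDC sources emit for DATE columns), which is valid as a number but cannot be parsed as a date string, and a string parse is effectively what the cast path in the stack trace below attempts.

```java
import java.time.LocalDate;

public class DateCastRepro {
    public static void main(String[] args) {
        String raw = "-2954"; // CDC sources often encode DATE as days since epoch

        // Interpreted as epoch days, the value is a valid date:
        System.out.println(LocalDate.ofEpochDay(Long.parseLong(raw))); // 1961-11-30

        // But parsing it as a date string fails:
        LocalDate.parse(raw); // throws java.time.format.DateTimeParseException
    }
}
```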
Error example:
2024-09-23 05:32:12,439 [] INFO org.apache.paimon.flink.sink.cdc.CdcRecordUtils [] - Failed to convert value [{"update_time": -2954}] to type ROW<update_time DATE 'update_time'>. Waiting for schema update.
java.lang.RuntimeException: Failed to parse Json String {"update_time": -2954}
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:192) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.utils.TypeUtils.castFromCdcValueString(TypeUtils.java:96) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow(CdcRecordUtils.java:105) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.processElement(CdcRecordStoreWriteOperator.java:80) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) [flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.18.0.jar:1.18.0]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Failed to parse Json String {"update_time": -2954}
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:259) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:177) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
... 16 more
Caused by: java.time.DateTimeException: For input string: '-2954'.
at org.apache.paimon.utils.BinaryStringUtils.toDate(BinaryStringUtils.java:289) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:157) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:243) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:177) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
... 16 more
What doesn't meet your expectations?
This caused the job to run into an infinite loop and fail checkpointing. I expected it to fail loudly with an exception saying the message it received is corrupted, instead of silently spinning in an infinite loop. All the exceptions in the Flink UI pointed to checkpoint timeouts.
Anything else?
No response
Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
A possible way to tackle this is to retry a configurable number of times before bubbling up the exception. We could add a config cdc.retry-num-times with a default of 3; if the conversion to GenericRow still fails after that, we fail the job by bubbling up the exception.
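A minimal, self-contained sketch of the bounded wait, assuming a hypothetical helper; retryUntilPresent, maxRetries, and sleepMillis are illustrative names, not identifiers from the Paimon source:

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical bounded-retry helper; all names here are illustrative, not Paimon's. */
public final class BoundedRetry {

    /** Retries the conversion up to maxRetries times, then fails loudly. */
    public static <T> T retryUntilPresent(
            Supplier<Optional<T>> conversion, int maxRetries, long sleepMillis)
            throws InterruptedException {
        for (int attempt = 0; ; attempt++) {
            Optional<T> result = conversion.get(); // e.g. toGenericRow(record, schema)
            if (result.isPresent()) {
                return result.get();
            }
            if (attempt >= maxRetries) {
                throw new RuntimeException(
                        "Conversion still failing after " + maxRetries
                                + " retries; the record is likely corrupted.");
            }
            Thread.sleep(sleepMillis); // wait for a possible schema update (cdc.retry-sleep-time)
        }
    }
}
```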
@JingsongLi @zhuangchong any thoughts on this?
Retry times looks good to me! Maybe a default of 3 is too small. cdc.retry-sleep-time is 0.5 seconds, so the default could be 100?
All right, I will work on the PR.
@JingsongLi Alternatively, if it is a corrupt record, it is going to keep failing unless the record is somehow skipped. Do you think we should also provide an option to skip such records as well, with skipping disabled by default?
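For concreteness, a rough sketch of what that policy could look like; the option name cdc.skip-corrupt-record and every identifier below are hypothetical, not existing Paimon config:

```java
import java.util.Optional;
import java.util.function.Supplier;
import java.util.logging.Logger;

/** Hypothetical skip-vs-fail policy; all names here are illustrative. */
public final class CorruptRecordPolicy {

    private static final Logger LOG =
            Logger.getLogger(CorruptRecordPolicy.class.getName());

    /** Returns the converted row, or empty when the corrupt record is skipped. */
    public static <T> Optional<T> convertOrSkip(
            Supplier<Optional<T>> conversion, Object record, boolean skipCorruptRecord) {
        Optional<T> result = conversion.get(); // e.g. the bounded retry from above
        if (result.isPresent()) {
            return result;
        }
        if (skipCorruptRecord) { // cdc.skip-corrupt-record = true (hypothetical)
            LOG.warning("Skipping corrupt CDC record: " + record);
            return Optional.empty();
        }
        // Default (skipping disabled): fail the job loudly instead of looping forever.
        throw new RuntimeException("Corrupt CDC record, failing the job: " + record);
    }
}
```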