pulsar-flink

[BUG] Failure to write to topic causes checkpoint to be declined, and all subsequent checkpoints are aborted.

Open • subbu107 opened this issue 4 years ago • 1 comment

Describe the bug

Checkpoints work up to a certain point. Then a checkpoint is declined because of a failure to write to a topic, and after that all subsequent checkpoints are aborted (they expire before completing).

As an apparent consequence, we also see the backlog on the source keep growing. Our understanding is that the source's cursor reset is not performed when a checkpoint fails, so while checkpoints keep failing, the source backlog is never reduced.
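The reporter's understanding can be illustrated with a small, Flink-free model. It assumes, as the report does, that the source moves (resets) its subscription cursor only when a checkpoint completes; the class and method names are hypothetical and are not pulsar-flink's API. The job keeps reading in every interval, but once checkpoints stop completing, the committed cursor stops moving and the backlog seen by the broker grows by one interval's worth of messages per checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a source that advances (resets) its broker-side cursor
 * only in the "checkpoint completed" callback. Illustrative names only.
 */
class CursorOnCheckpointModel {
    private long readPosition = 0;      // next message the reader will consume
    private long committedCursor = 0;   // position acknowledged to the broker
    private long pendingSnapshot = -1;  // position captured by the in-flight checkpoint

    /** The reader consumes n messages; this does NOT move the broker cursor. */
    void read(long n) { readPosition += n; }

    /** Analogue of snapshotState: remember the position, but do not commit yet. */
    void snapshot() { pendingSnapshot = readPosition; }

    /** Analogue of notifyCheckpointComplete: only now does the cursor move forward. */
    void checkpointCompleted() {
        if (pendingSnapshot >= 0) {
            committedCursor = pendingSnapshot;
        }
        pendingSnapshot = -1;
    }

    /** A declined or expired checkpoint leaves the committed cursor untouched. */
    void checkpointFailed() { pendingSnapshot = -1; }

    /** Backlog as the broker sees it, given how many messages were produced so far. */
    long backlog(long produced) { return produced - committedCursor; }

    /** One entry per checkpoint interval: true if that checkpoint completes. */
    static List<Long> simulate(boolean[] checkpointSucceeds, long perInterval) {
        CursorOnCheckpointModel m = new CursorOnCheckpointModel();
        List<Long> backlogs = new ArrayList<>();
        long produced = 0;
        for (boolean ok : checkpointSucceeds) {
            produced += perInterval;
            m.read(perInterval);            // the reader consumes everything produced this interval
            m.snapshot();
            if (ok) {
                m.checkpointCompleted();
            } else {
                m.checkpointFailed();
            }
            backlogs.add(m.backlog(produced)); // backlog only shrinks after a completed checkpoint
        }
        return backlogs;
    }
}
```

For example, `CursorOnCheckpointModel.simulate(new boolean[]{true, true, false, false, false}, 100)` returns `[0, 0, 100, 200, 300]`: the backlog stays flat while checkpoints complete and grows linearly after the first failure.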

To Reproduce

The problem occurs sporadically; we don't have a specific set of steps to reproduce it.

Expected behavior

Even if one checkpoint is declined because of a failure, subsequent checkpoints should still complete successfully.

Log messages

```
profile2_7.Transform -> (Transform_transformation, JsonNode_to_String -> (preload_profile2_7_monitoring, Sink: preload_profile2_7))) (1/8)] INFO o.a.f.s.runtime.tasks.SubtaskCheckpointCoordinatorImpl - Could not complete snapshot 1142 for operator Source: profile2_7 -> (profile2_7_monitoring, invalid_data_topic_monitoring, Sink: invalid_data_topic, String_to_JsonNode -> Kafka_TimestampTransform_Kafka_profile2_7.Transform -> (Transform_transformation, JsonNode_to_String -> (preload_profile2_7_monitoring, Sink: preload_profile2_7))) (1/8). Failure reason: Checkpoint was declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1142 for operator Source: profile2_7 -> (profile2_7_monitoring, invalid_data_topic_monitoring, Sink: invalid_data_topic, String_to_JsonNode -> Kafka_TimestampTransform_Kafka_profile2_7.Transform -> (Transform_transformation, JsonNode_to_String -> (preload_profile2_7_monitoring, Sink: preload_profile2_7))) (1/8). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:339)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:924)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:914)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:845)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$7(StreamTask.java:821)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:301)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:183)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: The producer itomdipulsar-1-341 can not send message to the topic persistent://public/default/preload_profile2_7-partition-1 within given timeout
    at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:916)
    at org.apache.pulsar.client.impl.ProducerBase.flush(ProducerBase.java:127)
    at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.producerFlush(FlinkPulsarSinkBase.java:274)
    at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink.producerFlush(FlinkPulsarSink.java:34)
    at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.snapshotState(FlinkPulsarSinkBase.java:158)
    at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink.snapshotState(FlinkPulsarSink.java:34)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 20 common frames omitted
```

Subsequent checkpoints expire. For example, checkpoint 1332 is triggered at 05:51:26.093 and expires exactly five minutes later:

```
flink--standalonesession-0-itom-di-dp-master-dpl-8675cc88bd-k2cbr.log:2021-05-07 05:51:26.093 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1332 (type=CHECKPOINT) @ 1620366686072 for job 0aa97e77596fe878e0d234dccf432da3.
flink--standalonesession-0-itom-di-dp-master-dpl-8675cc88bd-k2cbr.log:2021-05-07 05:56:26.093 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1332 of job 0aa97e77596fe878e0d234dccf432da3 expired before completing.
```
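The timing in these two log lines can be checked with a few lines of `java.time`; this sketch only parses the quoted timestamps and uses no Flink API. The trigger and expiry lines are exactly `PT5M` apart, and the `@ 1620366686072` field decodes to `2021-05-07T05:51:26.072Z`, which matches the log timestamp to within 21 ms, so the log clock appears to be UTC. A fixed five-minute gap suggests that the job's checkpoint timeout is configured to 5 minutes (Flink's default is 10 minutes); that is an inference from the logs, not something the report states.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Helpers for reading the checkpoint log lines quoted above (no Flink dependency). */
class CheckpointTimingFromLogs {
    // Format of the timestamp that follows "...k2cbr.log:" in each quoted line.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Elapsed time between two log-line timestamps (same time zone assumed). */
    static Duration between(String triggered, String expired) {
        return Duration.between(
                LocalDateTime.parse(triggered, LOG_TS),
                LocalDateTime.parse(expired, LOG_TS));
    }

    /** The "@ <millis>" field of the "Triggering checkpoint" line, as a UTC instant. */
    static Instant triggerInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }
}
```

`CheckpointTimingFromLogs.between("2021-05-07 05:51:26.093", "2021-05-07 05:56:26.093")` yields `PT5M`, and `triggerInstant(1620366686072L)` prints as `2021-05-07T05:51:26.072Z`.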

subbu107, May 11 '21 15:05

@subbu107 The Producer in the sink sends messages asynchronously: records are placed in the Producer's internal asynchronous queue, and that queue is flushed in snapshotState. Under back pressure, this can easily lead to repeated send timeouts. @syhily I think the handling here may need to be adjusted.

jianyun8023, May 13 '21 20:05
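The mechanism described in the last comment can be sketched without Flink or Pulsar dependencies. In this hypothetical model (class and method names are illustrative, not the connector's API), `send` enqueues a future that stands in for a broker acknowledgement and returns at once, and `snapshotState` blocks until every outstanding acknowledgement arrives or a deadline passes. The deadline is a stand-in for the Pulsar producer's send timeout. When acknowledgements lag behind (back pressure), each flush hits the deadline and the snapshot is declined; in the model, the unacknowledged sends are still pending at the next checkpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sink that sends asynchronously and flushes in snapshotState. */
class AsyncSinkFlushModel {
    // Futures for sends that the "broker" has not acknowledged yet.
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    /** Analogue of invoke(): hand the record to the async queue and return immediately. */
    void send(CompletableFuture<Void> ack) {
        pending.add(ack);
    }

    /**
     * Analogue of snapshotState(): wait for every queued send to be acknowledged.
     * Returns false (checkpoint declined) if that does not happen within the deadline.
     */
    boolean snapshotState(long timeoutMillis) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMillis, TimeUnit.MILLISECONDS);
            pending.clear();               // everything acknowledged: checkpoint may proceed
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;                  // sends stay pending and will be waited on again
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

For example, a send whose future is never completed makes `snapshotState(50)` return `false` and remains pending; once that future is completed, the next `snapshotState` call succeeds and the queue is empty.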