spring-cloud-stream
Reactive Function stops processing after Timeout exception from Kinesis
Framework Version:
spring-cloud-stream: 3.1.4
spring-cloud-stream-binder-kinesis: 2.2.0
Issue:
We are using Spring Cloud Stream Kinesis with KPL/KCL enabled. We have a reactive Function that consumes messages from a Kinesis stream, applies a message transformation, and sends the result to another Kinesis stream. In short, the workflow is: input Kinesis stream -> transform message -> output Kinesis stream.
@Bean
public Function<Flux<Message<String>>, Flux<Message<String>>> eventTransformer() {
    return in -> in
            .map(this::transformEvent)   // transformEvent returns Optional<Message<String>>
            .filter(Optional::isPresent) // drop events that could not be transformed
            .map(Optional::get);
}
with the bindings:
spring:
  cloud:
    stream:
      bindings:
        eventTransformer-in-0:
          destination: input-events
          group: event-transformer
        eventTransformer-out-0:
          destination: transformed-events
Everything works well until we receive the following exception on a producer timeout:
org.springframework.integration.MessageTimeoutException: Timeout waiting for response from AmazonKinesis, thrown from org.springframework.integration.aws.outbound.AbstractAwsMessageHandler.handleMessageInternal.
(This appears to come from Spring Integration's KPL message handler.)
After this exception, we continuously see the following exception:
"org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel"
How to reproduce?
We can reproduce the above issue by setting the producer's timeout, spring.cloud.stream.kinesis.bindings.eventTransformer-out-0.producer.sendTimeout, to a low value, for example 10 ms.
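For reference, the equivalent YAML for that property (a sketch; the value is in milliseconds and is set deliberately low here):
spring:
  cloud:
    stream:
      kinesis:
        bindings:
          eventTransformer-out-0:
            producer:
              sendTimeout: 10  # deliberately low to force the producer timeout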
Question: how can we recover gracefully?
Things work fine if we restart the server. But how can we recover gracefully from the timeout exception thrown by org.springframework.integration.aws.outbound.AbstractAwsMessageHandler.handleMessageInternal? Is there any feature that can help resolve this issue?
########
Exception 1:
{
@timestamp: 2022-01-31T02:36:13.245+00:00
@version: 1
level: ERROR
level_value: 40000
logger_name: reactor.core.publisher.Operators
message: Operator called default onErrorDropped
stack_trace: reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.integration.MessageTimeoutException: Timeout waiting for response from AmazonKinesis; nested exception is java.util.concurrent.TimeoutException: Waited 10000 milliseconds (plus 78918 nanoseconds delay) for com.google.common.util.concurrent.SettableFuture@144fa713[status=PENDING], failedMessage=GenericMessage [payload=byte[5285], headers={scst_partition=163, scst_partitionOverride=d450b4b0-11a2-46f8-b14e-5c682a40e629, id=1f022f63-c576-714c-8897-7647d9dc834d, partitionKey=d450b4b0-11a2-46f8-b14e-5c682a40e629, contentType=application/json, timestamp=1643596563229}]
Caused by: org.springframework.integration.MessageTimeoutException: Timeout waiting for response from AmazonKinesis; nested exception is java.util.concurrent.TimeoutException: Waited 10000 milliseconds (plus 78918 nanoseconds delay) for com.google.common.util.concurrent.SettableFuture@144fa713[status=PENDING]
at org.springframework.integration.aws.outbound.AbstractAwsMessageHandler.handleMessageInternal(AbstractAwsMessageHandler.java:193)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1056)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$8(FunctionConfiguration.java:537)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onNext(FluxFilter.java:247)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:120)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at org.springframework.integration.util.IntegrationReactiveUtils.lambda$adaptSubscribableChannelToPublisher$8(IntegrationReactiveUtils.java:142)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter.access$1600(KclMessageDrivenChannelAdapter.java:84)
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.performSend(KclMessageDrivenChannelAdapter.java:520)
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processSingleRecord(KclMessageDrivenChannelAdapter.java:435)
at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processRecords(KclMessageDrivenChannelAdapter.java:418)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.TimeoutException: Waited 10000 milliseconds (plus 78918 nanoseconds delay) for com.google.common.util.concurrent.SettableFuture@144fa713[status=PENDING]
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:494)
at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:97)
at org.springframework.integration.aws.outbound.AbstractAwsMessageHandler.handleMessageInternal(AbstractAwsMessageHandler.java:190)
... 50 common frames omitted
thread_name: SimpleAsyncTaskExecutor-168739
}
Exception 2:
{
@timestamp: 2022-01-31T02:36:13.246+00:00
@version: 1
level: DEBUG
level_value: 10000
logger_name: org.springframework.cloud.stream.binder.BinderErrorChannel
message: preSend on channel 'bean 'input-events.event-transformer.errors'', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher failed to deliver Message; nested exception is java.lang.IllegalStateException: Cannot emit messages into the cancelled or terminated sink for message channel: bean 'eventTransformer-in-0', failedMessage=GenericMessage [payload=byte[7084], headers={aws_shard=shardId-000000000006, id=b04abb36-b0fd-31d6-22a4-3f619a500641, sourceData=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49624679240045578929376251831180594873806565534580342882, getData()=java.nio.HeapByteBuffer[pos=0 lim=7084 cap=7084], getPartitionKey()=42DE593513A575111963BEFB90D85D01], contentType=application/json, aws_receivedPartitionKey=42DE593513A575111963BEFB90D85D01, aws_receivedStream=input-events, aws_receivedSequenceNumber=49624679240045578929376251831180594873806565534580342882, timestamp=1643596565230}], headers={aws_rawRecord=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49624679240045578929376251831180594873806565534580342882, getData()=java.nio.HeapByteBuffer[pos=0 lim=7084 cap=7084], getPartitionKey()=42DE593513A575111963BEFB90D85D01], id=fa9c4566-5a8e-68a0-3c75-9434a3e1e21b, timestamp=1643596573246}] for original GenericMessage [payload=byte[7084], headers={aws_shard=shardId-000000000006, id=b04abb36-b0fd-31d6-22a4-3f619a500641, sourceData=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49624679240045578929376251831180594873806565534580342882, getData()=java.nio.HeapByteBuffer[pos=0 lim=7084 cap=7084], getPartitionKey()=42DE593513A575111963BEFB90D85D01], contentType=application/json, aws_receivedPartitionKey=42DE593513A575111963BEFB90D85D01, aws_receivedStream=input-events, aws_receivedSequenceNumber=49624679240045578929376251831180594873806565534580342882, timestamp=1643596565230}]
thread_name: SimpleAsyncTaskExecutor-168740
}
Basically, with reactive functions there is not much we can do at the framework level to handle errors, since we have no control over the stream. This is completely different with an imperative function.
In imperative functions the unit of work is a single Message, so we effectively wrap the invocation, maintaining control before and after the function invocation; the function gets invoked on each message. In a reactive function the unit of work is the entire stream, so the function gets invoked only once, during application context initialization, and from that point on control is handed to the reactive API.
I will keep this open as I want to look at your specific case and see if we can do something, but regardless... consider using the reactive error-handling API inside your function implementation. It is pretty rich (e.g., doOnError(..), onErrorResume(..), etc.).
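For example, a minimal sketch applied to the eventTransformer above (the operator choice and the SLF4J log field are illustrative; onErrorContinue resumes the pipeline by dropping the failing element, and it only covers errors raised inside the function's own pipeline):
@Bean
public Function<Flux<Message<String>>, Flux<Message<String>>> eventTransformer() {
    return in -> in
            .map(this::transformEvent)
            .filter(Optional::isPresent)
            .map(Optional::get)
            // illustrative: log and drop the failing element instead of letting
            // the error terminate the subscription
            .onErrorContinue((ex, value) -> log.warn("Skipping element after error: {}", value, ex));
}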
@olegz Thank you for the quick response.
The first exception, MessageTimeoutException, happens outside our Function, so at this point I don't see how doOnError or onErrorResume inside our function would help: the exception appears to cancel the subscription, and any further message processing results in "org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'event-transformer.eventTransformer-in-0'"
along with "org.springframework.messaging.MessageDeliveryException: Dispatcher failed to deliver Message; nested exception is java.lang.IllegalStateException: Cannot emit messages into the cancelled or terminated sink for message channel: bean 'eventTransformer-in-0'".
Since the first exception happens outside my function, the only way I could catch it is with the following global Reactor hook, which suppresses it. I am not sure if there is a better way. This case is easily reproducible with low timeout settings, so I am looking for a better way to handle it. I appreciate your advice.
Hooks.onNextError((error, value) -> {
    // Suppress the producer timeout so it does not cancel the subscription;
    // returning null drops the error, anything else is propagated as-is.
    if (error instanceof org.springframework.integration.MessageTimeoutException) {
        return null;
    }
    return error;
});
I would say such a timeout exception ought to be retryable, but it doesn't look like we have any option on the producer binding to apply a retry...
Yes. I was looking for a way to retry without using the Hook mentioned above.
It seems the best option for me at this time is to change my function to the imperative style, listen for the error on the consumer error channel, and then reprocess and repost the message to my producer destination using StreamBridge.
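A rough sketch of that imperative alternative (assuming the same transformEvent(..) helper returning Optional<Message<String>>; the output binding name is illustrative):
@Bean
public Consumer<Message<String>> eventTransformer(StreamBridge streamBridge) {
    return msg -> transformEvent(msg)
            // with an imperative function each message is a unit of work, so a send
            // failure here surfaces per message and the binder's error-channel and
            // retry mechanics can apply
            .ifPresent(out -> streamBridge.send("eventTransformer-out-0", out));
}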
Do I have the option to add custom listeners so that, if handlers get removed, I can try to add them back? In my case a similar issue was observed; while debugging, I see an exception at https://github.com/spring-projects/spring-integration/blob/fdc47684c2883f380ba2777c998484dd11496208/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractSubscribableChannel.java#L75 and the handler set is empty at https://github.com/spring-projects/spring-integration/blob/fdc47684c2883f380ba2777c998484dd11496208/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java#L172
So, can I try to add the handler again? Do you think that would work?
I was trying to collect your suggestions here.
Does this mean the DLQ mechanism is ineffective when using a reactive function, i.e., that the subscriber is removed after the first error?
I am going to close this, as there are similar issues out there and plenty of explanation of why we can't do that. In fact, I marked this one as "ideal-for-contribution" - https://github.com/spring-cloud/spring-cloud-stream/issues/2142 - and if someone is interested, I'll be glad to review the PR.