
Reactive Function stops processing after Timeout exception from Kinesis

Open lakshmikanthpadala opened this issue 3 years ago • 6 comments

Framework Version:

spring-cloud-stream: 3.1.4
spring-cloud-stream-binder-kinesis: 2.2.0

Issue:

We are using Spring Cloud Stream Kinesis with KPL/KCL enabled. We have a reactive Function which consumes messages from a Kinesis stream and, after applying message transformation logic, sends them to another Kinesis stream. In short, the workflow is: input Kinesis stream -> transform message -> output Kinesis stream.

  @Bean
  public Function<Flux<Message<String>>, Flux<Message<String>>> eventTransformer() {
    return in ->
        in.map(this::transformEvent)
            .filter(Optional::isPresent)
            .map(Optional::get);
  }

with bindings

  spring:
    cloud:
      stream:
        bindings:
          eventTransformer-in-0:
            destination: input-events
            group: event-transformer
          eventTransformer-out-0:
            destination: transformed-events

Everything goes well until we receive the following exception when the producer times out:

org.springframework.integration.MessageTimeoutException: Timeout waiting for response from AmazonKinesis, thrown from org.springframework.integration.aws.outbound.AbstractAwsMessageHandler.handleMessageInternal (this appears to come from Spring Integration's KPL message handler).

After this exception, we continuously see the following exception:

"org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel"

How to reproduce?

We can reproduce the above issue by setting the producer's send timeout - spring.cloud.stream.kinesis.bindings.eventTransformer-out-0.producer.sendTimeout - to a low value, for example 10 ms.
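In YAML form, that reproduction setting would sit in the binder-specific section. This is a sketch assembled from the property path above; only the sendTimeout key and its location come from the report, the rest mirrors the bindings shown earlier:

```yaml
spring:
  cloud:
    stream:
      kinesis:
        bindings:
          eventTransformer-out-0:
            producer:
              sendTimeout: 10  # milliseconds; deliberately low to trigger the timeout
```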

Question: how can we recover gracefully?

Things work fine if we restart the server. But how can we recover gracefully from the timeout exception thrown by `org.springframework.integration.aws.outbound.AbstractAwsMessageHandler.handleMessageInternal`? Is there any feature that can help resolve this issue?

########

Exception 1:


{ [-]
   @timestamp: 2022-01-31T02:36:13.245+00:00
   @version: 1
   level: ERROR
   level_value: 40000
   logger_name: reactor.core.publisher.Operators
   message: Operator called default onErrorDropped
   stack_trace: reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.integration.MessageTimeoutException: Timeout waiting for response from AmazonKinesis; nested exception is java.util.concurrent.TimeoutException: Waited 10000 milliseconds (plus 78918 nanoseconds delay) for com.google.common.util.concurrent.SettableFuture@144fa713[status=PENDING], failedMessage=GenericMessage [payload=byte[5285], headers={scst_partition=163, scst_partitionOverride=d450b4b0-11a2-46f8-b14e-5c682a40e629, id=1f022f63-c576-714c-8897-7647d9dc834d, partitionKey=d450b4b0-11a2-46f8-b14e-5c682a40e629, contentType=application/json, timestamp=1643596563229}]
Caused by: org.springframework.integration.MessageTimeoutException: Timeout waiting for response from AmazonKinesis; nested exception is java.util.concurrent.TimeoutException: Waited 10000 milliseconds (plus 78918 nanoseconds delay) for com.google.common.util.concurrent.SettableFuture@144fa713[status=PENDING]
  at org.springframework.integration.aws.outbound.AbstractAwsMessageHandler.handleMessageInternal(AbstractAwsMessageHandler.java:193)
  at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
  at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1056)
  at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
  at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
  at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
  at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
  at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
  at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
  at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
  at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$8(FunctionConfiguration.java:537)
  at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
  at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
  at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
  at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
  at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
  at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
  at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
  at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
  at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onNext(FluxFilter.java:247)
  at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
  at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
  at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
  at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
  at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
  at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
  at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:120)
  at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
  at org.springframework.integration.util.IntegrationReactiveUtils.lambda$adaptSubscribableChannelToPublisher$8(IntegrationReactiveUtils.java:142)
  at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
  at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
  at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
  at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
  at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
  at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
  at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
  at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
  at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
  at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
  at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
  at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter.access$1600(KclMessageDrivenChannelAdapter.java:84)
  at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.performSend(KclMessageDrivenChannelAdapter.java:520)
  at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processSingleRecord(KclMessageDrivenChannelAdapter.java:435)
  at org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter$RecordProcessor.processRecords(KclMessageDrivenChannelAdapter.java:418)
  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42)
  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221)
  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176)
  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
  at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
  at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
  at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.TimeoutException: Waited 10000 milliseconds (plus 78918 nanoseconds delay) for com.google.common.util.concurrent.SettableFuture@144fa713[status=PENDING]
  at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:494)
  at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:97)
  at org.springframework.integration.aws.outbound.AbstractAwsMessageHandler.handleMessageInternal(AbstractAwsMessageHandler.java:190)
  ... 50 common frames omitted

   thread_name: SimpleAsyncTaskExecutor-168739
}

Exception 2:

{ [-]
   @timestamp: 2022-01-31T02:36:13.246+00:00
   @version: 1
   level: DEBUG
   level_value: 10000
   logger_name: org.springframework.cloud.stream.binder.BinderErrorChannel
   message: preSend on channel 'bean 'input-events.event-transformer.errors'', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher failed to deliver Message; nested exception is java.lang.IllegalStateException: Cannot emit messages into the cancelled or terminated sink for message channel: bean 'eventTransformer-in-0', failedMessage=GenericMessage [payload=byte[7084], headers={aws_shard=shardId-000000000006, id=b04abb36-b0fd-31d6-22a4-3f619a500641, sourceData=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49624679240045578929376251831180594873806565534580342882, getData()=java.nio.HeapByteBuffer[pos=0 lim=7084 cap=7084], getPartitionKey()=42DE593513A575111963BEFB90D85D01], contentType=application/json, aws_receivedPartitionKey=42DE593513A575111963BEFB90D85D01, aws_receivedStream=input-events, aws_receivedSequenceNumber=49624679240045578929376251831180594873806565534580342882, timestamp=1643596565230}], headers={aws_rawRecord=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49624679240045578929376251831180594873806565534580342882, getData()=java.nio.HeapByteBuffer[pos=0 lim=7084 cap=7084], getPartitionKey()=42DE593513A575111963BEFB90D85D01], id=fa9c4566-5a8e-68a0-3c75-9434a3e1e21b, timestamp=1643596573246}] for original GenericMessage [payload=byte[7084], headers={aws_shard=shardId-000000000006, id=b04abb36-b0fd-31d6-22a4-3f619a500641, sourceData=UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49624679240045578929376251831180594873806565534580342882, getData()=java.nio.HeapByteBuffer[pos=0 lim=7084 cap=7084], getPartitionKey()=42DE593513A575111963BEFB90D85D01], contentType=application/json, aws_receivedPartitionKey=42DE593513A575111963BEFB90D85D01, aws_receivedStream=input-events, aws_receivedSequenceNumber=49624679240045578929376251831180594873806565534580342882, timestamp=1643596565230}]
   thread_name: SimpleAsyncTaskExecutor-168740
}

— lakshmikanthpadala, Feb 01 '22 16:02

Basically, with reactive functions there is not much we can do at the framework level to handle errors, since we have no control over the stream. This is completely different from imperative functions.

In imperative functions the unit of work is a single Message, so we effectively wrap the invocation, maintaining control before and after each function invocation; the function gets invoked on each message. In reactive functions the unit of work is the entire stream, so the function gets invoked only once, during application context initialization, and from that point on control is handed over to the reactive API.

I will keep this open, as I want to look at your specific case and see if we can do something, but regardless... consider using the reactive error handling API inside your function implementation. It is pretty rich (e.g., doOnError(..), onError(..), etc.).
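As a minimal sketch of that suggestion, assuming reactor-core on the classpath (the transformEvent here is a simplified stand-in for the application's real logic, not the actual implementation):

```java
import reactor.core.publisher.Flux;

// Sketch: use Reactor's error-handling operators inside the function body
// so a failing element is logged and dropped instead of the error
// cancelling the whole subscription.
public class ResilientTransformer {

    public static Flux<String> transform(Flux<String> in) {
        return in.map(ResilientTransformer::transformEvent)
                .onErrorContinue((ex, value) ->
                        System.err.println("Dropped " + value + ": " + ex.getMessage()));
    }

    // Hypothetical stand-in for the real transformation logic.
    private static String transformEvent(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
        return payload.toUpperCase();
    }
}
```

Note that onErrorContinue only intercepts errors raised in supporting operators upstream of where it is applied, so whether it helps depends on where in the pipeline the error actually occurs.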

— olegz, Feb 01 '22 16:02

@olegz Thank you for the quick response.

The first exception, MessageTimeoutException, happens outside our function, so I don't see how doOnError or onError inside the function would help here. Because of this exception the subscription appears to get cancelled, and any further message processing results in "org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'event-transformer.eventTransformer-in-0'" along with "org.springframework.messaging.MessageDeliveryException: Dispatcher failed to deliver Message; nested exception is java.lang.IllegalStateException: Cannot emit messages into the cancelled or terminated sink for message channel: bean 'eventTransformer-in-0'".

Since the first exception happens outside my function, the only way I could catch it is with the following global Reactor hook, suppressing the error. I am not sure there is a better way. The case is easily reproducible with a low timeout setting, so I am looking for a better way to handle it. I would appreciate your advice.

    // Globally suppress MessageTimeoutException so it does not cancel the
    // subscription; propagate all other errors unchanged.
    Hooks.onNextError((e, data) -> {
      if (e instanceof org.springframework.integration.MessageTimeoutException) {
        return null;
      }
      return e;
    });

— lakshmikanthpadala, Feb 02 '22 05:02

I would say such a timeout exception has to be retryable. But it doesn't look like we have any option on the producer binding to apply that...
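There is no such producer-binding option today. As an illustration of the retry semantics being discussed, here is a stdlib-only sketch; the helper and its names are hypothetical, not a binder feature. Spring Integration's generic RequestHandlerRetryAdvice can apply comparable retry behavior to a producing message handler:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: retry a send-like operation a fixed number of times
// when it fails with TimeoutException, rethrowing once attempts are
// exhausted. Any other exception propagates immediately.
public final class TimeoutRetry {

    public static <T> T withRetries(Callable<T> op, int maxAttempts) throws Exception {
        TimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (TimeoutException e) {
                last = e; // transient failure: try again
            }
        }
        throw last;
    }
}
```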

— artembilan, Feb 02 '22 16:02

Yes, I was looking for a way to retry without using the hook mentioned above.

It seems the best option for me at this time is to change my function to the imperative style, listen for the error on the consumer error channel, reprocess, and repost the message to my producer destination using StreamBridge.
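A stdlib-only sketch of that imperative shape (the Bridge interface below stands in for Spring's StreamBridge, and transformEvent is a hypothetical placeholder, so the sketch compiles without Spring on the classpath):

```java
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Sketch of the imperative consumer described above. In the real
// application the bridge would be Spring's StreamBridge.send(binding,
// payload) and the consumer a @Bean Consumer<Message<String>>.
public class ImperativeTransformer {

    /** Stand-in for StreamBridge.send(bindingName, payload). */
    interface Bridge extends BiConsumer<String, String> {}

    static Consumer<String> eventConsumer(Bridge bridge) {
        return payload -> transformEvent(payload)
                // One message per invocation: a failed send affects only
                // this message, and the binder's error-channel machinery
                // gets a chance to run.
                .ifPresent(out -> bridge.accept("transformed-events", out));
    }

    // Hypothetical placeholder for the real transformation logic.
    static Optional<String> transformEvent(String payload) {
        return payload.isEmpty() ? Optional.empty() : Optional.of(payload.toUpperCase());
    }
}
```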

— lakshmikanthpadala, Feb 03 '22 23:02

Basically, with reactive functions there is not much we can do at the framework level to handle errors, since we have no control over the stream. This is completely different from imperative functions.

In imperative functions the unit of work is a single Message, so we effectively wrap the invocation, maintaining control before and after each function invocation; the function gets invoked on each message. In reactive functions the unit of work is the entire stream, so the function gets invoked only once, during application context initialization, and from that point on control is handed over to the reactive API.

I will keep this open, as I want to look at your specific case and see if we can do something, but regardless... consider using the reactive error handling API inside your function implementation. It is pretty rich (e.g., doOnError(..), onError(..), etc.).

Do I have the option to add custom listeners so that, if handlers are getting removed, I can try to add them back? In my case a similar issue was observed; debugging it, I see an exception at https://github.com/spring-projects/spring-integration/blob/fdc47684c2883f380ba2777c998484dd11496208/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractSubscribableChannel.java#L75 and the handler set is empty at https://github.com/spring-projects/spring-integration/blob/fdc47684c2883f380ba2777c998484dd11496208/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java#L172

So, can I try to add the handler again? Do you think that would work?

I am trying to collect your suggestions here.

— snofty, Mar 10 '22 07:03

Does this mean the DLQ mechanism is ineffective when using a reactive function? I.e., will the subscriber be removed after the first error?

— nithril, May 20 '22 05:05

I am going to close this, as there are similar issues out there and plenty of explanation of why we can't do that. In fact, I marked this one as "ideal-for-contribution" - https://github.com/spring-cloud/spring-cloud-stream/issues/2142 - and if someone is interested, I'll be glad to review the PR.

— olegz, Jan 04 '23 18:01