
Alpakka KinesisFlow Error Handling with Side Effect?

Open · ergunbaris opened this issue 3 years ago • 1 comment

Versions used

Alpakka version: 2.0.2

val kclAkkaStreamVersion = "4.1.1"
val alpakkaVersion = "2.0.2"
val akkaHttpVersion = "10.1.12" <- I know the suggested akka-http version is 10.1.11

Akka version: val akkaVersion = "2.6.14"
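For reference, these versions correspond roughly to build definitions like the following (a sketch only; the KCL connector dependency is omitted here because its artifact coordinates are not shown above):

```scala
// build.sbt (sketch, versions as listed above)
val akkaVersion     = "2.6.14"
val alpakkaVersion  = "2.0.2"
val akkaHttpVersion = "10.1.12"

libraryDependencies ++= Seq(
  "com.typesafe.akka"  %% "akka-stream"                 % akkaVersion,
  "com.typesafe.akka"  %% "akka-http"                   % akkaHttpVersion,
  "com.lightbend.akka" %% "akka-stream-alpakka-kinesis" % alpakkaVersion
)
```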

Expected Behavior

KinesisFlow should return Try[PutRecordsResultEntry], not allowing any side effects, as in the Alpakka DynamoDB implementation. I have checked the most recent version, 3.0.3, and found the flow implemented the same way as in 2.0.2:

```scala
kinesisClient
  .putRecords(
    PutRecordsRequest.builder().streamName(streamName).records(entries.map(_._1).asJavaCollection).build
  )
  .toScala
  .transform(handlePutRecordsSuccess(entries), FailurePublishingRecords(_))(sameThreadExecutionContext)
```
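What I have in mind is roughly the following. This is a hypothetical sketch only, not the actual Alpakka code: putRecordsAsTry is an invented name, and kinesisClient, streamName, handlePutRecordsSuccess and FailurePublishingRecords are the internals quoted above. The idea is that an SDK-level failure becomes per-record data instead of failing the returned Future and, with it, the stream:

```scala
import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

import software.amazon.awssdk.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry, PutRecordsResultEntry}

// Hypothetical Try-returning variant: a successful batch yields Success per record,
// an SDK-level failure yields Failure(FailurePublishingRecords(...)) per record.
def putRecordsAsTry[T](
    entries: Seq[(PutRecordsRequestEntry, T)]
)(implicit ec: ExecutionContext): Future[Seq[(Try[PutRecordsResultEntry], T)]] =
  kinesisClient
    .putRecords(
      PutRecordsRequest.builder().streamName(streamName).records(entries.map(_._1).asJavaCollection).build
    )
    .toScala
    .map(handlePutRecordsSuccess(entries))                        // internal helper quoted above
    .map(_.map { case (result, ctx) => (Success(result), ctx) })  // per-record success
    .recover { case NonFatal(e) =>                                // SDK failure becomes data, not a stream failure
      entries.map { case (_, ctx) => (Failure(FailurePublishingRecords(e)), ctx) }
    }
```

The stage could then emit (Try[PutRecordsResultEntry], T) downstream and leave the decision to the user, as the DynamoDB connector does.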

Actual Behavior

When using the buggy version 10.1.12 of the akka-http client, a race condition in the client can occur randomly, closing the HTTP connection unexpectedly.

This creates a side effect in the Alpakka KinesisFlow implementation: although the return type is PutRecordsResultEntry, a FailurePublishingRecords error is thrown instead and the upstream fails. The cause of the FailurePublishingRecords error is:

1630784719798,"2021-09-04 19:45:19,797 [WARN] from io.netty.channel.DefaultChannelPipeline in aws-java-sdk-NettyEventLoop-2-1 - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception." 1630784719798,java.io.IOException: The channel was closed before the protocol could be determined. 1630784719798, at software.amazon.awssdk.http.nio.netty.internal.http2.Http2SettingsFrameHandler.channelUnregistered(Http2SettingsFrameHandler.java:58) 1630784719798, at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:196) 1630784719798, at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:182) 1630784719798, at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:175) 1630784719798, at io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1388) 1630784719798, at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:196) 1630784719798, at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:182) 1630784719798, at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:821) 1630784719798, at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826) 1630784719798, at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) 1630784719798, at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) 1630784719798, at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) 1630784719798, at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 1630784719798, at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 1630784719798, at java.lang.Thread.run(Thread.java:748) 1630784719824,"2021-09-04 19:45:19,823 [ERROR] from .....Service$ in flm-actor-system-akka.actor.default-dispatcher-13416 - Facility Load Main Stream failed calling other streams shutdown gracefully.." 1630784719824,akka.stream.alpakka.kinesis.KinesisErrors$FailurePublishingRecords: Failure publishing records to Kinesis. Reason : software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate.

Relevant logs

If you identified a section in your logs that explains the bug or might be important to understand it, please add it.

1630784719798,"2021-09-04 19:45:19,797 [WARN] from io.netty.channel.DefaultChannelPipeline in aws-java-sdk-NettyEventLoop-2-1 - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception." 1630784719798,java.io.IOException: The channel was closed before the protocol could be determined. 1630784719798, at software.amazon.awssdk.http.nio.netty.internal.http2.Http2SettingsFrameHandler.channelUnregistered(Http2SettingsFrameHandler.java:58) 1630784719798, at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:196) 1630784719798, at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:182) 1630784719798, at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:175) 1630784719798, at io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1388) 1630784719798, at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:196) 1630784719798, at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:182) 1630784719798, at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:821) 1630784719798, at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:826) 1630784719798, at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) 1630784719798, at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) 1630784719798, at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) 1630784719798, at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 1630784719798, at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 1630784719798, at java.lang.Thread.run(Thread.java:748) 1630784719824,"2021-09-04 19:45:19,823 [ERROR] from .....Service$ in flm-actor-system-akka.actor.default-dispatcher-13416 - Facility Load Main Stream failed calling other streams shutdown gracefully.." 1630784719824,akka.stream.alpakka.kinesis.KinesisErrors$FailurePublishingRecords: Failure publishing records to Kinesis. Reason : software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request r ate. 1630784719824,"Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate." 1630784719824,"Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout." 
1630784719824,"If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you n eed to call AWS, or by increasing the number of hosts sending requests." 1630784719824,Caused by: java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Acquire operation took longer than the configured max imum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate. 1630784719824,"Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate." 1630784719824,"Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout." 1630784719824,"If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you n eed to call AWS, or by increasing the number of hosts sending requests." 1630784719824, at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:60) 1630784719824, at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51) 1630784719824, at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 1630784719824, at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) 1630784719824, at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 1630784719824, at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 1630784719824, at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) 1630784719824, at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) 1630784719824, at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) 1630784719824, at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 1630784719824, at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 1630784719824, at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:85) 1630784719824, at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:144) 1630784719824, at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:125) 1630784719824, at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) 1630784719824, at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) 1630784719824, at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 1630784719824, at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 1630784719824, at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) 1630784719824, at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) 1630784719824, at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) 1630784719824, at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 1630784719824, at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 1630784719824, at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104) 1630784719824, at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) 1630784719824, at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) 1630784719824, at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 1630784719824, at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 1630784719824, at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onError(MakeAsyncHttpRequestStage.java:158) 1630784719824, at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:299) 1630784719824, at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:171) 1630784719824, at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) 1630784719824, at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) 1630784719824, at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) 1630784719824, at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) 1630784719824, at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) 1630784719824, at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) 1630784719824, at software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool.lambda$acquire$1(CancellableAcquireChannelPool.java:58) 1630784719824, at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) 1630784719824, at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) 1630784719824, at io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35) 1630784719824, at io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:501) 1630784719824, at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) 1630784719824, at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) 1630784719824, at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) 1630784719824, at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 1630784719824, at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 1630784719824, at java.lang.Thread.run(Thread.java:748) 
1630784719824,Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate. 1630784719824,"Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate." 1630784719824,"Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout." 1630784719824,"If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests." 1630784719824, at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98) 1630784719824, at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43) 1630784719824, at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:205) 1630784719824, at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:201) 1630784719824, at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143) 1630784719824, ... 35 common frames omitted 1630784719824,Caused by: java.lang.Throwable: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate. 1630784719824,"Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate." 1630784719824,"Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout." 1630784719824,"If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you n1630784719824,"If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests." 
1630784719824, at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.decorateException(NettyRequestExecutor.java:305) 1630784719824, at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:298) 1630784719824, ... 18 common frames omitted 1630784719824,Caused by: java.util.concurrent.TimeoutException: Acquire operation took longer than 10000 milliseconds. 1630784719824, at software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.timeoutAcquire(HealthCheckedChannelPool.java:77) 1630784719824, at software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.lambda$acquire$0(HealthCheckedChannelPool.java:67) 1630784719824, at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) 1630784719824, at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) 1630784719824, ... 6 common frames omitted 1630784719837,"2021-09-04 19:45:19,836 [ERROR] from akka.stream.Materializer in flm-actor-system-akka.actor.default-dispatcher-3909 - [FacilityLoadMainFlow logging] Upstream failed." 1630784719837,akka.stream.alpakka.kinesis.KinesisErrors$FailurePublishingRecords: Failure publishing records to Kinesis. Reason : software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate. 1630784719837,"Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate." 1630784719837,"Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout." 1630784719837,"If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests." 1630784719837,Caused by: java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate. 1630784719837,"Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate." 1630784719837,"Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. 
If the connections doesn't free up, the subsequent requests will still timeout." 1630784719837,"If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests." 1630784719837, at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:60) 1630784719837, at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51) 1630784719837, at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 1630784719837, at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) 1630784719837, at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 1630784719837, at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 1630784719837, at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) 1630784719837, at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) 1630784719837, at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) 1630784719837, at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 1630784719837, at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 1630784719837, at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:85) 1630784719837, at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:144) 1630784719837, at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:125) 1630784719837, at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) 1630784719837, at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) 1630784719837, at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 1630784719837, at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 1630784719837, at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) 1630784719837, at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) 1630784719837, at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) 1630784719837, at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 1630784719837, at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 1630784719837, at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104) 1630784719837, at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) 1630784719837, at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) 1630784719837, at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
1630784719837, at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) 1630784719837, at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onError(MakeAsyncHttpRequestStage.java:158) 1630784719837, at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:299) 1630784719837, at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:171) 1630784719837, at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) 1630784719837, at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) 1630784719837, at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) 1630784719837, at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) 1630784719837, at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) 1630784719837, at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) 1630784719837, at software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool.lambda$acquire$1(CancellableAcquireChannelPool.java:58) 1630784719837, at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) 1630784719837, at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) 1630784719837, at io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35) 1630784719837, at io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:501) 1630784719837, at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) 1630784719837, at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) 1630784719837, at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) 1630784719837, at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 1630784719837, at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 1630784719837, at java.lang.Thread.run(Thread.java:748) 1630784719837,Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate. 1630784719837,"Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate." 1630784719837,"Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout." 1630784719837,"If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests." 
1630784719837, at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98) 1630784719837, at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43) 1630784719837, at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:205) 1630784719837, at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:201) 1630784719837, at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143) 1630784719837, ... 35 common frames omitted 1630784719837,Caused by: java.lang.Throwable: Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a connection from the pool within the specified maximum time. This can be due to high request rate. 1630784719837,"Consider taking any of the following actions to mitigate the issue: increase max connections, increase acquire timeout, or slowing the request rate." 1630784719837,"Increasing the max connections can increase client throughput (unless the network interface is already fully utilized), but can eventually start to hit operation system limitations on the number of file descriptors used by the process. If you already are fully utilizing your network interface or cannot further increase your connection count, increasing the acquire timeout gives extra time for requests to acquire a connection before timing out. If the connections doesn't free up, the subsequent requests will still timeout." 1630784719837,"If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large traffic bursts cannot overload the client, being more efficient with the number of times you need to call AWS, or by increasing the number of hosts sending requests." 1630784719837, at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.decorateException(NettyRequestExecutor.java:305) 1630784719837, at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:298) 1630784719837, ... 18 common frames omitted 1630784719837,Caused by: java.util.concurrent.TimeoutException: Acquire operation took longer than 10000 milliseconds. 1630784719837, at software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.timeoutAcquire(HealthCheckedChannelPool.java:77) 1630784719837, at software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.lambda$acquire$0(HealthCheckedChannelPool.java:67) 1630784719837, at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) 1630784719837, at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) 1630784719837, ... 6 common frames omitted

Reproducible Test Case

Please provide a PR with a failing test.

You can try a performance benchmark to trigger the race condition in the buggy 10.1.12 akka-http library together with the libraries listed above. Hint: you can also read the code and pay attention to the lines below from KinesisFlow:

```scala
kinesisClient
  .putRecords(
    PutRecordsRequest.builder().streamName(streamName).records(entries.map(_._1).asJavaCollection).build
  )
  .toScala
  .transform(handlePutRecordsSuccess(entries), FailurePublishingRecords(_))(sameThreadExecutionContext) // <- the error mapping in question
```
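A load-generation harness along these lines should make the acquire-timeout path reproducible. This is a sketch under assumptions, not the failing test asked for above: KinesisLoadRepro, the stream name and the pool sizing are mine, and the exact KinesisFlow signature should be checked against the 2.0.2 docs.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.alpakka.kinesis.scaladsl.KinesisFlow
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry

object KinesisLoadRepro extends App {
  implicit val system: ActorSystem = ActorSystem("kinesis-load-repro")
  import system.dispatcher

  // Async client backed by the Netty HTTP client; a deliberately small connection
  // pool makes the "Acquire operation took longer..." path easier to hit under load.
  implicit val kinesisClient: KinesisAsyncClient = KinesisAsyncClient
    .builder()
    .httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(10))
    .build()

  val streamName = "my-test-stream" // hypothetical stream name

  val done = Source(1 to 1000000)
    .map { i =>
      PutRecordsRequestEntry
        .builder()
        .partitionKey(s"pk-${i % 128}")
        .data(SdkBytes.fromUtf8String(s"payload-$i"))
        .build()
    }
    .via(KinesisFlow(streamName))
    .runWith(Sink.ignore)

  // Under sustained load the stream fails with FailurePublishingRecords
  // instead of surfacing the error per record.
  done.onComplete { result =>
    println(result)
    kinesisClient.close()
    system.terminate()
  }
}
```

The maxConcurrency value is kept small on purpose to make pool exhaustion easier to hit; tune it together with the connection acquisition timeout for your environment.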

If the issue is more complex or requires configuration, please provide a link to a project that reproduces the issue.

ergunbaris · Sep 07 '21 10:09

Expected Behavior (Update)

KinesisFlow should return PutRecordsResultEntry with an errorCode indicating the failure type, not allowing any side effects. It would be even better if the API returned Try[PutRecordsResultEntry], as the Alpakka DynamoDB implementation does.
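With entries that carry an errorCode, consumers could keep error handling inside the stream, e.g. by diverting failed entries to a retry or dead-letter sink. A rough sketch (isFailed, handleFailures and splitByOutcome are illustrative names):

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry

// A result entry with a non-null errorCode (e.g. "ProvisionedThroughputExceededException"
// or "InternalFailure") was rejected by Kinesis; entries without one were stored.
def isFailed(entry: PutRecordsResultEntry): Boolean = entry.errorCode() != null

// Dedicated handling for rejected entries: retry queue, DLQ, metrics, ...
val handleFailures =
  Sink.foreach[PutRecordsResultEntry](e => println(s"failed: ${e.errorCode()} ${e.errorMessage()}"))

// Successful entries continue downstream, rejected ones are diverted.
val splitByOutcome: Flow[PutRecordsResultEntry, PutRecordsResultEntry, NotUsed] =
  Flow[PutRecordsResultEntry].divertTo(handleFailures, isFailed)
```

The same shape would work for Try[PutRecordsResultEntry], with a pattern match on Success/Failure instead of the errorCode check.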

ergunbaris · Sep 09 '21 09:09