grpc-kotlin
grpc-kotlin copied to clipboard
RejectedExecutionException is thrown when there is burst of traffic
Hi, we have a gRPC server built with grpc-kotlin
library and kotlin coroutines. The service exposes a one method that underneath calls Redis database.
We decided to override default executor for gRPC server (the caching thread pool with unbounded size) with a fixed thread pool (16 cores + queue with max capacity = 1000).
During burst of traffic, and when the JVM instance is cold (newly started instance), we observe such errors:
Exception processing message
java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@3b22e3e0 rejected from java.util.concurrent.ThreadPoolExecutor@76d37f1e[Running, pool size = 16, active threads = 2, queued tasks = 1000, completed tasks = 126014]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener.messagesAvailable(ServerImpl.java:841)
at io.grpc.internal.AbstractStream$TransportState.messagesAvailable(AbstractStream.java:183)
at io.grpc.internal.MessageDeframer.processBody(MessageDeframer.java:413)
at io.grpc.internal.MessageDeframer.deliver(MessageDeframer.java:276)
at io.grpc.internal.MessageDeframer.request(MessageDeframer.java:162)
at io.grpc.internal.AbstractStream$TransportState$1RequestRunnable.run(AbstractStream.java:233)
at io.grpc.netty.shaded.io.grpc.netty.NettyServerStream$TransportState$1.run(NettyServerStream.java:188)
at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
Exception in onHeadersRead()
```txt java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@62255b0f rejected from java.util.concurrent.ThreadPoolExecutor@76d37f1e[Running, pool size = 16, active threads = 1, queued tasks = 1000, completed tasks = 126145] at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102) at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95) at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreatedInternal(ServerImpl.java:641) at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreated(ServerImpl.java:467) at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:487) at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.access$1000(NettyServerHandler.java:111) at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$FrameListener.onHeadersRead(NettyServerHandler.java:856) at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:409) at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:337) at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:56) at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:476) at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:484) at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:253) at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159) at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41) at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173) at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:393) at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:453) at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425) at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403) at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:833) ```As far as I am concerned, the issue is mostly because of not enough resources to schedule execution of the requests. I wonder if it is possible to somehow "throttle" request processing? What do you suggest?
I think @lowasser will have some thoughts on this. It seems at least theoretically possible to have the request queue fill with idle connections just consuming a file handle when the request handler is waiting on getting resources to process the request. I think other servers do something similar, usually on top of Netty in JVM land. QQ: Are you using the Netty gRPC server? The one thing I do wonder about is how to manage things that aren't just a burst you can quickly recover from. Assuming you have a load balancer that is handling failures with a retry / fallback, you'd probably want your server to throw an error and close the connection when it is out of capacity.