
RejectedExecution exception after call closed

Open YifeiZhuang opened this issue 3 years ago • 1 comments

Sep 14, 2022 11:55:46 PM io.netty.util.concurrent.DefaultPromise notifyListener0
WARNING: An exception was thrown by io.grpc.netty.NettyClientHandler$4.operationComplete()
java.util.concurrent.RejectedExecutionException
        at io.grpc.stub.ClientCalls$ThreadlessExecutor.execute(ClientCalls.java:790)
        at io.grpc.internal.RetriableStream$Sublistener.closed(RetriableStream.java:904)
        at io.grpc.internal.DelayedStream$DelayedStreamListener$4.run(DelayedStream.java:510)
        at io.grpc.internal.DelayedStream$DelayedStreamListener.delayOrExecute(DelayedStream.java:462)
        at io.grpc.internal.DelayedStream$DelayedStreamListener.closed(DelayedStream.java:507)
        at io.grpc.internal.ForwardingClientStreamListener.closed(ForwardingClientStreamListener.java:34)
        at io.grpc.internal.InternalSubchannel$CallTracingTransport$1$1.closed(InternalSubchannel.java:693)
        at io.grpc.internal.AbstractClientStream$TransportState.closeListener(AbstractClientStream.java:459)
        at io.grpc.internal.AbstractClientStream$TransportState.access$400(AbstractClientStream.java:221)
        at io.grpc.internal.AbstractClientStream$TransportState$1.run(AbstractClientStream.java:442)
        at io.grpc.internal.AbstractClientStream$TransportState.deframerClosed(AbstractClientStream.java:278)
        at io.grpc.internal.Http2ClientStreamTransportState.deframerClosed(Http2ClientStreamTransportState.java:31)
        at io.grpc.internal.MessageDeframer.close(MessageDeframer.java:233)
        at io.grpc.internal.AbstractStream$TransportState.closeDeframer(AbstractStream.java:198)
        at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:445)
        at io.grpc.netty.NettyClientHandler$4.operationComplete(NettyClientHandler.java:646)
        at io.grpc.netty.NettyClientHandler$4.operationComplete(NettyClientHandler.java:610)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
        at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
        at io.netty.util.concurrent.DefaultPromise.setFailure(DefaultPromise.java:109)
        at io.netty.channel.DefaultChannelPromise.setFailure(DefaultChannelPromise.java:89)
        at io.netty.handler.codec.http2.StreamBufferingEncoder$Frame.release(StreamBufferingEncoder.java:327)
        at io.netty.handler.codec.http2.StreamBufferingEncoder$PendingStream.close(StreamBufferingEncoder.java:308)
        at io.netty.handler.codec.http2.StreamBufferingEncoder.cancelGoAwayStreams(StreamBufferingEncoder.java:274)
        at io.netty.handler.codec.http2.StreamBufferingEncoder.access$400(StreamBufferingEncoder.java:59)
        at io.netty.handler.codec.http2.StreamBufferingEncoder$1.onGoAwayReceived(StreamBufferingEncoder.java:138)
        at io.netty.handler.codec.http2.DefaultHttp2Connection.goAwayReceived(DefaultHttp2Connection.java:237)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.onGoAwayRead0(DefaultHttp2ConnectionDecoder.java:217)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onGoAwayRead(DefaultHttp2ConnectionDecoder.java:583)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onGoAwayRead(Http2InboundFrameLogger.java:119)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readGoAwayFrame(DefaultHttp2FrameReader.java:580)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:271)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
        at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
Sep 14, 2022 11:55:46 PM io.grpc.internal.ManagedChannelImpl$2 uncaughtException
SEVERE: [Channel<1>: (localhost:8080)] Uncaught exception in the SynchronizationContext. Panic!
java.util.concurrent.RejectedExecutionException
        at io.grpc.stub.ClientCalls$ThreadlessExecutor.execute(ClientCalls.java:790)
        at io.grpc.internal.DelayedClientTransport.reprocess(DelayedClientTransport.java:311)
        at io.grpc.internal.ManagedChannelImpl.updateSubchannelPicker(ManagedChannelImpl.java:899)
        at io.grpc.internal.ManagedChannelImpl.access$5300(ManagedChannelImpl.java:118)
        at io.grpc.internal.ManagedChannelImpl$LbHelperImpl$1UpdateBalancingState.run(ManagedChannelImpl.java:1482)
        at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
        at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
        at io.grpc.internal.InternalSubchannel$TransportListener.transportReady(InternalSubchannel.java:547)
        at io.grpc.netty.ClientTransportLifecycleManager.notifyReady(ClientTransportLifecycleManager.java:44)
        at io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(NettyClientHandler.java:920)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:515)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$PrefaceFrameListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:735)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onSettingsRead(Http2InboundFrameLogger.java:93)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readSettingsFrame(DefaultHttp2FrameReader.java:531)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:262)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
        at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:242)
        at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)

Exception in thread "main" io.grpc.StatusRuntimeException: INTERNAL: Panic! This is a bug!
        at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:271)
        at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:252)
        at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:165)
        at io.grpc.testing.integration.TestServiceGrpc$TestServiceBlockingStub.emptyCall(TestServiceGrpc.java:615)
        at io.grpc.testing.integration.AbstractInteropTest.emptyUnary(AbstractInteropTest.java:421)
        at io.grpc.testing.integration.TestServiceClient.runTest(TestServiceClient.java:264)
        at io.grpc.testing.integration.TestServiceClient.run(TestServiceClient.java:252)
        at io.grpc.testing.integration.TestServiceClient.main(TestServiceClient.java:66)
Caused by: java.util.concurrent.RejectedExecutionException
        at io.grpc.stub.ClientCalls$ThreadlessExecutor.execute(ClientCalls.java:790)
        at io.grpc.internal.DelayedClientTransport.reprocess(DelayedClientTransport.java:311)
        at io.grpc.internal.ManagedChannelImpl.updateSubchannelPicker(ManagedChannelImpl.java:899)
        at io.grpc.internal.ManagedChannelImpl.access$5300(ManagedChannelImpl.java:118)
        at io.grpc.internal.ManagedChannelImpl$LbHelperImpl$1UpdateBalancingState.run(ManagedChannelImpl.java:1482)
        at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
        at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
        at io.grpc.internal.InternalSubchannel$TransportListener.transportReady(InternalSubchannel.java:547)
        at io.grpc.netty.ClientTransportLifecycleManager.notifyReady(ClientTransportLifecycleManager.java:44)
        at io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(NettyClientHandler.java:920)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:515)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$PrefaceFrameListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:735)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onSettingsRead(Http2InboundFrameLogger.java:93)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readSettingsFrame(DefaultHttp2FrameReader.java:531)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:262)
        at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
        at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
        at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:242)
        at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)

YifeiZhuang avatar Sep 15 '22 00:09 YifeiZhuang

Background

ThreadlessExecutor wraps the client-supplied executor in order to manage the executor's lifecycle. Application methods normally run on this executor. ClientCall injects it into CallOptions to pass it to the transports, e.g. for call buffering or retry processing. The ThreadlessExecutor queues the runnables and executes them sequentially. Once the transport notifies that the call is closed, it should be safe to shut down the executor. A subsystem is not allowed to schedule runnables on the executor after the call has terminated. To obey this rule, subsystems such as call buffering or retrying may need internal synchronization to keep themselves from using the executor after the call is closed.
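To make the contract concrete, here is a minimal sketch of an executor that queues tasks, runs them sequentially on the draining thread, and rejects anything submitted after shutdown. It is a simplified model, not the grpc-java source: the class name SketchThreadlessExecutor and the methods drain() and shutdown() are assumptions for illustration only.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/** Simplified stand-in for a threadless executor (hypothetical, not the real class). */
final class SketchThreadlessExecutor implements Executor {
  private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
  private volatile boolean shutdown;

  @Override
  public void execute(Runnable task) {
    // After the call is closed, nobody is left to drain the queue, so reject.
    if (shutdown) {
      throw new RejectedExecutionException();
    }
    queue.add(task);
  }

  /** Runs all queued tasks sequentially on the calling thread; returns how many ran. */
  int drain() {
    int ran = 0;
    Runnable task;
    while ((task = queue.poll()) != null) {
      task.run();
      ran++;
    }
    return ran;
  }

  /** Marks the executor as closed; later execute() calls are rejected. */
  void shutdown() {
    shutdown = true;
  }
}

final class ThreadlessExecutorDemo {
  public static void main(String[] args) {
    SketchThreadlessExecutor executor = new SketchThreadlessExecutor();
    executor.execute(() -> System.out.println("listener callback"));
    executor.drain();    // runs the callback on this thread
    executor.shutdown(); // the call is closed
    try {
      executor.execute(() -> System.out.println("late task"));
    } catch (RejectedExecutionException e) {
      System.out.println("rejected after shutdown");
    }
  }
}
```

Any subsystem that calls execute() after shutdown() hits the rejection path, which is the failure shown in the stack traces above.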

ManagedChannelImpl

https://github.com/grpc/grpc-java/pull/8978

Retriable Stream

Retriable stream intercepts the stream as seen from ClientCall. It talks to the transport and may create multiple real streams until one of them receives a successful response from the wire or it decides not to retry any more. Then it notifies the application of the result. Once RetriableStream returns to the application, retry processing has ended and the Retriable Stream should respect the life cycle of the call. Failing to do so violates the contract mentioned above and may cause a channel panic.

In Retriable Stream, the calls the application makes on the stream are saved and replayed for each of the retried sub streams. When a previous stream is closed with an error status code, a new sub stream may be created, e.g. because of transparent retry or because the retry strategy decides to retry. The replay tasks run on the executor supplied to the call, i.e. through CallOptions.
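The save-and-replay idea can be sketched roughly as follows. This is an illustration under assumptions, not the RetriableStream implementation: SketchStream, ReplayBuffer, record() and replayOn() are hypothetical names, and the real stream interface has many more methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Hypothetical minimal stream interface used only for this sketch. */
interface SketchStream {
  void request(int numMessages);

  void writeMessage(String message);
}

/** Records application calls once and replays them on every new sub stream. */
final class ReplayBuffer {
  private final List<Consumer<SketchStream>> buffered = new ArrayList<>();

  /** Saves one application call so that it can be replayed later. */
  synchronized void record(Consumer<SketchStream> call) {
    buffered.add(call);
  }

  private synchronized List<Consumer<SketchStream>> snapshot() {
    return new ArrayList<>(buffered);
  }

  /** Schedules a replay of all saved calls on the executor supplied to the call. */
  void replayOn(SketchStream subStream, Executor callExecutor) {
    List<Consumer<SketchStream>> calls = snapshot();
    // If callExecutor is already shut down, this execute() is where a
    // RejectedExecutionException would surface.
    callExecutor.execute(() -> {
      for (Consumer<SketchStream> call : calls) {
        call.accept(subStream);
      }
    });
  }
}

final class ReplayBufferDemo {
  public static void main(String[] args) {
    ReplayBuffer buffer = new ReplayBuffer();
    buffer.record(s -> s.request(1));
    buffer.record(s -> s.writeMessage("hello"));
    SketchStream attempt = new SketchStream() {
      @Override public void request(int n) { System.out.println("request " + n); }
      @Override public void writeMessage(String m) { System.out.println("write " + m); }
    };
    buffer.replayOn(attempt, Runnable::run); // direct executor for the demo
  }
}
```

Because every replay goes through the call executor, the replay path is bound by the same "no use after close" rule as any other user of that executor.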

Problem

There is a race condition between cancel() closing the masterListener and the callExecutor being used to drain the retry streams: the executor from CallOptions is still used (in Sublistener.closed()) even after the application has been notified of the close (retriableStream.cancel()). Once the close happens, the ClientCall in the stub layer shuts down the ThreadlessExecutor immediately and does not allow new runnables to be scheduled on it. However, Retriable Stream lacks the necessary synchronization and still schedules drain tasks on the executor, so a RejectedExecutionException is thrown. Note that the underlying DelayedStream also uses the executor, so it needs to be considered as well; this is discussed in the solution section.

Reproduction

The bug can be reproduced with the interop test setup. At the server, fail the RPC immediately (by setting max connection age to 0). At the client, set a small (100ms) deadline. Run the interop test empty call 100 times on a GCE VM. Reproduction rate: 100%.

Solution: Use an in-flight retriable stream counter to synchronize call close and call executor

Add a new AtomicInteger field called InFlightSubStreams. A negative value of the counter indicates that a cancellation has been committed. A positive value indicates that there are outstanding streams and it is not safe to close the call. In cancel(), add Integer.MIN_VALUE to the counter. When creating a new sub stream, increment the counter first; if the result is positive, cancel() has not been called and it is safe to create the stream. In addition, cancel() will then not close the masterListener, since the counter was positive. Decrement the counter when the stream is drained. (This might be wrong: note that "drained" here means the stream is closed. A drain() call returning does not mean that draining has finished, because other subsystems, e.g. DelayedStream, might still be using the executor. Therefore, we do this check in the listener's closed().)

Between cancel() and the drain() of the last stream: if cancel() happens first, the last stream's closed() sees the number of in-flight streams drop to zero (with Integer.MIN_VALUE already added, the counter is back at Integer.MIN_VALUE) and notifies masterListener.closed() there. If drain() happens first, the counter equals Integer.MIN_VALUE in cancel(), and the masterListener is notified there.
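A minimal sketch of the counter scheme, assuming a single cancel path and using hypothetical names (InFlightGate, tryStartSubStream(), onSubStreamClosed()) that do not come from the grpc-java source. The "increment first, then check" step is implemented here with a compare-and-set loop so that a rejected creation never leaves a stray increment behind; that is a choice made for this sketch, not necessarily what the fix does. Only the cancel path is modelled; a normal successful close would notify the listener elsewhere.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that decides who closes the master listener. */
final class InFlightGate {
  // >= 0: number of in-flight sub streams, cancel() not yet committed.
  // <  0: cancel() committed; Integer.MIN_VALUE means nothing is in flight.
  private final AtomicInteger inFlightSubStreams = new AtomicInteger();
  private final AtomicBoolean cancelled = new AtomicBoolean();
  private final Runnable closeMasterListener;

  InFlightGate(Runnable closeMasterListener) {
    this.closeMasterListener = closeMasterListener;
  }

  /** Returns true if a new sub stream may be created (cancel() not committed). */
  boolean tryStartSubStream() {
    int inFlight;
    do {
      inFlight = inFlightSubStreams.get();
      if (inFlight < 0) {
        return false; // cancel() won the race; do not touch the executor
      }
    } while (!inFlightSubStreams.compareAndSet(inFlight, inFlight + 1));
    return true;
  }

  /** Called from the sub stream listener's closed(). */
  void onSubStreamClosed() {
    if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
      // cancel() happened first and this was the last stream in flight.
      closeMasterListener.run();
    }
  }

  /** Commits the cancellation; closes the listener only if nothing is in flight. */
  void cancel() {
    if (!cancelled.compareAndSet(false, true)) {
      return; // adding Integer.MIN_VALUE twice would overflow back to >= 0
    }
    if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
      closeMasterListener.run();
    }
  }
}

final class InFlightGateDemo {
  public static void main(String[] args) {
    InFlightGate gate = new InFlightGate(() -> System.out.println("master listener closed"));
    gate.tryStartSubStream(); // one sub stream in flight
    gate.cancel();            // deferred: a stream is still using the executor
    gate.onSubStreamClosed(); // last stream closes, listener is notified here
  }
}
```

In both orderings exactly one side observes Integer.MIN_VALUE, so the master listener is closed once and only after the last sub stream has stopped using the executor.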

Alternative Solution:

Add a new SettableFuture field called delayedClose. Delay notifying the masterListener in cancel(). Use ListenerExecutor to synchronize between cancel() and creating a new stream. Once cancel() is committed, block and wait for the Future to be done. Creating a new sub stream should first check whether the call is already closed. If it wins, it creates a new SettableFuture and sets a null value on it once drained.

cc. @ejona86

YifeiZhuang avatar Oct 14 '22 00:10 YifeiZhuang