grpc-java

Failed to release a message: UnpooledSlicedByteBuf(freed)

Open cdgjzzy opened this issue 1 year ago • 0 comments

grpc-java: 1.63.0
Java: 17.0.9 2023-10-17 LTS, 64-bit

I send gRPC streaming responses to the registered observers like this:

observers.forEach((context, observer) -> {
    if (!context.isCancelled() && observer.isReady()) {
        observer.onNext(data);
    }
});

This method is called about 1500 times per second. Under that level of concurrency, gRPC fails to send messages: the server logs the warning below and the client's stream is cancelled with a protobuf parse error.

Stacktrace and logs

2024-05-13 09:44:10.412 [grpc-nio-worker-ELG-1-7] WARN  io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil.safeRelease - Failed to release a message: UnpooledSlicedByteBuf(freed)
io.grpc.netty.shaded.io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.grpc.netty.shaded.io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
	at io.grpc.netty.shaded.io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
	at io.grpc.netty.shaded.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	at io.grpc.netty.shaded.io.netty.buffer.CompositeByteBuf$Component.free(CompositeByteBuf.java:1959)
	at io.grpc.netty.shaded.io.netty.buffer.CompositeByteBuf.deallocate(CompositeByteBuf.java:2264)
	at io.grpc.netty.shaded.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:111)
	at io.grpc.netty.shaded.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	at io.grpc.netty.shaded.io.netty.buffer.AbstractDerivedByteBuf.release0(AbstractDerivedByteBuf.java:98)
	at io.grpc.netty.shaded.io.netty.buffer.AbstractDerivedByteBuf.release(AbstractDerivedByteBuf.java:94)
	at io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:90)
	at io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:116)
	at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:280)
	at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:361)
	at io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:438)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
	at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:197)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:925)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:967)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannel.flush(AbstractChannel.java:254)
	at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.flush(WriteQueue.java:143)
	at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.access$000(WriteQueue.java:35)
	at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$1.run(WriteQueue.java:47)
	at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask$$$capture(AbstractEventExecutor.java:173)
	at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java)
	at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	
	
Client error:
On error,listening end
io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
	at io.grpc.Status.asRuntimeException(Status.java:533)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
	at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.grpc.StatusRuntimeException: INTERNAL: Invalid protobuf byte sequence
	at io.grpc.Status.asRuntimeException(Status.java:525)
	at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:240)
	at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:134)
	at io.grpc.MethodDescriptor.parseResponse(MethodDescriptor.java:284)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:657)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:644)
	... 5 common frames omitted
Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
	at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:115)
	at com.google.protobuf.CodedInputStream$ArrayDecoder.readStringRequireUtf8(CodedInputStream.java:822)
	at com.google.protobuf.StringValue$Builder.mergeFrom(StringValue.java:386)
	at com.google.protobuf.StringValue$1.parsePartialFrom(StringValue.java:517)
	at com.google.protobuf.StringValue$1.parsePartialFrom(StringValue.java:509)
	at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:86)
	at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
	at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parseFrom(ProtoLiteUtils.java:245)
	at io.grpc.protobuf.lite.ProtoLiteUtils$MessageMarshaller.parse(ProtoLiteUtils.java:237)
	... 9 common frames omitted
	

If I control the sending frequency with a queue, then this exception does not occur.
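For reference, the queue-based workaround can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the code from my service and not a confirmed fix: `SerializedBroadcaster` is a hypothetical helper, and `Consumer<T>` stands in for a call such as `observer::onNext` so the sketch compiles without gRPC on the classpath. It relies on the `StreamObserver` Javadoc, which says implementations are not required to be thread-safe and that the application must synchronize concurrent writes; whether concurrent `onNext` calls are the actual cause of the `refCnt: 0` warning is not established here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: funnels every send through one thread so a sink never sees concurrent calls. */
final class SerializedBroadcaster<T> implements AutoCloseable {
    // Stand-in for the registered observers; with gRPC each sink would wrap observer::onNext.
    private final List<Consumer<T>> sinks = new CopyOnWriteArrayList<>();
    // One worker thread backed by an unbounded FIFO queue (an assumption; a bounded queue may suit better).
    private final ExecutorService sender = Executors.newSingleThreadExecutor();

    void register(Consumer<T> sink) {
        sinks.add(sink);
    }

    /** Safe to call from any thread; delivery runs later on the sender thread, in submission order. */
    void publish(T data) {
        sender.execute(() -> {
            for (Consumer<T> sink : sinks) {
                // With gRPC, the isCancelled()/isReady() checks would go here, on the sender thread.
                sink.accept(data);
            }
        });
    }

    /** Stops accepting new work and waits up to five seconds for queued sends to drain. */
    @Override
    public void close() throws InterruptedException {
        sender.shutdown();
        sender.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Producers call `publish(data)` from any thread instead of calling `onNext` directly. The trade-off is that the queue is unbounded, so a slow client can make it grow; a bounded queue or dropping messages while `isReady()` is false would be needed to cap memory.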

cdgjzzy • May 13 '24 06:05