grpc-java
grpc-java copied to clipboard
FutureStub cancelled in server call
What version of gRPC-Java are you using?
1.34.1
What is your environment?
Windows8 jdk1.8.0_202
I have two server ,and they both have same two grpc method(sayOne,sayTwo) , when server1 call the server2's sayOne method with FutureStub, and in server2's sayOne method,it call server1's sayTwo method with FutureStub, the StreamException happen. Server1 has the service
public static ManagedChannel channel = ManagedChannelBuilder.forTarget("192.168.10.1:50052")
.idleTimeout(60, TimeUnit.MINUTES).usePlaintext().build();
@Override
public void sayOne(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
System.out.println("sayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOne ==== ");
HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
futureStub.sayTwo(request);
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void sayTwo(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
System.out.println("sayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwo");
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
Server2 has the service
public static ManagedChannel channel = ManagedChannelBuilder.forTarget("192.168.10.2:50051")
.idleTimeout(60, TimeUnit.MINUTES).usePlaintext().build();
@Override
public void sayOne(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
System.out.println("sayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOne ==== ");
HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
futureStub.sayTwo(request);
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void sayTwo(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
System.out.println("sayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwo");
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
run the code in server 1
private void start() throws IOException {
/* The port on which the server should run */
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
//.executor(GreeterImpl.threadPool)
.build().start();
System.out.println("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try{
HelloWorldServer2.this.stop();
}catch(InterruptedException e){
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
final HelloWorldServer2 server = new HelloWorldServer2();
server.start();
new Thread(new Runnable() {
public void run() {
while(true){
HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
futureStub.sayOne(request);
try{
Thread.sleep(10L);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("11111111111");
}
}
}).start();
}
Then
警告: Stream Error
io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Received DATA frame for an unknown stream 183
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.shouldIgnoreHeadersOrDataFrame(DefaultHttp2ConnectionDecoder.java:596)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:239)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
The warning is a known issue as fixed in https://github.com/grpc/grpc-java/pull/7771 which is available from v1.36.0. You can ignore the warning in v1.34.1, it's totally safe and won't cause any trouble. If your application is still failing, it should be due to other issues, and this warning should not be the cause.
But when this warning happen,some rpc request won't execute. In this case,I use server1 call the server2's sayOne method with FutureStub 1000 times, then sayTwo in server1 only called back 800 times, if i change GreeterFutureStub to GreeterBlockingStub in sayOne , the issue disappear,sayTwo in server1 can called back 1000 times.
It will happen when you make an rpc call in an rpc server method after responseObserver.onCompleted(); like :
public void sayOne(HelloRequest req, StreamObserver responseObserver) {
HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
futureStub.sayTwo(request);
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
or
public void sayOne(HelloRequest req, StreamObserver responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
futureStub.sayTwo(request);
}
If you want to make sure futureStub.sayTwo(request) gets executed before the server shutdown, you need to call get() method of the returned Future of it.
Future<Response> future = futureStub.sayTwo(request)`
Call
future.get();
Otherwise, the future task might not yet finish or even start before the server quits.
But the server is not quit,I have add server.blockUntilShutdown() in the end of the main method. so you mean after responseObserver.onCompleted() called,the server will quit?
I don't know how you start server 2 (port 50051). Do you run another class's main method in another process?
I see there is an infinite loop in HelloworldServer2.main(), how does it stop at 1000 times?
In this case,I use server1 call the server2's sayOne method with FutureStub 1000 times, then sayTwo in server1 only called back 800 times,
I run another class's main method in another process. the test case code is : GreeterImpl.java
public class GreeterImpl extends GreeterGrpc.GreeterImplBase {
public static ManagedChannel channel = ManagedChannelBuilder.forTarget("192.168.16.147:50051").usePlaintext().build();
@Override
public void sayOne(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
System.out.println("sayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOne");
HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
futureStub.sayTwo(request);
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void sayTwo(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
System.out.println("sayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwo");
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
HelloWorldServer1.java
public class HelloWorldServer1 {
private Server server;
private void blockUntilShutdown() throws InterruptedException {
if(server != null){
server.awaitTermination();
}
}
private void start() throws IOException {
int port = 50051;
server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();
System.out.println("Server started, listening on " + port);
}
public static void main(String[] args) throws IOException, InterruptedException {
HelloWorldServer1 server = new HelloWorldServer1();
server.start();
ManagedChannel channel = ManagedChannelBuilder.forTarget("192.168.16.147:50052").usePlaintext().build();
for(int i = 0;i<1000;i++){
HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
futureStub.sayOne(request);
try{
Thread.sleep(10L);
}catch(InterruptedException e){
e.printStackTrace();
}
}
server.blockUntilShutdown();
}
}
HelloWorldServer2.java
`public class HelloWorldServer2 {
private Server server;
private void start() throws IOException {
int port = 50052;
server = ServerBuilder.forPort(port).addService(new GreeterImpl()).build().start();
System.out.println("Server started, listening on " + port);
}
private void blockUntilShutdown() throws InterruptedException {
if(server != null){
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final HelloWorldServer2 server = new HelloWorldServer2();
server.start();
server.blockUntilShutdown();
}
}`
run the main method in HelloworldServer2, then run the main method in HelloworldServer1, HelloworldServer2 console print "sayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOne" 1000 times, but HelloworldServer1 console print "sayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwosayTwo" times much less than 1000. It looks like some "futureStub.sayTwo(request);" in GreeterImpl.sayOne is canceled.
some "futureStub.sayTwo(request);" in GreeterImpl.sayOne is canceled.
Yes, the future can be cancelled if the sayOne() server call is completed. You would have seen the stack trace if you had added a future listener to check the RPC status. In fact, every server stub method is called in a CancellableContext, and every client call inside a server stub method implementation inherits that context. When the server call completes, the context is cancelled, and the cancellation propagates to the client call that inherits the context. Sorry this mechanism was not well-documented, and need examples.
To avoid cancellation, you can
- call
.get()of the future before the server stub returns, but the downside is that it blocks;
or
- close the server response observer in future listener as follows:
@Override
public void sayOne(HelloRequest req, final StreamObserver<HelloReply> responseObserver) {
System.out.println("sayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOne");
HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
final ListenableFuture<HelloReply> future = futureStub.sayTwo(request);
future.addListener(
new Runnable() {
@Override
public void run() {
try {
future.get();
responseObserver.onCompleted();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
},
MoreExecutors.directExecutor()
);
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
}
or
- fork the current context (then it will not propagate cancellation) and issue the client call as follows (This client call is then truly async from server call):
@Override
public void sayOne(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
System.out.println("sayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOnesayOne");
HelloRequest request = HelloRequest.newBuilder().setName(""+new Random().nextInt(100000000)).build();
GreeterFutureStub futureStub = GreeterGrpc.newFutureStub(channel);
Context fork = Context.current().fork();
Context current = fork.attach();
try {
final ListenableFuture<HelloReply> future = futureStub.sayTwo(request);
future.addListener(
new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
},
MoreExecutors.directExecutor()
);
} finally {
fork.detach(current);
}
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
mark