Mutiny subscription is not cancelled when the gRPC client is cancelled
Describe the bug
Given a gRPC service that returns Multi<T> and a gRPC client that is cancelled after some time, the gRPC service continues to produce items (looks similar to https://github.com/quarkusio/quarkus/issues/13988):
public Multi<GreetingResponse> greeting(Empty request) {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
            .invoke(x -> {
                System.out.println(Thread.currentThread().getName() + "-" + Instant.now().getEpochSecond());
            })
            .onItem().transform(x -> GreetingResponse.newBuilder().setId(x).build());
}
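For context, the client side of this scenario looks roughly like the following. This is a minimal sketch: the Greeting Mutiny stub type, its injection via @GrpcClient, and the method names are assumptions based on the service snippet above, not code from the reproducer.

import com.google.protobuf.Empty;
import io.quarkus.grpc.GrpcClient;
import io.smallrye.mutiny.Multi;

public class GreetingClient {

    // Mutiny service interface generated by quarkus-grpc; the name is assumed here.
    @GrpcClient
    Greeting greeting;

    void consumeAFewItems() {
        Multi<GreetingResponse> stream = greeting.greeting(Empty.getDefaultInstance());
        stream.select().first(5) // cancels the upstream subscription after 5 items
                .subscribe().with(
                        item -> System.out.println("Received: " + item.getId()),
                        failure -> System.err.println("Failed with: " + failure),
                        () -> System.out.println("Completed"));
        // Expected: the server-side ticks() stream stops once the client cancels.
        // Observed: the server keeps producing items.
    }
}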
Expected behavior
The gRPC service is cancelled
Actual behavior
The gRPC service continues to produce messages
How to Reproduce?
No response
Output of uname -a or ver
22.04.2-Ubuntu
Output of java -version
Temurin-21.0.2+13
Quarkus version or git rev
3.11.0
Build tool (ie. output of mvnw --version or gradlew --version)
Gradle 8.6
Additional information
No response
/cc @alesj (grpc), @cescoffier (grpc,mutiny), @jponge (mutiny)
Are there any plans to fix it?
Yes, but I cannot tell when I (or someone else) will have the time to look into it.
@alesj can you look into this one? I think we are missing an onTermination somewhere. Note that we have a recent change in Mutiny that may have fixed this one (so first we would need to reproduce it).
Issue is still present on 3.19.3
@tomaszklucha-stla any chance you could prepare a small reproducer for the issue?
Attached a small project with the issue. Please check it and run the unit test to reproduce.
@tomaszklucha-stla can you please create a public repository instead?
https://github.com/tomaszklucha-stla/quarkus-tick-bug
According to our investigation, it may not be an issue with ticks() itself but with the gRPC streaming layer not receiving the cancellation.
I did see CANCELLATION and TERMINATION in the logs while running the tests:
diff --git a/src/main/java/grpc/example/HelloGrpcService.java b/src/main/java/grpc/example/HelloGrpcService.java
index 91ab995..401b7a3 100644
--- a/src/main/java/grpc/example/HelloGrpcService.java
+++ b/src/main/java/grpc/example/HelloGrpcService.java
@@ -21,10 +21,10 @@ public class HelloGrpcService implements HelloGrpc {
.build();
})
.onCancellation().invoke(() -> {
- System.out.print("CANCELLATION");
+ System.out.println("CANCELLATION");
})
.onTermination().invoke(() -> {
- System.out.print("TERMINATION");
+ System.out.println("TERMINATION");
});
}
You're right, but in my case that happens after the test finishes, not after I stop taking items from the Multi stream. Please check the logs. You can see there that in the test I take 5 items and cancel, but the ticking is still ongoing.
Time:15:33:57.353969860
Received: message: "Time:15:33:57.353969860"
Time:15:33:58.353524442
Received: message: "Time:15:33:58.353524442"
Time:15:33:59.354038892
Received: message: "Time:15:33:59.354038892"
Time:15:34:00.353863095
Received: message: "Time:15:34:00.353863095"
Time:15:34:01.353777316
Received: message: "Time:15:34:01.353777316"
Subscription cancelled
Completed
Time:15:34:02.354002346
Time:15:34:03.353886810
Time:15:34:04.353814766
Time:15:34:05.353869678
Time:15:34:06.353840938
Time:15:34:07.353716901
Time:15:34:08.353753525
Time:15:34:09.353724020
Time:15:34:10.353523808
Time:15:34:11.353820496
Time:15:34:12.353842915
Time:15:34:13.353733950
Time:15:34:14.353823803
Time:15:34:15.353657043
Time:15:34:16.353936319
@QuarkusTest
class HelloGrpcServiceTest {

    @GrpcClient
    HelloGrpc helloGrpc;

    @Test
    void testHello() throws InterruptedException {
        Multi<HelloReply> sayHello = helloGrpc.sayHello(HelloRequest.newBuilder().setName("test").build());
        sayHello.select().first(5)
                .onTermination().invoke(() -> System.out.println("Subscription cancelled"))
                .subscribe().with(
                        item -> System.out.println("Received: " + item),
                        failure -> System.err.println("Failed with: " + failure),
                        () -> System.out.println("Completed"));
        Thread.sleep(Duration.ofSeconds(20));
    }
}
From what I can see (/cc @alesj @cescoffier on our discussions in Brno) there is an unbounded demand on the server-side.
For others:
- I've checked the Mutiny implementation of the select() operator; it does send a cancellation signal.
- The root cause (though I'm not familiar with the gRPC layer) is that there is neither a cancellation nor a demand communication from the client to the server.
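For comparison, a plain-Mutiny check (no gRPC involved) shows that select().first(n) does propagate cancellation upstream. This is a standalone sketch of Mutiny behaviour, not code from the reproducer:

import java.time.Duration;

import io.smallrye.mutiny.Multi;

public class SelectCancellationCheck {

    public static void main(String[] args) throws InterruptedException {
        Multi.createFrom().ticks().every(Duration.ofMillis(200))
                // Fires when select().first(3) cancels the upstream after the third item.
                .onCancellation().invoke(() -> System.out.println("upstream cancelled"))
                .select().first(3)
                .subscribe().with(
                        tick -> System.out.println("tick " + tick),
                        failure -> System.err.println("failed: " + failure),
                        () -> System.out.println("completed"));

        // Keep the JVM alive long enough for the ticks to fire; "upstream cancelled"
        // is the signal that appears to get lost across the gRPC boundary.
        Thread.sleep(2000);
    }
}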
FYI, we tried the Postman gRPC client and the effect was the same.
We have code like this in the stub:
handleSubscription(returnValue.subscribe().with(
        response::onNext,
        throwable -> onError(response, throwable),
        () -> onCompleted(response)), response);
We even have:
items.request(Long.MAX_VALUE);
in some observer implementations, followed by:
// TODO We would need to implement back-pressure, but there is no relation between the number of items
// TODO from the source and the number of items from the service.
This does not handle back pressure correctly (it requests Long.MAX_VALUE). So basically, we need to change the generated code to use a smarter, chunk-based approach. The gRPC StreamObserver does not pass any back pressure signal, so we will have to be creative (a rough sketch of the demand pattern follows the list below):
- Check if onNext is blocking or not (it could if TCP back pressure is bubbling up)
- Check if there is a way to know if the TCP socket has "capacity" (a queue-full signal) - but I'm not sure how to get this here.
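To make the chunk-based idea concrete, here is a rough sketch of the demand pattern: bounded, refillable requests instead of request(Long.MAX_VALUE). It is not the generated stub; the Consumer stands in for the gRPC response observer, and deciding when it is actually safe to request the next chunk is exactly the open question from the two points above.

import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Generic sketch: forward items downstream while requesting a fixed batch at a time,
// only asking for the next batch once the current one has been delivered.
public class ChunkedForwarder<T> implements Flow.Subscriber<T> {

    private final int chunkSize;
    private final Consumer<T> downstream; // e.g. response::onNext in the real stub
    private Flow.Subscription subscription;
    private int deliveredInChunk;

    public ChunkedForwarder(int chunkSize, Consumer<T> downstream) {
        this.chunkSize = chunkSize;
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(chunkSize); // bounded initial demand instead of Long.MAX_VALUE
    }

    @Override
    public void onNext(T item) {
        downstream.accept(item);
        if (++deliveredInChunk == chunkSize) {
            deliveredInChunk = 0;
            // In real code this should only happen once the transport signals that it
            // can accept more data (the "capacity" signal discussed above).
            subscription.request(chunkSize);
        }
    }

    @Override
    public void onError(Throwable failure) {
        // propagate to the gRPC response observer in the real implementation
    }

    @Override
    public void onComplete() {
        // complete the gRPC response observer in the real implementation
    }
}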
I also found a more cryptic comment:
// TODO At some point we will need to improve the flow control with the max concurrent call per connection.
I need to go back to my very old notebooks to understand what I had in mind at that time.
@cescoffier As far as I can see, the classes containing the TODO comments are not used anywhere. Did you write them in preparation for something?
I've created a PR which seems to solve the cancellation issue. Back pressure cannot be handled because the Emitter does not support this. Please have a look at it.
The problem seems to happen with and without Mutiny (I suppose the linked PR also suggests that), as long as Vert.x is used for gRPC (quarkus.grpc.server.use-separate-server=false).
@alerosmile Do you know of any workaround for this issue? In our case, there is a long-running (endless) stream to the client and we have no way to tell if it's still listening.
@toniwe No, I'm not aware of any workaround. I'm still waiting for any progress here: https://github.com/eclipse-vertx/vert.x/issues/5562
With the small reproducer I've provided, it should be clear that something has to be changed in Vert.x.