
Mutiny subscription is not cancelled when grpc client is cancelled

Open akastyka opened this issue 1 year ago • 18 comments

Describe the bug

With a gRPC service that returns Multi<T> and a gRPC client that is cancelled after some time, the gRPC service continues to produce items (looks similar to https://github.com/quarkusio/quarkus/issues/13988)

    public Multi<GreetingResponse> greeting(Empty request) {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
                .invoke(x -> System.out.println(Thread.currentThread().getName() + "-" + Instant.now().getEpochSecond()))
                .onItem().transform(x -> GreetingResponse.newBuilder().setId(x).build());
    }
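
For illustration, a hypothetical client side (not part of the original report; the Greeter client interface name is assumed) that triggers the behaviour: take a few items and cancel, after which the server keeps ticking.

    @GrpcClient
    Greeter greeter; // assumed name of the generated Mutiny client interface

    void consumeAndCancel() {
        greeter.greeting(Empty.getDefaultInstance())
                .select().first(3) // cancels the upstream subscription after 3 items
                .subscribe().with(
                        item -> System.out.println("got " + item.getId()),
                        failure -> System.err.println("failed: " + failure),
                        () -> System.out.println("done"));
    }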

Expected behavior

The gRPC service stream is cancelled and stops producing items when the client cancels.

Actual behavior

The gRPC service continues to produce messages.

How to Reproduce?

No response

Output of uname -a or ver

22.04.2-Ubuntu

Output of java -version

Temurin-21.0.2+13

Quarkus version or git rev

3.11.0

Build tool (ie. output of mvnw --version or gradlew --version)

Gradle 8.6

Additional information

No response

akastyka avatar Jul 09 '24 08:07 akastyka

/cc @alesj (grpc), @cescoffier (grpc,mutiny), @jponge (mutiny)

quarkus-bot[bot] avatar Jul 09 '24 08:07 quarkus-bot[bot]

Are there any plans to fix it?

akastyka avatar Aug 13 '24 14:08 akastyka

Yes, but I cannot tell when I (or someone else) will have the time to look into it.

cescoffier avatar Aug 25 '24 11:08 cescoffier

@alesj can you look into this one? I think we are missing an onTermination somewhere. Note that we have a recent change in Mutiny that may have fixed this one (so first we would need to reproduce it).
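
For context, a minimal sketch of where such a hook could bridge the client-side gRPC cancellation back to the Mutiny upstream. This is not the actual Quarkus stub code; the ServerCallStreamObserver access and the class name are assumptions.

import java.util.concurrent.atomic.AtomicReference;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.Cancellable;

class CancellationBridge {

    // Called from the service method that received the response observer,
    // so that setOnCancelHandler is registered before the handler returns.
    static <T> void bridge(Multi<T> source, StreamObserver<T> response) {
        AtomicReference<Cancellable> upstream = new AtomicReference<>();
        if (response instanceof ServerCallStreamObserver<T> serverObserver) {
            // When the client cancels the call, cancel the Mutiny upstream too.
            serverObserver.setOnCancelHandler(() -> {
                Cancellable c = upstream.get();
                if (c != null) {
                    c.cancel();
                }
            });
        }
        upstream.set(source.subscribe().with(
                response::onNext,
                response::onError,
                response::onCompleted));
    }
}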

cescoffier avatar Feb 05 '25 08:02 cescoffier

The issue is still present on 3.19.3.

tomaszklucha-stla avatar Mar 13 '25 14:03 tomaszklucha-stla

@tomaszklucha-stla any chance you could prepare a small reproducer for the issue?

gsmet avatar Mar 13 '25 15:03 gsmet

tick-bug-project.zip

Attached small project with issue. Please check and run unit test to reproduce.

tomaszklucha-stla avatar Mar 19 '25 11:03 tomaszklucha-stla

@tomaszklucha-stla can you please create a public repository instead?

jponge avatar Mar 19 '25 13:03 jponge

https://github.com/tomaszklucha-stla/quarkus-tick-bug

According to our investigation, it may not be an issue with ticks() itself but with the gRPC stream not receiving the cancellation.

tomaszklucha-stla avatar Mar 19 '25 14:03 tomaszklucha-stla

I did see CANCELLATION and TERMINATION in the logs while running the tests:

diff --git a/src/main/java/grpc/example/HelloGrpcService.java b/src/main/java/grpc/example/HelloGrpcService.java
index 91ab995..401b7a3 100644
--- a/src/main/java/grpc/example/HelloGrpcService.java
+++ b/src/main/java/grpc/example/HelloGrpcService.java
@@ -21,10 +21,10 @@ public class HelloGrpcService implements HelloGrpc {
                             .build();
                 })
                 .onCancellation().invoke(() -> {
-                    System.out.print("CANCELLATION");
+                    System.out.println("CANCELLATION");
                 })
                 .onTermination().invoke(() -> {
-                    System.out.print("TERMINATION");
+                    System.out.println("TERMINATION");
                 });
     }

jponge avatar Mar 19 '25 14:03 jponge

You're right, but in my case that happens after the test finishes, not after I stop taking items from the Multi stream. Please check the logs: the test takes 5 items and cancels, but the ticking is still ongoing.

Time:15:33:57.353969860
Received: message: "Time:15:33:57.353969860"

Time:15:33:58.353524442
Received: message: "Time:15:33:58.353524442"

Time:15:33:59.354038892
Received: message: "Time:15:33:59.354038892"

Time:15:34:00.353863095
Received: message: "Time:15:34:00.353863095"

Time:15:34:01.353777316
Received: message: "Time:15:34:01.353777316"

Subscription cancelled
Completed
Time:15:34:02.354002346
Time:15:34:03.353886810
Time:15:34:04.353814766
Time:15:34:05.353869678
Time:15:34:06.353840938
Time:15:34:07.353716901
Time:15:34:08.353753525
Time:15:34:09.353724020
Time:15:34:10.353523808
Time:15:34:11.353820496
Time:15:34:12.353842915
Time:15:34:13.353733950
Time:15:34:14.353823803
Time:15:34:15.353657043
Time:15:34:16.353936319
@QuarkusTest
class HelloGrpcServiceTest {
    @GrpcClient
    HelloGrpc helloGrpc;

    @Test
    void testHello() throws InterruptedException {

        Multi<HelloReply> sayHello = helloGrpc.sayHello(HelloRequest.newBuilder().setName("test").build());

        sayHello.select().first(5)
                .onTermination().invoke(() -> System.out.println("Subscription cancelled"))
                .subscribe().with(
                        item -> System.out.println("Received: " + item),
                        failure -> System.err.println("Failed with: " + failure),
                        () -> System.out.println("Completed"));

        Thread.sleep(Duration.ofSeconds(20));
    }

}

tomaszklucha-stla avatar Mar 19 '25 14:03 tomaszklucha-stla

From what I can see (/cc @alesj @cescoffier, per our discussions in Brno), there is unbounded demand on the server side.

jponge avatar Mar 19 '25 15:03 jponge

For others:

  • I've checked the Mutiny implementation of the select() operator: it does send a cancellation signal (see the sketch below).
  • The root cause (although I'm not familiar with the gRPC layer) is that there is neither a cancellation nor a demand communication from the client to the server.
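
A standalone Mutiny-only check (no gRPC involved; the class name is mine) showing that select().first(n) does propagate the cancellation upstream, so the missing link has to be in the gRPC client/server layer:

import java.time.Duration;
import io.smallrye.mutiny.Multi;

public class SelectCancellationDemo {
    public static void main(String[] args) throws InterruptedException {
        Multi.createFrom().ticks().every(Duration.ofMillis(100))
                .onCancellation().invoke(() -> System.out.println("upstream cancelled"))
                .select().first(5)
                .subscribe().with(
                        item -> System.out.println("item " + item),
                        failure -> failure.printStackTrace(),
                        () -> System.out.println("done"));
        // "upstream cancelled" is printed right after the fifth tick.
        Thread.sleep(1000);
    }
}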

jponge avatar Mar 19 '25 15:03 jponge

FYI, we tried the Postman gRPC client and the effect was the same.

tomaszklucha-stla avatar Mar 19 '25 15:03 tomaszklucha-stla

We have code like this in the stub:

handleSubscription(returnValue.subscribe().with(
                    response::onNext,
                    throwable -> onError(response, throwable),
                    () -> onCompleted(response)), response);

We even have:

                items.request(Long.MAX_VALUE);

In some observer implementations, followed by:

// TODO We would need to implement back-pressure, but there is no relation between the number of items
// TODO from the source and the number of items from the service.

This does not handle back pressure correctly (it requests Long.MAX_VALUE). So basically, we need to change the generated code to a smarter, chunk-based way of requesting items (a rough sketch follows the list below). The gRPC StreamObserver does not expose any back pressure signal, so we will have to be creative:

  1. Check whether onNext is blocking or not (it could be, if TCP back pressure is bubbling up)
  2. Check whether there is a way to know if the TCP socket has "capacity" (a queue-full signal) - but I'm not sure how to get that here.
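
A rough sketch of the chunk-based idea (class name and chunk size are mine, not a proposed Quarkus change; a real fix would need to drive the re-request from a transport readiness signal rather than from simple onNext counting):

import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Bounded, chunked demand instead of request(Long.MAX_VALUE).
class ChunkedSubscriber<T> implements Flow.Subscriber<T> {

    private final int chunkSize;
    private final Consumer<T> downstream;
    private Flow.Subscription subscription;
    private long receivedInChunk;

    ChunkedSubscriber(int chunkSize, Consumer<T> downstream) {
        this.chunkSize = chunkSize;
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.subscription = s;
        s.request(chunkSize); // bounded initial demand
    }

    @Override
    public void onNext(T item) {
        downstream.accept(item);
        if (++receivedInChunk == chunkSize) {
            receivedInChunk = 0;
            subscription.request(chunkSize); // ask for the next chunk only now
        }
    }

    @Override
    public void onError(Throwable failure) {
        failure.printStackTrace();
    }

    @Override
    public void onComplete() {
        // nothing to do
    }
}

Since Multi implements Flow.Publisher, this could be plugged in with something like returnValue.subscribe().withSubscriber(new ChunkedSubscriber<>(64, response::onNext)), though the wiring of completion and errors to the observer is omitted here.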

I also found a more cryptic comment:

 // TODO At some point we will need to improve the flow control with the max concurrent call per connection.

I need to go back to my very old notebooks to understand what I had in mind at that time.

cescoffier avatar Mar 20 '25 06:03 cescoffier

@cescoffier As far as I can see, the classes containing the TODO comments are not used anywhere. Did you write them in preparation for something?

I've created a PR which seems to solve the cancellation issue. Back pressure cannot be handled because the Emitter does not support it. Please have a look at it.

alerosmile avatar Apr 25 '25 19:04 alerosmile

The problem seems to happen with and without Mutiny (I suppose the linked PR also suggests that), as long as Vert.x is used for gRPC (quarkus.grpc.server.use-separate-server=false).

@alerosmile Do you know of any workaround for this issue? In our case, there is a long-running (endless) stream to the client and we have no way to tell if it's still listening.

toniwe avatar Jun 04 '25 13:06 toniwe

@toniwe No, I'm not aware of any workaround. I'm still waiting for any progress here: https://github.com/eclipse-vertx/vert.x/issues/5562

With the small reproducer I've provided, it should be clear that something has to be changed in Vert.x.

alerosmile avatar Jun 04 '25 14:06 alerosmile