quarkus-grpc: OutOfMemoryException while streaming

Open sconvent opened this issue 9 months ago • 27 comments

Describe the bug

I'm using the quarkus-grpc extension to stream a large number of objects from one service to another, but on the client side I repeatedly run into OutOfMemoryExceptions.

On the server side a Multi is built like this:

Multi<DTOClass> multi = Multi.createFrom().emitter(emitter -> {
  databaseHandle.stream() // simplified, db returns a stream here
    .forEach(emitter::emit);
  emitter.complete();
})
.onRequest().invoke(requestSize -> System.out.println("Server requests size: " + requestSize));

And on the client side consumed like this:

grpcClient.getEntries() // returns a Multi
.onRequest().invoke(requestSize -> System.out.println("Client requests size: " + requestSize))
.collect().with(Collectors.counting()).await().atMost(Duration.ofSeconds(20));

Expected behavior

There is no OutOfMemoryException and the request size received on the server side is identical to the one on the client side.

Actual behavior

An OutOfMemoryException is thrown and the request size on the server is Long.MAX_VALUE.

Excerpt from the logs:

Client requests size: 256
...
Server requests size: 9223372036854775807

How to Reproduce?

  1. Implement a Spec + server implementation that returns a stream with a large number of entries
  2. Implement a client that consumes the stream (e.g. in a test case)
  3. Run the client (set the max heap size to a small but reasonable value to trigger the OutOfMemoryException)

Output of uname -a or ver

Linux 6.5.0-44-generic x86_64 GNU/Linux

Output of java -version

openjdk 21.0.6 2025-01-21

Quarkus version or git rev

3.15.1

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.9.9

Additional information

No response

sconvent avatar Feb 25 '25 09:02 sconvent

/cc @alesj (grpc), @cescoffier (grpc)

quarkus-bot[bot] avatar Feb 25 '25 09:02 quarkus-bot[bot]

Thanks a lot for reporting!

Is there any chance you can attach a sample that shows the problem in action? That would allow us to go straight into figuring out what the issue is.

geoand avatar Feb 25 '25 11:02 geoand

@geoand I created an example here: https://github.com/sconvent/quarkus-grpc-issue-demo

Please let me know if you have any further questions. I'm happy to help.

sconvent avatar Feb 26 '25 14:02 sconvent

Thanks a lot!

I was able to get a memory dump for the application by making the following change:

diff --git a/pom.xml b/pom.xml
index 4895fe2..f6ad1be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,9 +10,9 @@
 
     <properties>
         <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
-        <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
-        <quarkus.platform.version>3.15.1</quarkus.platform.version>
-        <surefire-plugin.version>3.1.2</surefire-plugin.version>
+        <quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
+        <quarkus.platform.version>3.19.0</quarkus.platform.version>
+        <surefire-plugin.version>3.5.2</surefire-plugin.version>
         <compiler-plugin.version>3.11.0</compiler-plugin.version>
         <assertj.version>3.22.0</assertj.version>
 
@@ -113,7 +113,7 @@
                 <artifactId>maven-surefire-plugin</artifactId>
                 <version>${surefire-plugin.version}</version>
                 <configuration>
-                    <argLine>-Xmx512m</argLine>
+                    <argLine>-Xmx512m -XX:+HeapDumpOnOutOfMemoryError</argLine>
                     <systemPropertyVariables>
                         <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
                     </systemPropertyVariables>

@jponge given what I see in the heap dump, I think this is one for you (although I can't tell if the Mutiny usage is correct or not; it may well be that the usage pattern where the emitter is called in a loop like this guarantees an OOME)

geoand avatar Feb 26 '25 15:02 geoand

The call to .await() (and also .collect() to some extent) is the culprit, as this initiates a Long.MAX_VALUE demand (aka unbounded)

jponge avatar Feb 26 '25 15:02 jponge

Oh, I didn't see the test code at all...

geoand avatar Feb 26 '25 15:02 geoand

@sconvent is this representative of what you are doing in the actual application?

geoand avatar Feb 26 '25 15:02 geoand

Also note that the server code blasts through the Stream source and pushes items to the emitter without honoring any back-pressure. This can be checked with MultiEmitter::requested.

jponge avatar Feb 26 '25 15:02 jponge

Also note that the server code blasts through the Stream source and pushes items to the emitter without honoring any back-pressure

Right, that's what I had initially commented about

geoand avatar Feb 26 '25 15:02 geoand

This can be checked with MultiEmitter::requested

@jponge do you have any samples or tests showing how that would be used?

geoand avatar Feb 26 '25 18:02 geoand

You'd need to check requested() in a loop, but I confess the API here is not so great.

We'd need to add a callback for requests, just like I did in Mutiny Zero with whenRequested, see https://smallrye.io/smallrye-mutiny-zero/latest/quick-start/#creating-using-the-general-purpose-tube-api
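
For reference, here's a minimal sketch of that requested()-in-a-loop idea applied to a server-side emitter (assuming MultiEmitter exposes requested() and isCancelled(), as in recent Mutiny releases; the fromStream helper and the park-based back-off are illustrative assumptions, not a recommendation):

import java.util.Iterator;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;

import io.smallrye.mutiny.Multi;

class RequestedAwareEmitter {

    // "source" stands in for the database stream from the original report
    static <T> Multi<T> fromStream(Stream<T> source) {
        return Multi.createFrom().emitter(emitter -> {
            Iterator<T> it = source.iterator();
            while (it.hasNext()) {
                // wait until the downstream has outstanding demand
                while (emitter.requested() == 0) {
                    if (emitter.isCancelled()) {
                        source.close();
                        return;
                    }
                    LockSupport.parkNanos(1_000_000L); // crude ~1 ms back-off
                }
                emitter.emit(it.next());
            }
            emitter.complete();
        });
    }
}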

jponge avatar Feb 26 '25 18:02 jponge

🙏

geoand avatar Feb 26 '25 18:02 geoand

I created an issue, this will land in Mutiny... soon 😄

jponge avatar Feb 26 '25 18:02 jponge

Thanks!

geoand avatar Feb 26 '25 18:02 geoand

I implemented the suggestions you made on a separate branch: https://github.com/sconvent/quarkus-grpc-issue-demo/tree/suggestions

Changes are:

  • The implementation no longer uses an emitter, but rather Multi.createFrom().items(...)
  • It no longer uses await() or collect() on the Multi on the client side, but rather subscribe().with(...) (roughly sketched below)
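
Roughly, the two changes look like this (a sketch, not the actual branch contents; the String items and helper names are just for illustration):

import java.util.stream.IntStream;

import io.smallrye.mutiny.Multi;

class SuggestionsSketch {

    // server side: a pre-built stream of items instead of an emitter
    static Multi<String> serverMulti(int count) {
        return Multi.createFrom().items(
                IntStream.rangeClosed(0, count - 1).mapToObj(i -> "entry " + i));
    }

    // client side: callback subscription instead of collect()/await()
    static void clientConsume(Multi<String> replies) {
        replies.subscribe().with(
                reply -> { /* process the reply */ },
                failure -> failure.printStackTrace(),
                () -> System.out.println("completed"));
    }
}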

The issue still persists though, and now the request size on both the client and the server is Long.MAX_VALUE.

Am I still using something in a wrong way?

@sconvent is this representative of what you are doing in the actual application?

Yes. The main difference from the actual application is that there the Stream on the server side comes from a database, and the processing on the client side is not just a Thread.wait() but some other computation.

sconvent avatar Feb 27 '25 13:02 sconvent

subscribe().with(...) also does an unbounded request.

To properly control the demand I'm afraid you have to create your own subscriber, as in: https://github.com/smallrye/smallrye-mutiny/blob/main/workshop-examples/src/main/java/_01_basics/_14_Multi_Subscriber.java

(note that here it's an inefficient 1-by-1 request pattern, but you can certainly do some batching)
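
For illustration, a minimal sketch of such a subscriber with Mutiny 2 / Quarkus 3.x (the consumeOneByOne helper is an assumption; the 1-by-1 request mirrors the linked workshop example):

import java.util.concurrent.Flow;

import io.smallrye.mutiny.Multi;

class BoundedDemandConsumer {

    // "entries" would be the Multi returned by grpcClient.getEntries()
    static <T> void consumeOneByOne(Multi<T> entries) {
        entries.subscribe().withSubscriber(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1); // bounded initial demand instead of Long.MAX_VALUE
            }

            @Override
            public void onNext(T item) {
                // ... process the item ...
                subscription.request(1); // ask for the next one (batching also works)
            }

            @Override
            public void onError(Throwable failure) {
                failure.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("done");
            }
        });
    }
}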

jponge avatar Feb 27 '25 13:02 jponge

I implemented the dedicated subscriber here: https://github.com/sconvent/quarkus-grpc-issue-demo/tree/subscriber It uses the 1-by-1 request pattern, with the addition of a delay in the onNext() method.

The issue is still present though. I can see a request size of 1 on the client side, but on the server side it still remains Long.MAX_VALUE, so it seems that the request size is not passed through (not sure if it even should be). But on the server side there also seems to be an unbounded request, and that would be part of the quarkus-grpc implementation.

sconvent avatar Feb 28 '25 14:02 sconvent

We might chat about that with @alesj next week then

jponge avatar Feb 28 '25 14:02 jponge

        AtomicInteger counter = new AtomicInteger(0);

in BugDemoTest::bugDemoTest is never incremented :-)

alesj avatar Mar 03 '25 13:03 alesj

@alesj you're right, thanks. Fixed it just now. Didn't help with the issue though

sconvent avatar Mar 03 '25 18:03 sconvent

Didn't help with the issue though

Yeah, since it never gets to the last assertion; the OOM hits before that.

Discussed it yesterday with @jponge, and we think this is unfortunately not solvable out of the box...

alesj avatar Mar 04 '25 08:03 alesj

Here is what I think happens.

The reproducer shows:

public Multi<HelloReply> sayHello(HelloRequest request) {
    Stream<HelloReply> inputs = IntStream.rangeClosed(0, COUNT - 1)
            .mapToObj(it -> HelloReply.newBuilder().setMessage(buildTestString(it)).build());
    Multi<HelloReply> multi = Multi.createFrom().emitter(multiEmitter -> {
        inputs.forEach(multiEmitter::emit);
        multiEmitter.complete();
    });
    return multi
            .emitOn(Infrastructure.getDefaultWorkerPool())
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .onRequest().invoke(requestSize -> System.out.println("Server request size: " + requestSize));
}

There is some code involved here that subscribes to the returned Multi<HelloReply> and writes each item, directly or indirectly, to the Vert.x GrpcServerResponse<HelloReply>.

This code seems to request Long.MAX_VALUE and then write everything it receives.

Instead, it should request N items, write them to the response, check the response's writability with writeQueueFull(), stop requesting while the queue is full, and set a drain handler on the response to resume. You can read more about Vert.x flow control here: https://vertx.io/docs/vertx-core/java/#streams

Vert.x has some code for this in the Vert.x Reactive Extensions, and I am assuming that Mutiny does as well. Here is the Vert.x RX code that does the job: https://github.com/vert-x3/vertx-rx/blob/6ef24f0eff574b291a86be4570d967335518e84c/rx-java2-gen/src/main/java/io/vertx/reactivex/impl/WriteStreamSubscriberImpl.java#L36

The same should be done when the Multi is subscribed to write the items to the GrpcServerResponse.
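
For illustration, a rough sketch of that pattern (not the actual quarkus-grpc or vertx-rx code; the class and its wiring are assumptions):

import java.util.concurrent.Flow;

import io.vertx.core.streams.WriteStream;

// request a bounded amount, pause on writeQueueFull(), resume from the drain handler
class WriteStreamSubscriber<T> implements Flow.Subscriber<T> {

    private final WriteStream<T> response;
    private Flow.Subscription subscription;

    WriteStreamSubscriber(WriteStream<T> response) {
        this.response = response;
        response.drainHandler(v -> subscription.request(1)); // resume once the queue drained
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // bounded demand instead of Long.MAX_VALUE
    }

    @Override
    public void onNext(T item) {
        response.write(item);
        if (!response.writeQueueFull()) {
            subscription.request(1); // only keep requesting while Vert.x can buffer
        }
        // if the queue is full, the drain handler requests the next item
    }

    @Override
    public void onError(Throwable failure) {
        // real code would propagate the failure to the gRPC response
    }

    @Override
    public void onComplete() {
        response.end();
    }
}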

vietj avatar Mar 06 '25 14:03 vietj

Hi, any update on this topic? We are facing the same issue. Any hint on how to mitigate this? Best regards

doppelrittberger avatar Mar 20 '25 20:03 doppelrittberger

I have identified the places doing the outbound request; however, due to the nature of gRPC, we need to see how to provide a correct alternative. The observer passed by gRPC has no back pressure signal.

cescoffier avatar Mar 21 '25 07:03 cescoffier

Hi, would you be interested in an MR that fixes the issue? I identified that checking isReady() and installing onReadyHandler() can solve it. Best regards
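
For context, here's a minimal sketch of the grpc-java isReady()/setOnReadyHandler() pattern being referred to (not the actual PR; the streaming helper and item source are assumptions):

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;

class ReadyAwareStreaming {

    // only push messages while the transport signals readiness
    static <T> void stream(Iterator<T> items, StreamObserver<T> responseObserver) {
        ServerCallStreamObserver<T> observer =
                (ServerCallStreamObserver<T>) responseObserver;
        AtomicBoolean completed = new AtomicBoolean(false);

        observer.setOnReadyHandler(() -> {
            // invoked initially and every time the observer becomes ready again
            while (observer.isReady() && items.hasNext()) {
                observer.onNext(items.next());
            }
            if (!items.hasNext() && completed.compareAndSet(false, true)) {
                observer.onCompleted();
            }
        });
    }
}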

doppelrittberger avatar Apr 01 '25 18:04 doppelrittberger

@doppelrittberger yes if you have a PR it's always useful to make progress 👍

jponge avatar Apr 01 '25 19:04 jponge

I created a PR here: https://github.com/quarkusio/quarkus/pull/47149 It works for the given example. I'm not sure if the wait is implemented in the best way; I experimented with onReadyCallHandler() but could not get it right. Please let me know what you think. Thanks

doppelrittberger avatar Apr 03 '25 11:04 doppelrittberger