quarkus-grpc: OutOfMemoryException while streaming
Describe the bug
I'm using the quarkus-grpc extension to stream a large number of objects from one service into another, but on the client side I repeatedly run into OutOfMemoryExceptions.
On the server side a Multi is built like this:
Multi<DTOClass> multi = Multi.createFrom().emitter(emitter -> {
    databaseHandle.stream() // simplified, db returns a stream here
            .forEach(emitter::emit);
    emitter.complete();
})
.onRequest().invoke(requestSize -> System.out.println("Server requests size: " + requestSize));
And on the client side consumed like this:
grpcClient.getEntries() // returns a Multi
        .onRequest().invoke(requestSize -> System.out.println("Client requests size: " + requestSize))
        .collect().with(Collectors.counting()).await().atMost(Duration.ofSeconds(20));
Expected behavior
There is no OutOfMemoryException and the request size received on the server side is identical to the one on the client side.
Actual behavior
An OutOfMemoryException is thrown and the request size on the server is Long.MAX_VALUE.
Excerpt from the logs:
Client requests size: 256
...
Server requests size: 9223372036854775807
How to Reproduce?
- Implement a spec + server implementation that returns a stream with a large number of entries
- Implement a client that consumes the stream (e.g. in a test case)
- Run the client (set the max heap size to a small but reasonable value to trigger the OutOfMemoryException)
Output of uname -a or ver
Linux 6.5.0-44-generic x86_64 GNU/Linux
Output of java -version
openjdk 21.0.6 2025-01-21
Quarkus version or git rev
3.15.1
Build tool (ie. output of mvnw --version or gradlew --version)
Apache Maven 3.9.9
Additional information
No response
/cc @alesj (grpc), @cescoffier (grpc)
Thanks a lot for reporting!
Is there any chance you can attach a sample that shows the problem in action? That would allow us to go straight into figuring out what the issue is.
@geoand I created an example here: https://github.com/sconvent/quarkus-grpc-issue-demo
Please let me know if you have any further questions. I'm happy to help.
Thanks a lot!
I was able to get a memory dump for the application by making the following change:
diff --git a/pom.xml b/pom.xml
index 4895fe2..f6ad1be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,9 +10,9 @@
<properties>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
- <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
- <quarkus.platform.version>3.15.1</quarkus.platform.version>
- <surefire-plugin.version>3.1.2</surefire-plugin.version>
+ <quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
+ <quarkus.platform.version>3.19.0</quarkus.platform.version>
+ <surefire-plugin.version>3.5.2</surefire-plugin.version>
<compiler-plugin.version>3.11.0</compiler-plugin.version>
<assertj.version>3.22.0</assertj.version>
@@ -113,7 +113,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
- <argLine>-Xmx512m</argLine>
+ <argLine>-Xmx512m -XX:+HeapDumpOnOutOfMemoryError</argLine>
<systemPropertyVariables>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
</systemPropertyVariables>
@jponge given what I see in the heap dump, I think this is one for you (although I can't tell whether the Mutiny usage is correct or not; it may well be that calling the emitter in a loop like this guarantees an OOME)
The call to .await() (and also .collect() to some extent) is the culprit, as this initiates a Long.MAX_VALUE demand (aka unbounded)
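For completeness, here is a minimal, hedged sketch of how demand can be controlled explicitly in a test instead of relying on await()/collect(), using Mutiny's AssertSubscriber test helper; grpcClient and DTOClass are the names from the report, and the batch size of 256 is just an illustrative value:

// Hedged sketch, test-only: AssertSubscriber (io.smallrye.mutiny.helpers.test) lets the
// test control demand explicitly instead of the unbounded request made by await()/collect().
AssertSubscriber<DTOClass> subscriber = grpcClient.getEntries()
        .subscribe().withSubscriber(AssertSubscriber.create(0)); // start with zero demand
subscriber.request(256);    // ask for a bounded batch
subscriber.awaitItems(256); // block the test until the batch has arrived
subscriber.cancel();        // stop the stream without draining everything into memory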
Oh, I didn't see the test code at all...
@sconvent is this representative of what you are doing in the actual application?
Also note that the server code blasts through the Stream source and pushes items to the emitter without honoring any back-pressure. This can be checked with MultiEmitter::requested.
Also note that the server code blasts through the Stream source and pushes items to the emitter without honoring any back-pressure
Right, that's what I had initially commented about
This can be checked with MultiEmitter::requested
@jponge do you have any samples or tests showing how that would be used?
You'd need to check requested() in a loop, but I confess the API here is not so great.
We'd need to add a callback for requests, just like I did in Mutiny Zero with whenRequested, see https://smallrye.io/smallrye-mutiny-zero/latest/quick-start/#creating-using-the-general-purpose-tube-api
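For illustration, a minimal sketch of the "check requested() in a loop" approach described above, assuming a blocking subscription context (e.g. runSubscriptionOn a worker pool) and reusing the databaseHandle/DTOClass names from the report:

// Hedged sketch: emit only while downstream demand is outstanding.
// The busy-wait is a placeholder; a real implementation would park the thread
// or rely on a request callback instead.
Multi<DTOClass> multi = Multi.createFrom().emitter(emitter -> {
    Iterator<DTOClass> iterator = databaseHandle.stream().iterator();
    while (iterator.hasNext() && !emitter.isCancelled()) {
        while (emitter.requested() == 0 && !emitter.isCancelled()) {
            Thread.onSpinWait(); // crude; never do this on an event-loop thread
        }
        emitter.emit(iterator.next());
    }
    emitter.complete();
});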
🙏
I created an issue, this will land in Mutiny... soon 😄
Thanks!
I implemented the suggestions you made on a separate branch: https://github.com/sconvent/quarkus-grpc-issue-demo/tree/suggestions
Changes are:
- The implementation no longer uses an emitter, but rather Multi.createFrom().items(...)
- It no longer uses await() or collect() on the Multi on the client side, but rather subscribe().with(...)
The issue still persists though. And now the requests size on both the client and server is Long.MAX_VALUE.
Am I still using something in a wrong way?
@sconvent is this representative of what you are doing in the actual application?
Yes. The main difference to the actual application is that there, the Stream on the server side comes from a database and the processing on the client side is not just a Thread.wait() but some other computation.
subscribe().with(...) also does an unbounded request.
To properly control the demand I'm afraid you have to create your own subscriber, as in: https://github.com/smallrye/smallrye-mutiny/blob/main/workshop-examples/src/main/java/_01_basics/_14_Multi_Subscriber.java
(note that here it's an inefficient 1-by-1 request pattern, but you can certainly do some batching)
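As a rough illustration of that suggestion, here is a hedged sketch of a 1-by-1 requesting subscriber on the client side; process(...) is a hypothetical handler, and grpcClient/DTOClass are the names from the report:

// Hedged sketch of a subscriber that controls demand explicitly (1-by-1 here;
// batching is possible by requesting more at a time). process(...) is hypothetical.
grpcClient.getEntries().subscribe().withSubscriber(new MultiSubscriber<DTOClass>() {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // bounded initial demand
    }

    @Override
    public void onItem(DTOClass item) {
        process(item);           // hypothetical per-item work
        subscription.request(1); // ask for the next item only after handling this one
    }

    @Override
    public void onFailure(Throwable failure) {
        failure.printStackTrace();
    }

    @Override
    public void onCompletion() {
        System.out.println("Done");
    }
});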
I implemented the dedicated subscriber here: https://github.com/sconvent/quarkus-grpc-issue-demo/tree/subscriber
It uses 1-by-1 requests, only with the addition of a delay in the onNext() method.
The issue is still present though. I can see a request size of 1 on the client side, but on the server side it still remains Long.MAX_VALUE, so it seems the request size is not passed through. Not sure if it even should be. On the server side there also seems to be an unbounded request, but that would be part of the quarkus-grpc implementation.
We might chat about that with @alesj next week then
AtomicInteger counter = new AtomicInteger(0);
in BugDemoTest::bugDemoTest is never incremented :-)
@alesj you're right, thanks. Fixed it just now. Didn't help with the issue though
Didn't help with the issue though
Yeah, since it never got to the last assertion; the OOM hits before that.
Discussed it yesterday with @jponge, and we think this is unfortunately not solvable out of the box ...
Here is what I think happens. The reproducer shows:
public Multi<HelloReply> sayHello(HelloRequest request) {
    Stream<HelloReply> inputs = IntStream.rangeClosed(0, COUNT - 1)
            .mapToObj(it -> HelloReply.newBuilder().setMessage(buildTestString(it)).build());
    Multi<HelloReply> multi = Multi.createFrom().emitter(multiEmitter -> {
        inputs.forEach(multiEmitter::emit);
        multiEmitter.complete();
    });
    return multi
            .emitOn(Infrastructure.getDefaultWorkerPool())
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .onRequest().invoke(requestSize -> System.out.println("Server request size: " + requestSize));
}
There is some code involved here that subscribes to the returned Multi<HelloReply> and writes each item, directly or indirectly, to the Vert.x GrpcServerResponse<HelloReply>.
This code seems to request Long.MAX_VALUE and then write everything it receives.
Instead it should request N items, write them to the response, check the response's writability with writeQueueFull(), stop requesting while the queue is full, and set a drain handler on the response to resume. You can read more about Vert.x flow control here: https://vertx.io/docs/vertx-core/java/#streams
Vert.x has some code for this in the Vert.x Reactive Extensions, and I assume Mutiny does as well. Here is the Vert.x RxJava code that does the job: https://github.com/vert-x3/vertx-rx/blob/6ef24f0eff574b291a86be4570d967335518e84c/rx-java2-gen/src/main/java/io/vertx/reactivex/impl/WriteStreamSubscriberImpl.java#L36
The same should be done when the Multi is subscribed to write the items to the GrpcServerResponse.
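To make the described pattern concrete, here is a hedged, simplified sketch (not the actual Quarkus or Vert.x code) of a subscriber that writes to a Vert.x WriteStream and only requests more items while the stream is writable:

import java.util.concurrent.Flow;

import io.vertx.core.streams.WriteStream;

// Hedged sketch of the writeQueueFull()/drain-handler pattern described above.
// GrpcServerResponse is a Vert.x write stream, so the same idea applies there.
class WriteStreamSubscriber<T> implements Flow.Subscriber<T> {

    private final WriteStream<T> stream;
    private Flow.Subscription subscription;

    WriteStreamSubscriber(WriteStream<T> stream) {
        this.stream = stream;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // bounded initial demand instead of Long.MAX_VALUE
    }

    @Override
    public void onNext(T item) {
        stream.write(item);
        if (stream.writeQueueFull()) {
            // stop requesting and resume once the write queue has drained
            stream.drainHandler(v -> subscription.request(1));
        } else {
            subscription.request(1);
        }
    }

    @Override
    public void onError(Throwable failure) {
        stream.end(); // a real implementation would propagate a gRPC error status instead
    }

    @Override
    public void onComplete() {
        stream.end();
    }
}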
Hi, any update on this topic? We are facing the same issue. Any hint on how to mitigate this? Best regards
I have identified the places doing the outbound request; however, due to the nature of gRPC, we need to see how to provide a correct alternative. The observer passed by gRPC has no back pressure signal.
Hi, would you be interested in an MR that fixes the issue? I found that checking isReady() and installing an onReadyHandler() can solve it. Best regards
@doppelrittberger yes if you have a PR it's always useful to make progress 👍
I created a PR here: https://github.com/quarkusio/quarkus/pull/47149. It works for the given example. I'm not sure if the wait is implemented in the best way; I experimented with onReadyCallHandler() but could not get it right. Please let me know what you think. Thanks
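For reference, a hedged, illustrative sketch of the isReady()/onReadyHandler() pattern mentioned above, using the plain gRPC ServerCallStreamObserver API; this is not the code from the linked PR:

import java.util.concurrent.Flow;

import io.grpc.stub.ServerCallStreamObserver;

// Hedged sketch: only request the next item from the Multi when the gRPC
// transport signals that it is ready to accept more messages.
class ReadyAwareSubscriber<T> implements Flow.Subscriber<T> {

    private final ServerCallStreamObserver<T> observer;
    private volatile Flow.Subscription subscription;

    ReadyAwareSubscriber(ServerCallStreamObserver<T> observer) {
        this.observer = observer;
        // invoked by gRPC whenever the transport becomes writable again
        observer.setOnReadyHandler(() -> {
            Flow.Subscription s = subscription;
            if (s != null) {
                s.request(1);
            }
        });
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        observer.onNext(item);
        if (observer.isReady()) {
            subscription.request(1);
        }
        // otherwise wait for the on-ready handler to request the next item
    }

    @Override
    public void onError(Throwable failure) {
        observer.onError(failure);
    }

    @Override
    public void onComplete() {
        observer.onCompleted();
    }
}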