quarkus
quarkus copied to clipboard
Support `StreamingOutput` or document alternative in RESTEasy Reactive
Description
I have seen this issue discussed in both #26473 and #15891, but it still remains unclear to me how to do the equivalent of StreamingOutput
with RESTEasy Reactive. If StreamingOutput
cannot be directly supported as return type for blocking endpoints, I think it would make sense if the documentation provided some clear guidance on how to do this, including how to set headers (e.g. Content-Disposition
as in the linked discussion), as RESTEasy Reactive is being promoted as the JAX-RS implementation of choice.
As in the discussion #26473 we also have a blocking endpoint which returns the output of some third-party library and having to first store that as a temporary file is IMO not very nice.
Implementation ideas
No response
/cc @FroMage, @geoand, @stuartwdouglas
The alternative is to use Multi
. Now I do acknowledge that using Multi does not give you a good way to set headers...
@FroMage WDYT?
Would the return type then be Multi<byte[]>
? I think a concrete example in the docs would be really helpful.
Would the return type then be Multi<byte[]>
It really depends on what you are trying to do.
I think a concrete example in the docs would be really helpful.
I agree. We have various tests in the codebase, but documentation would definitely help. @FroMage would you like to add something?
@knutwannheden I usually just return an inputstream for file downloads. Not sure if it helps in your situation though.
return Response.ok(blob.getBinaryStream(), document.getMimeType())
.header("Content-length", document.getFileSize())
.header("content-disposition", "attachment; filename = " + document.getFileName())
.build();
Yeah, that is one way of doing this - if you don't care about blocking that is :)
@knutwannheden I usually just return an inputstream for file downloads. Not sure if it helps in your situation though.
No, in our case we are dealing with a third-party library which writes to a given output stream. We then want to stream this output as the response. So with RESTEasy Classic the code looked something like this:
StreamingOutput stream = os -> {
Document document = new Document();
PdfWriter.getInstance(document, os);
document.open();
generatePdf(document, id);
document.close();
};
return Response.ok().entity(stream).build();
Supporting streaming directly over the wire in a blocking way like StreamingOutput
implies that a slow client IO will block a thread for as long as the bytes take to tranfer, locking up that thread which is an easy attack vector.
I can't think of any way to implement OutputStream
without locking up a thread over IO. IMO the only solution is to either buffer to temp files, or in memory for smaller files, to decouple the task (CPU) from IO. Now, we could certainly add support for a type of OutputStream
implementation that stores stuff in memory until a certain size, then delegates to a temp file if it exceeds that, and that is automatically deleted once the request is finished (this bit is probably missing ATM). This would then be sent by RESTEasy Reactive async and the user would have the impression that they streamed.
But honestly, I have a vague memory that @stuartwdouglas already implemented something like that somewhere, but I can't quite remember where.
ResteasyReactiveOutputStream
was it. But it looks to be a bit different, because it appears to support blocking. So… perhaps we actually support streaming while blocking, provided the endpoint is @Blocking
and done in a worker thread.
If we do… from there to supporting StreamingOutput
, it should be fairly simple. I would still prefer my suggested solution though, which would not lock a thread for IO.
In our case having the endpoint as blocking is not an issue. But ResteasyReactiveOutputStream
looks like it can only be directly used by interceptors, filters, etc.
I also looked into creating a temp file, but then I couldn't figure out how to get it deleted automatically after the request finishes processing.
Please note that I would be content with a complete example in the documentation, which shows how an endpoint can return a response based on an API returning an OutputStream
. It doesn't have to be like StreamingOutput
(even though that would be a nice option for blocking endpoints), but it would be nice if it were complete (e.g. also handle resource cleanup).
IMHO there is no reason not to support this for @Blocking endpoints, we already have support for returning an InputStream which is just as blocking.
I will have to do some more testing, but I think this may do the trick and would allow my endpoints to transparently use StreamingOutput
:
@Provider
public class StreamingOutputMessageBodyWriter implements MessageBodyWriter<StreamingOutput> {
@Override
public boolean isWriteable(Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType) {
return StreamingOutput.class.isAssignableFrom(aClass);
}
@Override
public void writeTo(StreamingOutput blob, Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, Object> httpHeaders, OutputStream outputStream) throws IOException, WebApplicationException {
blob.write(outputStream);
}
}
FWIW, the workaround with the custom MessageBodyWriter
works as expected.
This workaround is a lifesaver!
Been troubleshoothing for hours why I got some weird result back from the StreamingOutput (when processing it showed the classname$1..... instead of the filecontent).
Adding this StreamingOutputMessageBodyWriter
does the trick.
@Manfred73 , @knutwannheden, how to actually use this class?
All I did was adding this class to my codebase as @knutwannheden described and it automagically works.
I managed to get it working with the StreamingOutputMessageBodyWriter
. I have a rest endpoint which returns a
Uni<RestResponse<StreamingOutput>>
but now I want to call that endpoint from another rest service and do some processing on the StreamingOutput
. I've been following https://quarkus.io/guides/rest-client-reactive guide for that, but I'm not sure if this is the right way.
The service which returns the StreamingOutput
:
@Path("/v1/files")
public class FileRepoQueryResource {
@GET
@Path("/{id}")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Uni<RestResponse<StreamingOutput>> readFileById(@PathParam("id") String id) {
// retrieve file by id and return as StreamingOutput
}
}
Now I have another rest service which should retrieve the file by calling the above endpoint, so I have created a client interface and a resource implementation:
@Path("/v1/files")
@RegisterRestClient
public interface FileRepoQueryClient {
@GET
@Path("/{id}")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
Uni<RestResponse<StreamingOutput>> readFileById(@PathParam("id") String id);
}
@Path("/v1/files")
public class FileResource {
@Inject
@RestClient
FileRepoQueryClient fileRepoQueryClient;
@GET
@Path("/{id}")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Uni<RestResponse<StreamingOutput>> readFileById(@PathParam("id") String id) {
return fileRepoQueryClient.readFileById(id);
}
}
And then a service which uses the FileResource.readFileById
to get the StreamingOutput
and count all lines starting with 2:
@AllArgsConstructor
@ApplicationScoped
public class FileProcessService {
private final FileResource resource;
// How to process the streaming output here and count all lines starting with 2?
public Integer getNumberOfLinesStartingWith2(String id) {
return service.readFileById(id)
.map(so -> getNumberOfLinesStartingWith2(so, id))
.subscribeAsCompletionStage()
.getNow(0);
}
private Integer getNumberOfLinesStartingWith2(RestResponse<StreamingOutput> it, String id) {
try (final var bf = new BufferedInputStream(it.readEntity(InputStream.class));
final var br = new BufferedReader(new InputStreamReader(bf, StandardCharsets.UTF_8))) {
return getNumberOfLinesStartingWith2(br); // reads each line, increases counter until end of bufferedreader
} catch (IOException ioe) {
throw new FileProcessingException("Error determining number of lines starting with 2 for file with id " + id, ioe);
}
}
}
When I just enter URL in the browser, the file is downloaded, but using the RestClient, I get below error:
"stackTrace":"java.util.concurrent.CompletionException: javax.ws.rs.ProcessingException: Response could not be mapped to type interface javax.ws.rs.core.StreamingOutput\n\tat java.base/java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:413)\n\tat java.base/java.util.concurrent.CompletableFuture.getNow(CompletableFuture.java:2134)\n\tat io.smallrye.context.CompletableFutureWrapper.getNow(CompletableFutureWrapper.java:175)\n\tat
..........
org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:145)\n\tat io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)\n\tat org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)\n\tat org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)\n\tat org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)\n\tat org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: javax.ws.rs.ProcessingException: Response could not be mapped to type interface javax.ws.rs.core.StreamingOutput\n\tat org.jboss.resteasy.reactive.client.impl.ClientReaderInterceptorContextImpl.proceed(ClientReaderInterceptorContextImpl.java:75)\n\tat org.jboss.resteasy.reactive.client.impl.ClientSerialisers.invokeClientReader(ClientSerialisers.java:160)\n\tat org.jboss.resteasy.reactive.client.impl.RestClientRequestContext.readEntity(RestClientRequestContext.java:185)\n\tat org.jboss.resteasy.reactive.client.handlers.ClientResponseCompleteRestHandler.mapToResponse(ClientResponseCompleteRestHandler.java:104)\n\tat org.jboss.resteasy.reactive.client.handlers.ClientResponseCompleteRestHandler.handle(ClientResponseCompleteRestHandler.java:35)\n\tat org.jboss.resteasy.reactive.client.handlers.ClientResponseCompleteRestHandler.handle(ClientResponseCompleteRestHandler.java:31)\n\tat org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.invokeHandler(AbstractResteasyReactiveContext.java:229)\n\tat org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:145)\n\tat org.jboss.resteasy.reactive.client.impl.RestClientRequestContext$1.lambda$execute$0(RestClientRequestContext.java:279)\n\tat io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)\n\tat io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)\n\tat io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)\n\tat io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)\n\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t... 2 more\n","errorType":"java.util.concurrent.CompletionException","errorMessage":"javax.ws.rs.ProcessingException: Response could not be mapped to type interface javax.ws.rs.core.StreamingOutput"}
Just want to thank @knutwannheden for the awesome solution! Been struggling with this all morning and the new class worked like a charm 😍
I just looked at what StreamingOutput
actually is... I never realized how simple it was... If I had, we would have added this a long time ago :)
@knutwannheden your solution seems correct (and FWIW, it's also what RESTEasy Classic) does, so would you like to contribute the fix (we would ideally have a few tests as well)? If you don't have the time, no worries, just let me know and I'll take care of it.
I actually went ahead and did this myself as it also required updating the TCK.
Cool, thanks!
Hi folks, please correct me if I'm wrong but StreamingOutput can only be used with non-reactive upstream data sources because the output stream is closed after StreamOutput.write has returned which means one has to block within that method until the last byte is written - but blocking is not allowed.
Because returning Multi<Byte[]> currently does not support content negotiation / dynamic content type, there is no way to stream dynamic content in a reactive way. Are there plans to add support for it?
@sven-geisenhainer there is a way to do what you are asking, see https://quarkus.io/guides/resteasy-reactive#streaming-support
@geoand my case is a bit more complicated than just streaming bytes as shown in the documentation samples, I also need to retrieve some meta data like content-type from upstream (cloud) before streaming the actual content. So in fact I need to combine Uni and Multi. Consider the following test class:
public class StreamTestResource {
private static class FileInfo {
String name;
String contentType;
long size;
public FileInfo(String name, String contentType, long size) {
this.name = name;
this.contentType = contentType;
this.size = size;
}
}
private final Uni<FileInfo> fileInfoSource = Uni.createFrom().item(new FileInfo("test", "application/octet-stream", 100_000));
private final Multi<byte[]> fileContentSource = Multi.createFrom().emitter(emitter -> {
for (int i=0; i<100_000; ++i) emitter.emit(new byte[] {(byte)i});
emitter.complete();
});
@GET
public Multi<byte[]> streamDynamicContent() {
return fileInfoSource
.onItem()
.transformToMulti(fileInfo ->
RestMulti.fromMultiData(fileContentSource)
.header("Content-Type", fileInfo.contentType)
.header("Content-Length", Long.toString(fileInfo.size))
.header("Content-Disposition", "attachment; filename=\"" + fileInfo.name + "\"")
.build()
);
}
}
Requesting that endpoint gives the following exception:
java.lang.IllegalStateException: Negotiation or dynamic media type not supported yet for Multi: please use the @Produces annotation when returning a Multi
But I cannot use @Produces
as I do not know the content-type at compile time. How to solve this?
@sven-geisenhainer doesn't the example from the docs:
@Path("logs")
public class Endpoint {
interface SomeService {
Uni<SomeResponse> get();
}
interface SomeResponse {
Multi<byte[]> data;
String myHeader();
}
private final SomeService someService;
public Endpoint(SomeService someService) {
this.someService = someService;
}
@GET
public Multi<String> streamLogs() {
return RestMulti.fromUniResponse(someService.get(), SomeResponse::data, (r -> Map.of("MyHeader", List.of(r.myHeader()))));
}
}
cover your use case?
@geoand Unfortunately no because that example requires to know the content-type in advance in order to set a header map like Map.of("Content-Type", List.of(retrievedContentType))
, but I need to obtain it from another reactive source, see my example above. What about that exception - are there plans to add the missing support?
Isn't that the same exact case as the example?
It's getting a header obtained from a downstream service.
@geoand Oh sorry, yes this perfectly works, didn't get the idea behind that code, thank you for persistently pointing me towards the right solution!
Thanks for checking!
As you found it confusing, do you have any pointers of how the example could be improved to be clearer?
@geoand I stumbled over that exception message "Negotiation or dynamic media type not supported yet for Multi: please use the @Produces annotation when returning a Multi" that actually is not correct as there is a working solution, so at least a better message would be helpful like "Use RestMulti.fromUniResponse for dynamic media type instead". The sample code is perfectly working, was my laziness to not carefully enough reading it. Thanks again!