Support `StreamingOutput` or document alternative in RESTEasy Reactive

Open knutwannheden opened this issue 2 years ago • 12 comments

Description

I have seen this issue discussed in both #26473 and #15891, but it still remains unclear to me how to do the equivalent of StreamingOutput with RESTEasy Reactive. If StreamingOutput cannot be directly supported as return type for blocking endpoints, I think it would make sense if the documentation provided some clear guidance on how to do this, including how to set headers (e.g. Content-Disposition as in the linked discussion), as RESTEasy Reactive is being promoted as the JAX-RS implementation of choice.

As in the discussion #26473 we also have a blocking endpoint which returns the output of some third-party library and having to first store that as a temporary file is IMO not very nice.

Implementation ideas

No response

knutwannheden avatar Oct 04 '22 15:10 knutwannheden

/cc @FroMage, @geoand, @stuartwdouglas

quarkus-bot[bot] avatar Oct 04 '22 15:10 quarkus-bot[bot]

The alternative is to use Multi. Now I do acknowledge that using Multi does not give you a good way to set headers... @FroMage WDYT?

geoand avatar Oct 05 '22 07:10 geoand

Would the return type then be Multi<byte[]>? I think a concrete example in the docs would be really helpful.

knutwannheden avatar Oct 05 '22 07:10 knutwannheden

> Would the return type then be Multi<byte[]>

It really depends on what you are trying to do.

> I think a concrete example in the docs would be really helpful.

I agree. We have various tests in the codebase, but documentation would definitely help. @FroMage would you like to add something?

geoand avatar Oct 05 '22 07:10 geoand

@knutwannheden I usually just return an inputstream for file downloads. Not sure if it helps in your situation though.

        return Response.ok(blob.getBinaryStream(), document.getMimeType())
                .header("Content-length", document.getFileSize())
                .header("content-disposition", "attachment; filename = " + document.getFileName())
                .build();

Postremus avatar Oct 05 '22 08:10 Postremus

Yeah, that is one way of doing this - if you don't care about blocking that is :)

geoand avatar Oct 05 '22 08:10 geoand

> @knutwannheden I usually just return an inputstream for file downloads. Not sure if it helps in your situation though.

No, in our case we are dealing with a third-party library which writes to a given output stream. We then want to stream this output as the response. So with RESTEasy Classic the code looked something like this:

        StreamingOutput stream = os -> {
            Document document = new Document();
            PdfWriter.getInstance(document, os);
            document.open();
            generatePdf(document, id);
            document.close();
        };
        return Response.ok().entity(stream).build();
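
One way to reconcile a library that writes to an OutputStream with an endpoint that returns an InputStream is a pipe fed from a worker thread. The following is a minimal sketch using only the JDK; `PipedBridge`, `StreamWriter` and `asInputStream` are hypothetical names, and `StreamWriter` is only a stand-in with the same shape as StreamingOutput. It has not been checked against RESTEasy Reactive itself, and it still occupies a thread while the client reads.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PipedBridge {

    /** Hypothetical stand-in with the same shape as StreamingOutput. */
    @FunctionalInterface
    interface StreamWriter {
        void write(OutputStream os) throws IOException;
    }

    /** Runs the writer on the executor and exposes what it writes as an InputStream. */
    static InputStream asInputStream(StreamWriter writer, ExecutorService executor) throws IOException {
        PipedInputStream in = new PipedInputStream(8192);
        PipedOutputStream out = new PipedOutputStream(in);
        executor.submit(() -> {
            // Closing the pipe's write end signals end-of-stream to the reader.
            try (out) {
                writer.write(out);
            } catch (IOException e) {
                // Sketch only: real code should log this or surface it to the reader.
            }
        });
        return in;
    }

    /** Small demonstration: the "library" writes some bytes, the caller reads them back. */
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try (InputStream in = asInputStream(
                os -> os.write("generated pdf bytes".getBytes(StandardCharsets.UTF_8)), executor)) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

The writer must run on a different thread than the reader; using both ends of a pipe from a single thread can deadlock once the pipe buffer fills up.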

knutwannheden avatar Oct 05 '22 10:10 knutwannheden

Supporting streaming directly over the wire in a blocking way like StreamingOutput implies that slow client IO will block a thread for as long as the bytes take to transfer, locking up that thread, which is an easy attack vector.

I can't think of any way to implement OutputStream without locking up a thread over IO. IMO the only solution is to either buffer to temp files, or in memory for smaller files, to decouple the task (CPU) from IO. Now, we could certainly add support for a type of OutputStream implementation that stores stuff in memory until a certain size, then delegates to a temp file if it exceeds that, and that is automatically deleted once the request is finished (this bit is probably missing ATM). This would then be sent by RESTEasy Reactive async and the user would have the impression that they streamed.
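
A rough sketch of the kind of OutputStream described here, using only the JDK: it buffers in memory up to a threshold and moves everything to a temp file once the threshold is exceeded. The class name `SpillingOutputStream` and its methods are hypothetical, and wiring `discard()` to the end of a request is left open, since that is the part noted as probably missing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Buffers in memory up to a threshold, then spills everything to a temp file. */
class SpillingOutputStream extends OutputStream {
    private final int threshold;
    private ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private Path file;              // non-null once the content has been spilled
    private OutputStream fileOut;   // non-null once the content has been spilled

    SpillingOutputStream(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        if (fileOut == null && memory.size() + len > threshold) {
            // Threshold exceeded: move what is buffered to disk and continue there.
            file = Files.createTempFile("spill-", ".tmp");
            fileOut = Files.newOutputStream(file);
            memory.writeTo(fileOut);
            memory = null;
        }
        if (fileOut != null) {
            fileOut.write(buf, off, len);
        } else {
            memory.write(buf, off, len);
        }
    }

    @Override
    public void close() throws IOException {
        if (fileOut != null) {
            fileOut.close();
        }
    }

    boolean spilled() {
        return file != null;
    }

    /** Opens the buffered content for reading; call after close(). */
    InputStream openInput() throws IOException {
        return file != null ? Files.newInputStream(file) : new ByteArrayInputStream(memory.toByteArray());
    }

    /** Deletes the temp file, if any; meant to run once the response has been sent. */
    void discard() throws IOException {
        if (file != null) {
            Files.deleteIfExists(file);
        }
    }

    /** Writes the text, reads it back, and reports where it was held. */
    static String roundTrip(int threshold, String text) {
        try {
            SpillingOutputStream out = new SpillingOutputStream(threshold);
            try (out) {
                out.write(text.getBytes(StandardCharsets.UTF_8));
            }
            try (InputStream in = out.openInput()) {
                String where = out.spilled() ? "file:" : "memory:";
                return where + new String(in.readAllBytes(), StandardCharsets.UTF_8);
            } finally {
                out.discard();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this shape the CPU-bound generation step finishes before any network IO starts, so the buffered content can be sent asynchronously afterwards.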

But honestly, I have a vague memory that @stuartwdouglas already implemented something like that somewhere, but I can't quite remember where.

FroMage avatar Oct 10 '22 14:10 FroMage

ResteasyReactiveOutputStream was it. But it looks to be a bit different, because it appears to support blocking. So… perhaps we actually support streaming while blocking, provided the endpoint is @Blocking and done in a worker thread.

If we do… from there to supporting StreamingOutput, it should be fairly simple. I would still prefer my suggested solution though, which would not lock a thread for IO.

FroMage avatar Oct 10 '22 14:10 FroMage

In our case having the endpoint as blocking is not an issue. But ResteasyReactiveOutputStream looks like it can only be directly used by interceptors, filters, etc.

I also looked into creating a temp file, but then I couldn't figure out how to get it deleted automatically after the request finishes processing.
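
For the temp file route, one JDK-level option is to open the file with `StandardOpenOption.DELETE_ON_CLOSE`, so that it is removed when the stream is closed. The sketch below assumes that whoever consumes the returned InputStream closes it after sending the response, which has not been verified here for RESTEasy Reactive; `TempFileStreams`, `StreamWriter` and the method names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TempFileStreams {

    /** Hypothetical stand-in for a library call that writes to an OutputStream. */
    @FunctionalInterface
    interface StreamWriter {
        void write(OutputStream os) throws IOException;
    }

    /** Lets the writer fill a fresh temp file; removes the file again if writing fails. */
    static Path writeToTempFile(StreamWriter writer) throws IOException {
        Path file = Files.createTempFile("download-", ".tmp");
        try (OutputStream os = Files.newOutputStream(file)) {
            writer.write(os);
        } catch (IOException | RuntimeException e) {
            Files.deleteIfExists(file);
            throw e;
        }
        return file;
    }

    /** Opens the file so that it is deleted (best effort) when the stream is closed. */
    static InputStream openSelfDeleting(Path file) throws IOException {
        return Files.newInputStream(file, StandardOpenOption.DELETE_ON_CLOSE);
    }

    /** Returns true if the content round-trips and the file is gone after close(). */
    static boolean demo() {
        try {
            Path file = writeToTempFile(os -> os.write("report".getBytes(StandardCharsets.UTF_8)));
            String content;
            try (InputStream in = openSelfDeleting(file)) {
                content = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
            return content.equals("report") && !Files.exists(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The JDK documents DELETE_ON_CLOSE as a best-effort attempt, so a periodic cleanup of the temp directory may still be worth having.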

Please note that I would be content with a complete example in the documentation, which shows how an endpoint can return a response based on an API returning an OutputStream. It doesn't have to be like StreamingOutput (even though that would be a nice option for blocking endpoints), but it would be nice if it were complete (e.g. also handle resource cleanup).

knutwannheden avatar Oct 10 '22 16:10 knutwannheden

IMHO there is no reason not to support this for @Blocking endpoints, we already have support for returning an InputStream which is just as blocking.

stuartwdouglas avatar Oct 11 '22 06:10 stuartwdouglas

I will have to do some more testing, but I think this may do the trick and would allow my endpoints to transparently use StreamingOutput:

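import java.io.IOException;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.ext.Provider;

@Provider
public class StreamingOutputMessageBodyWriter implements MessageBodyWriter<StreamingOutput> {

    @Override
    public boolean isWriteable(Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType) {
        return StreamingOutput.class.isAssignableFrom(aClass);
    }

    @Override
    public void writeTo(StreamingOutput blob, Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType, MultivaluedMap<String, Object> httpHeaders, OutputStream outputStream) throws IOException, WebApplicationException {
        // Let the StreamingOutput write itself directly to the response stream.
        blob.write(outputStream);
    }
}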

knutwannheden avatar Oct 12 '22 17:10 knutwannheden

FWIW, the workaround with the custom MessageBodyWriter works as expected.

knutwannheden avatar Oct 20 '22 07:10 knutwannheden

This workaround is a lifesaver! Been troubleshooting for hours why I got some weird result back from the StreamingOutput (when processing it showed the classname$1..... instead of the file content). Adding this StreamingOutputMessageBodyWriter does the trick.

Manfred73 avatar Nov 18 '22 15:11 Manfred73

@Manfred73 , @knutwannheden, how to actually use this class?

eggoyster avatar Jan 03 '23 17:01 eggoyster

All I did was add this class to my codebase as @knutwannheden described, and it automagically works.
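
The reason nothing else needs to be called is that a class annotated with @Provider is picked up by the runtime, which then asks each registered writer whether it can handle the returned entity. The following is a deliberately simplified plain-Java model of that selection, not the actual RESTEasy Reactive internals; `BodyWriter`, `StreamWriter` and `serialize` are hypothetical. The `toString()` fallback is an assumption, chosen because it matches the "classname$1" symptom reported above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

class WriterDispatchModel {

    /** Hypothetical stand-in with the same shape as StreamingOutput. */
    @FunctionalInterface
    interface StreamWriter {
        void write(OutputStream os) throws IOException;
    }

    /** Hypothetical, much simplified stand-in for MessageBodyWriter. */
    interface BodyWriter {
        boolean isWriteable(Class<?> type);

        void writeTo(Object entity, OutputStream out) throws IOException;
    }

    /** Counterpart of the custom writer: delegates to the entity's own write method. */
    static final BodyWriter STREAM_WRITER = new BodyWriter() {
        @Override
        public boolean isWriteable(Class<?> type) {
            return StreamWriter.class.isAssignableFrom(type);
        }

        @Override
        public void writeTo(Object entity, OutputStream out) throws IOException {
            ((StreamWriter) entity).write(out);
        }
    };

    static final StreamWriter BODY = os -> os.write("file content".getBytes(StandardCharsets.UTF_8));

    /** Uses the first writer that accepts the entity; otherwise falls back to toString(). */
    static String serialize(Object entity, List<BodyWriter> registered) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (BodyWriter writer : registered) {
                if (writer.isWriteable(entity.getClass())) {
                    writer.writeTo(entity, out);
                    return out.toString(StandardCharsets.UTF_8);
                }
            }
            // Assumed fallback: no writer matched, so only the object's name is emitted.
            return String.valueOf(entity);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String withWriter() {
        return serialize(BODY, List.of(STREAM_WRITER));
    }

    static String withoutWriter() {
        return serialize(BODY, List.of());
    }
}
```

In this model `withWriter()` yields the written bytes, while `withoutWriter()` yields only the lambda's generated class name.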

Manfred73 avatar Jan 09 '23 10:01 Manfred73

I managed to get it working with the StreamingOutputMessageBodyWriter. I have a rest endpoint which returns a Uni<RestResponse<StreamingOutput>> but now I want to call that endpoint from another rest service and do some processing on the StreamingOutput. I've been following https://quarkus.io/guides/rest-client-reactive guide for that, but I'm not sure if this is the right way.

The service which returns the StreamingOutput:

@Path("/v1/files")
public class FileRepoQueryResource {
   @GET
   @Path("/{id}")
   @Produces(MediaType.APPLICATION_OCTET_STREAM)
   public Uni<RestResponse<StreamingOutput>> readFileById(@PathParam("id") String id) {
      // retrieve file by id and return as StreamingOutput
   }
}

Now I have another rest service which should retrieve the file by calling the above endpoint, so I have created a client interface and a resource implementation:

@Path("/v1/files")
@RegisterRestClient
public interface FileRepoQueryClient {
   @GET
   @Path("/{id}")
   @Produces(MediaType.APPLICATION_OCTET_STREAM)
   Uni<RestResponse<StreamingOutput>> readFileById(@PathParam("id") String id);
}

@Path("/v1/files")
public class FileResource {
   @Inject
   @RestClient
   FileRepoQueryClient fileRepoQueryClient;

   @GET
   @Path("/{id}")
   @Produces(MediaType.APPLICATION_OCTET_STREAM)
   public Uni<RestResponse<StreamingOutput>> readFileById(@PathParam("id") String id) {
      return fileRepoQueryClient.readFileById(id);
   }
}

And then a service which uses the FileResource.readFileById to get the StreamingOutput and count all lines starting with 2:

@AllArgsConstructor
@ApplicationScoped
public class FileProcessService {
   private final FileResource resource;

   // How to process the streaming output here and count all lines starting with 2?
   public Integer getNumberOfLinesStartingWith2(String id) {
      return resource.readFileById(id)
         .map(so -> getNumberOfLinesStartingWith2(so, id))
         .subscribeAsCompletionStage()
         .getNow(0);
    }

   private Integer getNumberOfLinesStartingWith2(RestResponse<StreamingOutput> it, String id) {
      try (final var bf = new BufferedInputStream(it.readEntity(InputStream.class));
            final var br = new BufferedReader(new InputStreamReader(bf, StandardCharsets.UTF_8))) {
         return getNumberOfLinesStartingWith2(br); // reads each line, increases counter until end of bufferedreader
      } catch (IOException ioe) {
         throw new FileProcessingException("Error determining number of lines starting with 2 for file with id " + id, ioe);
      }
   }
}
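
The counting helper referenced in the comment above is not shown. A minimal sketch of what it could look like, independent of how the InputStream is obtained, is given here; `LineCounter` and `countLinesStartingWith` are hypothetical names and the sample data is made up.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class LineCounter {

    /** Reads the stream line by line and counts the lines that start with the prefix. */
    static int countLinesStartingWith(InputStream in, String prefix) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            int count = 0;
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.startsWith(prefix)) {
                    count++;
                }
            }
            return count;
        }
    }

    /** Made-up sample: two of the four lines start with "2". */
    static int demo() {
        byte[] data = "1 header\n2 first\n2 second\n9 trailer\n".getBytes(StandardCharsets.UTF_8);
        try {
            return countLinesStartingWith(new ByteArrayInputStream(data), "2");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The BufferedReader already buffers its input, so the extra BufferedInputStream layer is not needed for this purpose.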

When I just enter the URL in the browser, the file is downloaded, but using the RestClient, I get the error below:

"stackTrace":"java.util.concurrent.CompletionException: javax.ws.rs.ProcessingException: Response could not be mapped to type interface javax.ws.rs.core.StreamingOutput\n\tat java.base/java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:413)\n\tat java.base/java.util.concurrent.CompletableFuture.getNow(CompletableFuture.java:2134)\n\tat io.smallrye.context.CompletableFutureWrapper.getNow(CompletableFutureWrapper.java:175)\n\tat
..........
org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:145)\n\tat io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)\n\tat org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)\n\tat org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)\n\tat org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)\n\tat org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: javax.ws.rs.ProcessingException: Response could not be mapped to type interface javax.ws.rs.core.StreamingOutput\n\tat org.jboss.resteasy.reactive.client.impl.ClientReaderInterceptorContextImpl.proceed(ClientReaderInterceptorContextImpl.java:75)\n\tat org.jboss.resteasy.reactive.client.impl.ClientSerialisers.invokeClientReader(ClientSerialisers.java:160)\n\tat org.jboss.resteasy.reactive.client.impl.RestClientRequestContext.readEntity(RestClientRequestContext.java:185)\n\tat org.jboss.resteasy.reactive.client.handlers.ClientResponseCompleteRestHandler.mapToResponse(ClientResponseCompleteRestHandler.java:104)\n\tat org.jboss.resteasy.reactive.client.handlers.ClientResponseCompleteRestHandler.handle(ClientResponseCompleteRestHandler.java:35)\n\tat org.jboss.resteasy.reactive.client.handlers.ClientResponseCompleteRestHandler.handle(ClientResponseCompleteRestHandler.java:31)\n\tat org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.invokeHandler(AbstractResteasyReactiveContext.java:229)\n\tat org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:145)\n\tat org.jboss.resteasy.reactive.client.impl.RestClientRequestContext$1.lambda$execute$0(RestClientRequestContext.java:279)\n\tat 
io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)\n\tat io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)\n\tat io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)\n\tat io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)\n\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t... 2 more\n","errorType":"java.util.concurrent.CompletionException","errorMessage":"javax.ws.rs.ProcessingException: Response could not be mapped to type interface javax.ws.rs.core.StreamingOutput"}

Manfred73 avatar Jan 18 '23 18:01 Manfred73

Just want to thank @knutwannheden for the awesome solution! Been struggling with this all morning and the new class worked like a charm 😍

HugoVinhal98 avatar Jan 24 '23 11:01 HugoVinhal98

I just looked at what StreamingOutput actually is... I never realized how simple it was... If I had, we would have added this a long time ago :)

@knutwannheden your solution seems correct (and FWIW, it's also what RESTEasy Classic does), so would you like to contribute the fix (we would ideally have a few tests as well)? If you don't have the time, no worries, just let me know and I'll take care of it.

geoand avatar Feb 06 '23 14:02 geoand

I actually went ahead and did this myself as it also required updating the TCK.

geoand avatar Feb 07 '23 11:02 geoand

Cool, thanks!

knutwannheden avatar Feb 07 '23 11:02 knutwannheden

Hi folks, please correct me if I'm wrong, but StreamingOutput can only be used with non-reactive upstream data sources, because the output stream is closed after StreamingOutput.write has returned. That means one has to block within that method until the last byte is written, but blocking is not allowed.

Because returning Multi<byte[]> currently does not support content negotiation / dynamic content type, there is no way to stream dynamic content in a reactive way. Are there plans to add support for it?

sven-geisenhainer avatar Sep 26 '23 14:09 sven-geisenhainer

@sven-geisenhainer there is a way to do what you are asking, see https://quarkus.io/guides/resteasy-reactive#streaming-support

geoand avatar Sep 27 '23 07:09 geoand

@geoand my case is a bit more complicated than just streaming bytes as shown in the documentation samples, I also need to retrieve some meta data like content-type from upstream (cloud) before streaming the actual content. So in fact I need to combine Uni and Multi. Consider the following test class:

public class StreamTestResource {
    private static class FileInfo {
        String name;
        String contentType;
        long size;

        public FileInfo(String name, String contentType, long size) {
            this.name = name;
            this.contentType = contentType;
            this.size = size;
        }
    }

    private final Uni<FileInfo> fileInfoSource  = Uni.createFrom().item(new FileInfo("test", "application/octet-stream", 100_000));
    private final Multi<byte[]> fileContentSource = Multi.createFrom().emitter(emitter -> {
        for (int i=0; i<100_000; ++i) emitter.emit(new byte[] {(byte)i});
        emitter.complete();
    });

    @GET
    public Multi<byte[]> streamDynamicContent() {
        return fileInfoSource
            .onItem()
            .transformToMulti(fileInfo ->
                RestMulti.fromMultiData(fileContentSource)
                .header("Content-Type", fileInfo.contentType)
                .header("Content-Length", Long.toString(fileInfo.size))
                .header("Content-Disposition", "attachment; filename=\"" + fileInfo.name + "\"")
                .build()
            );
    }
}

Requesting that endpoint gives the following exception:

java.lang.IllegalStateException: Negotiation or dynamic media type not supported yet for Multi: please use the @Produces annotation when returning a Multi

But I cannot use @Produces as I do not know the content-type at compile time. How to solve this?

sven-geisenhainer avatar Sep 27 '23 08:09 sven-geisenhainer

@sven-geisenhainer doesn't the example from the docs:

@Path("logs")
public class Endpoint {

    interface SomeService {
        Uni<SomeResponse> get();
    }

    interface SomeResponse {
        Multi<byte[]> data();

        String myHeader();
    }

    private final SomeService someService;

    public Endpoint(SomeService someService) {
        this.someService = someService;
    }

    @GET
    public Multi<byte[]> streamLogs() {
        return RestMulti.fromUniResponse(someService.get(), SomeResponse::data, (r -> Map.of("MyHeader", List.of(r.myHeader()))));
    }
}

cover your use case?

geoand avatar Sep 27 '23 09:09 geoand

@geoand Unfortunately no, because that example requires knowing the content-type in advance in order to set a header map like Map.of("Content-Type", List.of(retrievedContentType)), but I need to obtain it from another reactive source, see my example above. What about that exception - are there plans to add the missing support?

sven-geisenhainer avatar Sep 27 '23 10:09 sven-geisenhainer

Isn't that the same exact case as the example?

It's getting a header obtained from a downstream service.
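
The idea can be illustrated with a plain-Java analogy, where CompletableFuture and List stand in for Mutiny's Uni and Multi. Everything here (`DeferredHeaders`, `FileInfo`, `StreamedResponse`, `fromFutureResponse`) is hypothetical and only mirrors the shape of the call: the headers are computed from the upstream result once it arrives, so nothing has to be known at compile time.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class DeferredHeaders {

    /** Hypothetical upstream result: metadata plus the content chunks. */
    record FileInfo(String name, String contentType, long size, List<byte[]> chunks) {}

    /** Hypothetical response model: headers and body, both derived from the upstream result. */
    record StreamedResponse(Map<String, List<String>> headers, List<byte[]> body) {}

    /** Both extractors run only after the asynchronous source has produced its item. */
    static <T> CompletableFuture<StreamedResponse> fromFutureResponse(
            CompletableFuture<T> source,
            Function<T, List<byte[]>> dataExtractor,
            Function<T, Map<String, List<String>>> headersExtractor) {
        return source.thenApply(r -> new StreamedResponse(headersExtractor.apply(r), dataExtractor.apply(r)));
    }

    static StreamedResponse demo() {
        CompletableFuture<FileInfo> upstream = CompletableFuture.completedFuture(
                new FileInfo("test", "application/octet-stream", 3, List.of(new byte[] {1, 2, 3})));
        return fromFutureResponse(
                upstream,
                FileInfo::chunks,
                info -> Map.of(
                        "Content-Type", List.of(info.contentType()),
                        "Content-Length", List.of(Long.toString(info.size())),
                        "Content-Disposition", List.of("attachment; filename=\"" + info.name() + "\"")))
                .join();
    }
}
```

In the docs example, `SomeResponse::data` and the header lambda play the roles of the two extractors.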

geoand avatar Sep 27 '23 10:09 geoand

@geoand Oh sorry, yes this perfectly works, didn't get the idea behind that code, thank you for persistently pointing me towards the right solution!

sven-geisenhainer avatar Sep 27 '23 12:09 sven-geisenhainer

Thanks for checking!

As you found it confusing, do you have any pointers of how the example could be improved to be clearer?

geoand avatar Sep 27 '23 12:09 geoand

@geoand I stumbled over the exception message "Negotiation or dynamic media type not supported yet for Multi: please use the @Produces annotation when returning a Multi", which is not actually correct, as there is a working solution. So at least a better message would be helpful, like "Use RestMulti.fromUniResponse for dynamic media type instead". The sample code works perfectly; it was my laziness in not reading it carefully enough. Thanks again!

sven-geisenhainer avatar Sep 27 '23 13:09 sven-geisenhainer