warp icon indicating copy to clipboard operation
warp copied to clipboard

Allow more to be done with the result of warp::filters::body::stream()

Open jsdw opened this issue 4 years ago • 4 comments

Is your feature request related to a problem? Please describe.

I use/maintain a library (https://docs.rs/seamless/0.8.0/seamless/) which can be provided a stream of bytes to work with as part of a request body. This currently requires the Stream impl to have 'static + Send + Unpin.

The type returned from warp::filters::body::stream() is unnamed and is a impl Stream<Item = Result<impl Buf, crate::Error>>, which doesn't implement Send or Unpin and so cannot be handed off to this library.

Describe the solution you'd like

How about naming the type returned from warp::filters::body::stream(), as I think the underlying type (BodyStream) would automatically implement these things by default and so would be able to be handed off to other things and generally used with a little more flexibility.

Describe alternatives you've considered

Alternately, adding a few more bounds to the unnamed type (eg Send and Unpin) would still give it a little more flexibility, while exposing fewer implementation details.

Additional context

The current workaround is to avoid streaming altogether, and build a Vec<u8> to pass in instead, but I'd much prefer to be able to receive and hand off a proper Streaming body, so that I can better control how that stream is worked with and consumed in other places.

jsdw avatar Jun 04 '21 13:06 jsdw

Same here. I am working on something that forwards the content of the stream to another stream and this needs to be done in new thread (using tokio::spawn), which then requires Send. Adding Send or return impl Stream<Item = Result<Bytes, crate::Error>> would really help me a lot.

BillHuang2001 avatar Aug 01 '21 12:08 BillHuang2001

I would also like it if it returned impl Stream<Item = Result<Bytes, crate::Error>> + Send + Sync, as I also need to pass the stream with Bytes. I can do a downcast to Bytes since I know it has to be Bytes, but it feels really wrong, and I can't pass it to where I want anyways since it doesn't implement Send + Sync. IMO the filters::body() should be exposed publicly.

90-008 avatar Aug 27 '21 15:08 90-008

I am in a similar position of wanting to hand off the stream to something else. If I can't get the types to work with the (rust) library that would consume the stream, at a bare minimum I could get away with creating, say, a hyper client with a streaming body that would then just forward the request onto another service to deal with.

Does anyone have any working examples of taking a body stream and creating a forwarding HTTP request that consumes the stream without loading everything into memory?

The closest I have come (I think) is using hyper's Body::channel and using tokio::spawn to push chunks through the channel. There I struggled with the tx end of the Body::channel not satisfying Send and the compiler getting upset that it is used across and .await

malcolmstill avatar Jan 31 '22 17:01 malcolmstill

@malcolmstill I am having the same situation. I want to download files from S3 and decompress them before sending back to the user and I need to do it with streams. The main problem is that the response is not sent back until file is completely downloaded. Were you able to fix this somehow?

Here is a small example. I can see that TTFB is around 10 seconds.

pub async fn get_file() -> Result<warp::reply::Response, Infallible> {
    let (mut sender, body) = hyper::Body::channel();
    tokio::spawn(async move {
        for _ in 1..100 {
            sender
                .send_data(hyper::body::Bytes::from_static("Hello world!\n".as_bytes()))
                .await
                .unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    });

    Ok(hyper::Response::builder()
        .status(StatusCode::OK)
        .body(body)
        .unwrap())
}

Update: It seems that it is actually working correctly. Even though Chrome is telling that first byte was received after ten seconds in wireshark I can see that chunks were actually sent every 100ms. I will leave this comment with hope that it will help someone.

YaZasnyal avatar Jun 03 '22 19:06 YaZasnyal