gotham
Streaming response bodies
Is it possible to stream a response? I tried using Body::pair() but I was not able to get it working.
This is definitely something I'd like to see supported, but it is not something that has received any attention yet. When you used Body::pair(), did you see a type error, or something else?
Well I first tried this:
let mut response = Response::new();
set_headers(&state, &mut response, Some(::mime::TEXT_PLAIN), None);
let (sender, body) = Body::pair();
let stream = stream::iter_ok(vec![
    Chunk::from("1\n"),
    Chunk::from("2\n"),
    Chunk::from("3\n")
].into_iter()).map(Ok);
response.set_body(body);
Box::new(
    sender.send_all(stream)
        .map_err(|e| e.into_handler_error())
        .then(|x| match x {
            Ok(_) => Ok((state, response)),
            Err(e) => Err((state, e)),
        })
)
But it just waits forever not returning anything. Then I tried this:
let mut response = Response::new();
set_headers(&state, &mut response, Some(::mime::TEXT_PLAIN), None);
let (sender, body) = Body::pair();
thread::spawn(|| {
    let stream = stream::iter_ok(vec![
        Chunk::from("1\n"),
        Chunk::from("2\n"),
        Chunk::from("3\n")
    ].into_iter()).map(Ok);
    sender.send_all(stream).wait().unwrap()
});
response.set_body(body);
But that just returns immediately with no body.
Gotham currently forces the body of a response to be a hyper::Body. If that restriction were lifted, you could set the body to any Stream<Item = Chunk>, and I think it would work.
Thanks. I'll have a look into this.
I think the problem in @kbillings's example is that the ContentLength header is being set to 0 by set_headers(&state, &mut response, Some(::mime::TEXT_PLAIN), None);, which prevents continued streaming of the response.
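To make the header issue concrete, here is a minimal std-only sketch of how an HTTP/1.1 body is framed. It is not hyper's or Gotham's code; framing_header, encode_chunk, and last_chunk are hypothetical names invented for this illustration. With Content-Length: 0 the client is told the body is empty, so nothing written afterwards is read. Without a length, a body can be sent in chunks, which is presumably what happens once the ContentLength header is removed.

```rust
/// Hypothetical helper: picks the framing header for an HTTP/1.1 response.
/// `Some(n)` means the body length is known up front; `None` means the body
/// is streamed and its length is not known when the headers are written.
fn framing_header(body_len: Option<u64>) -> String {
    match body_len {
        // With `Some(0)` the client stops reading right after the headers.
        Some(n) => format!("Content-Length: {}\r\n", n),
        None => "Transfer-Encoding: chunked\r\n".to_string(),
    }
}

/// Hypothetical helper: encodes one piece of a streamed body as an HTTP/1.1
/// chunk, i.e. hexadecimal size, CRLF, payload, CRLF.
fn encode_chunk(data: &[u8]) -> Vec<u8> {
    let mut out = format!("{:x}\r\n", data.len()).into_bytes();
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
    out
}

/// The zero-length chunk that terminates a chunked body.
fn last_chunk() -> &'static [u8] {
    b"0\r\n\r\n"
}
```

For the three chunks in the original example, the wire format would be encode_chunk(b"1\n"), encode_chunk(b"2\n"), encode_chunk(b"3\n"), followed by last_chunk().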
I just got streaming working with:
let mut res = Response::new();
set_headers(&state, &mut res, Some(ext_to_mime(&ext)), None);
res.headers_mut().set(CacheControl(vec![
    CacheDirective::MaxAge(86400u32),
    CacheDirective::Public,
]));
res.headers_mut().remove::<ContentLength>();
let (sender, body) = Body::pair();
res.set_body(body);
let stream = FS_POOL.read(pathbuf)
    .map(|bytes| Ok(bytes.into()));
let sender = sender
    .sink_map_err(|e| hyper::Error::from(::std::io::Error::new(::std::io::ErrorKind::Other, e)));
let streaming_future = sender.send_all(stream)
    .map(|(_sink, _stream)| ())
    .map_err(|_e| error!("Streaming error"));
// Pass streaming future to tokio
// TODO handle at_capacity error
DefaultExecutor::current().spawn(Box::new(streaming_future)).unwrap();
Box::new(future::ok((state, res)))
where FS_POOL is a futures_fs::FsPool, whose read() returns a stream of the file's contents.
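The shape of that solution, a producer that reads the file in chunks on its own and feeds a bounded channel whose receiving end is handed back right away, can be sketched with the standard library alone. This is only an analogy under stated assumptions: stream_chunks is a hypothetical function, a thread stands in for the tokio executor, and std's sync_channel stands in for the sender/body pair. It is not how futures-fs or hyper are implemented.

```rust
use std::io::Read;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical helper: reads `reader` in pieces of at most `chunk_size`
/// bytes on a background thread and delivers them through a bounded channel.
/// The receiver is returned immediately, before any chunk has been read.
fn stream_chunks<R>(mut reader: R, chunk_size: usize) -> Receiver<std::io::Result<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    // Capacity 1 (an arbitrary choice for this sketch): the reader thread
    // blocks until the consumer catches up, which gives backpressure.
    let (tx, rx) = sync_channel(1);
    thread::spawn(move || {
        let mut buf = vec![0u8; chunk_size];
        loop {
            match reader.read(&mut buf) {
                // End of input; dropping `tx` closes the channel.
                Ok(0) => break,
                Ok(n) => {
                    if tx.send(Ok(buf[..n].to_vec())).is_err() {
                        break; // the consumer went away
                    }
                }
                Err(e) => {
                    // Forward the error, then stop reading.
                    let _ = tx.send(Err(e));
                    break;
                }
            }
        }
    });
    rx
}
```

A caller could pass a std::fs::File as the reader and iterate over the returned receiver to consume chunks as they arrive.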
Thanks @millardjn … That would explain what I saw when I looked at this briefly. Does this fix it for you @kbillings?
Thanks for this example code, @millardjn! Did you test it with larger files? I just wrote an integration of the futures-fs crate for gotham and while streaming to responses works for small text files, it gets stuck with larger files after the FsReadStream delivers two chunks.
I've also tried a different approach where I used a hyper::Response<Box<Stream<Item = Chunk, Error = hyper::Error>>>, which works from a Hyper perspective, but Gotham seems to assume that handler futures resolve to a Response<Body>, and therefore I got compilation errors.
You can find my PoC here: https://github.com/ChristophWurst/gotham-middleware-fs/pull/1/files
Mostly image files up to 10-20 MB, nothing in the GB/TB range, but enough that I should be way past 2 chunks. All I can think of is that the future from the stream-sink interaction has to be handed to a tokio executor or a new thread so it keeps getting polled independently of the response future.
Good idea with the middleware. If you want to share the stalled code in a gist I'll take a look.
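The polling point above can be illustrated with a small std-only analogy. It assumes a bounded channel of capacity 1 in place of Body::pair() (the real capacity is not stated in this thread), and both function names are hypothetical. If the producer must finish before the receiving end is handed to anyone, it gets stuck as soon as the channel is full. If the producer runs independently, the consumer can drain it.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};
use std::thread;

/// Mirrors the stalling handler: tries to push every chunk into the bounded
/// channel before the receiver is handed to anyone. Returns how many chunks
/// fit before the channel reported that it was full.
fn fill_before_returning(chunks: &[&'static str]) -> (usize, Receiver<&'static str>) {
    let (tx, rx) = sync_channel(1);
    let mut sent = 0;
    for &chunk in chunks {
        match tx.try_send(chunk) {
            Ok(()) => sent += 1,
            // A blocking `send` would wait here forever: nobody can read yet.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (sent, rx)
}

/// Mirrors the working handler: the producer runs on its own thread and the
/// receiver is returned immediately, so the consumer can start reading.
fn fill_in_background(chunks: Vec<&'static str>) -> Receiver<&'static str> {
    let (tx, rx) = sync_channel(1);
    thread::spawn(move || {
        for chunk in chunks {
            if tx.send(chunk).is_err() {
                break; // the consumer went away
            }
        }
    });
    rx
}
```

In the first function only one of the three chunks fits; in the second, iterating over the returned receiver yields all three.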
> Mostly image files up to 10-20 MB, nothing in the GB/TB range, but enough that I should be way past 2 chunks.
My test file was a ~80 MB one. According to the printf statement I added for debugging purposes two chunks of 8192 bytes are read before it stalls.
> All I can think of is that the future from the stream-sink interaction has to be handed to a tokio executor or a new thread so it keeps getting polled.
I'm returning the future that resolves when the read stream has been forwarded to the body stream. Since I use this future to build the request handler future, I assumed that Gotham would eventually pass it to the event loop and that it would therefore be polled.
> If you want to share the stalled code in a gist I'll take a look.
See https://github.com/ChristophWurst/gotham-middleware-fs/pull/1/files and the newly added response_stream example. Replace Cargo.toml in the example with a large file on your disk and start it with cargo run --example response_stream.
I ran into this issue as well, and it's difficult to get the snippets to work without a Cargo.toml and imports. Can somebody add this to the examples/ folder? :)