Allow clients to send data before headers are received
Is your feature request related to a problem? Please describe.
tonic wraps hyper to provide request/response and bidirectional streaming for gRPC over HTTP/2. When using a bidirectional streaming client, tonic waits until it receives response headers before returning control to the caller. That means that straight-line code like this can hang:
let mut client = EchoClient::connect("http://127.0.0.1:50051").await.unwrap();
let (tx, rx) = tokio::sync::mpsc::channel(10);
let response = client
.bidirectional_streaming_echo(tokio_stream::wrappers::ReceiverStream::new(rx))
.await
.unwrap();
for i in 0..10 {
tx.send(EchoRequest {
message: format!("msg {:02}", i),
})
.await
.unwrap();
}
let mut resp_stream = response.into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("\treceived message: `{}`", received.message);
}
Internally, the bidirectional_streaming_* call generates a hyper::client::conn::http2::SendRequest and awaits it (see tonic's SendRequest). Waiting on a SendRequest::send_request appears to send headers from the client and block until it has read headers from the server.
In the example above, because the body stream is still pending data, the client sends HTTP/2 headers to the server with no body and waits for response headers from the server. If the server holds back its response headers until it receives body data - which gRPC servers in Java, Go, and C++ appear to do - the SendRequest::send_request future will never complete, control will never return to the caller, no data will be sent, and everything will hang.
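The circular wait described above can be modelled without tonic or hyper. The following is only a sketch using std threads and channels: `spawn_server` and `headers_arrive` are hypothetical names, the "server" is a thread that withholds its "headers" until it has seen one body message, and a timeout stands in for the hang so the program terminates.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a server that withholds response headers
/// until it has received at least one body message.
fn spawn_server(body: mpsc::Receiver<String>) -> mpsc::Receiver<&'static str> {
    let (headers_tx, headers_rx) = mpsc::channel();
    thread::spawn(move || {
        // Blocks until the first body message arrives or the sender is dropped.
        if body.recv().is_ok() {
            // The client may have stopped listening; ignore a send error.
            let _ = headers_tx.send("200 OK");
        }
    });
    headers_rx
}

/// Returns true if response headers arrive within the timeout.
/// The timeout replaces the indefinite hang from the issue.
fn headers_arrive(send_body_first: bool) -> bool {
    let (body_tx, body_rx) = mpsc::channel();
    let headers = spawn_server(body_rx);
    if send_body_first {
        body_tx.send("msg 00".to_string()).unwrap();
    }
    let got = headers.recv_timeout(Duration::from_millis(200)).is_ok();
    // Dropping the sender lets the server thread exit in the "wait first" case.
    drop(body_tx);
    got
}

fn main() {
    println!("wait for headers first: {}", headers_arrive(false));
    println!("send body first:        {}", headers_arrive(true));
}
```

Waiting for headers before sending any body times out, while sending a body message first lets the headers arrive. The real client and server are asynchronous, so this only mirrors the ordering problem, not the actual transport.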
Describe alternatives you've considered
h2 exposes an API in SendRequest that returns control as soon as headers have been sent - the second example in the documentation is exactly the behavior that tonic wants to have. In talking with @LucioFranco in Discord it seems like tonic has a few alternative ways to go about getting that kind of behavior:
- Expose an h2-style API in hyper, and switch tonic to that API.
- Drop down to using h2 directly in tonic clients.
- Write code in tonic to get back to this behavior without relying on the transport.
Based on our discussions so far, there's some preference for the first option and we thought we'd start a discussion here.
Thanks for the detailed write-up. I've now been able to read through and understand what you're describing.
Waiting on a SendRequest::send_request appears to send headers from the client and block until it has read headers from the server.
This is actually by design. That is its purpose. The function returns a Future<Response>. So when you await it, it won't yield the response to the let response = until it has gotten one. Meanwhile, it will be trying to send data from the request body, if the request body yields some data.
What you would need to do here is either join() on that future and a second one sending request data, or spawn the response future and await it later (though you probably still want some sort of join, since you want to know if the server ever says "nope").
Consider a similar problem in a different domain. If you had a std::sync::mpsc channel, and some threads, you could accidentally do the same thing:
let (tx, rx) = std::sync::mpsc::sync_channel(10);
let resp = thread::spawn(move || {
for msg in rx {
println!("got: {:?}", msg);
}
// return value
42
}).join();
tx.send(1);
tx.send(2);
// etc
Because join() is called on the thread right away to get its result, and the thread won't complete until the channel has delivered its messages and been closed, this will also hang.
This is actually by design. That is its purpose.
Right! The ask here is to think about exposing another API more like h2's that gives the caller control over when to wait for headers. Using h2's SendRequest::send_request example:
let (response_fut, mut send_stream) = send_request
.send_request(request, false).unwrap();
send_stream.send_data(b"hello", false).unwrap();
send_stream.send_data(b"world", false).unwrap();
let response = response_fut.await.unwrap();
assert_eq!(response.status(), StatusCode::OK);
Consider a similar problem in a different domain.
Your example isn't quite equivalent. The thread::spawn call doesn't force you to join, so you're free to do something like this:
let (tx, rx) = std::sync::mpsc::sync_channel(10);
let resp = thread::spawn(move || {
for msg in rx {
println!("got: {:?}", msg);
}
// return value
42
});
tx.send(1);
tx.send(2);
// do more stuff
resp.join()
That kind of flexibility would be the goal with whatever this new API is.
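Staying with the thread analogy, the requested API shape can be sketched as a call that hands back both a response handle and a body sender, leaving the caller to decide when to wait. This is an illustration only: `send_request` and `run` here are hypothetical std-only stand-ins, not the hyper or h2 functions of the same name, and the "server" is a thread that echoes each message.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical API shape: start the request and return a handle for the
/// response together with a sender for the request body.
fn send_request() -> (JoinHandle<Vec<String>>, Sender<String>) {
    let (body_tx, body_rx) = mpsc::channel::<String>();
    let response = thread::spawn(move || {
        // Toy echo "server": runs until every body sender has been dropped.
        body_rx
            .iter()
            .map(|m| format!("echo: {}", m))
            .collect::<Vec<String>>()
    });
    (response, body_tx)
}

/// Sends three body messages, closes the body, then waits for the response.
fn run() -> Vec<String> {
    let (response, body) = send_request();
    for i in 0..3 {
        body.send(format!("msg {:02}", i)).unwrap();
    }
    // Dropping the sender marks the end of the request body.
    drop(body);
    // Only now does the caller block on the response.
    response.join().unwrap()
}

fn main() {
    for line in run() {
        println!("{}", line);
    }
}
```

Because the caller holds the body sender before it ever waits on the response, the circular wait from the original example cannot occur.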
The thread::spawn call doesn't force you to join
It's almost the same in the original example, though. You don't have to await, either. You probably do need the future to be polled, but it can be polled in a separate task, and then awaited later.
Like this:
let (tx, rx) = tokio::sync::mpsc::channel(10);
+ let future = client
- let response = client
.bidirectional_streaming_echo(tokio_stream::wrappers::ReceiverStream::new(rx));
- .await
- .unwrap();
+ let task = tokio::spawn(future);
for i in 0..10 {
tx.send(EchoRequest {
message: format!("msg {:02}", i),
})
.await
.unwrap();
}
- let mut resp_stream = response.into_inner();
+ let mut resp_stream = task.await.unwrap().into_inner();
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
println!("\treceived message: `{}`", received.message);
}
You probably do need the future to be polled, but it can be polled in a separate task, and then awaited later.
That definitely works, but it requires a deeper understanding of what's going on. In tonic, that violates expectations to the point that there's a four-year-old issue full of confused folks looking for HTTP/2 bugs (apologies for not linking that originally!).
Tonic could absolutely do something like that internally for streaming calls. The thought was that h2 already has this API - could we use it directly instead of having to spawn another task?
I just checked, and actually you don't need to spawn the response future. So, really, that API you're mentioning already exists. You can send the request, and then send on some sort of body sender, and then await the response. (One can also spawn it, that's fine too.)
@seanmonstar I think the bigger problem (and I took just a very quick look at poll_response in h2) is that it always waits for headers before returning control to get data frames. How does this work if there are no headers sent by the server and only data frames? IIRC this was one of the core issues with the code in h2 that @blinsay brought up to me.
How does this work if there are no headers sent by the server and only data frames?
It is not valid HTTP/2 for either side to send DATA before sending a HEADERS frame. But a gRPC server isn't doing that.
The problem, reading what is reported and what is in the linked issue, is calling .await for the response headers when the user doesn't want to wait for them.
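The frame-ordering rule mentioned above can be written down as a small check. This is a deliberately simplified sketch: the `Frame` enum and `starts_with_headers` are hypothetical names, only one direction of a single stream is modelled, and other frame types (CONTINUATION, RST_STREAM, and so on) are ignored.

```rust
/// Simplified view of the frames one peer sends on a single stream.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Frame {
    Headers,
    Data,
}

/// Returns true if the sequence does not begin with a DATA frame,
/// i.e. HEADERS comes first whenever anything has been sent.
fn starts_with_headers(frames: &[Frame]) -> bool {
    match frames.first() {
        Some(Frame::Data) => false,
        _ => true,
    }
}

fn main() {
    // A gRPC response: HEADERS, some DATA, then trailers in a final HEADERS.
    let grpc_response = [Frame::Headers, Frame::Data, Frame::Data, Frame::Headers];
    // DATA before any HEADERS is not valid.
    let data_first = [Frame::Data, Frame::Headers];

    println!("{:?} -> {}", grpc_response, starts_with_headers(&grpc_response));
    println!("{:?} -> {}", data_first, starts_with_headers(&data_first));
}
```

A server that delays its HEADERS until it has read some request body still satisfies this rule, which is why the hang comes from when the client awaits, not from a protocol violation.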
@LucioFranco right, I think we assumed that was true based on the function signatures but @seanmonstar pointed out that assumption is not correct. Here's some working code that sends body data before awaiting headers. Playing with a toy HTTP/2 echo server, I can confirm that all the body data gets sent before the headers are awaited.
#[tokio::main]
async fn main() {
let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
let exec = TokioExecutor;
let io = TokioIo::new(stream);
let (mut sender, conn) = http2::handshake(exec, io).await.unwrap();
tokio::spawn(async move {
conn.await.unwrap();
});
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Frame<Bytes>, Infallible>>(1);
let req = Request::builder()
.method("POST")
.uri("/envoy.service.discovery.v3.AggregatedDiscoveryService/StreamAggregatedResources")
.header("content-type", "application/grpc")
.body(StreamBody::new(ReceiverStream::new(rx)))
.unwrap();
let headers_fut = sender.send_request(req);
// eprintln!("getting headers here will block forever");
// let resp = headers_fut.await;
// eprintln!("{resp:?}");
for i in 1..100 {
eprintln!("sending body data{i:02}");
let data = format!("data{i}");
tx.send(Ok(Frame::data(Bytes::copy_from_slice(data.as_bytes()))))
.await
.unwrap();
}
std::mem::drop(tx);
eprintln!("getting headers here returns fine");
let resp = headers_fut.await;
eprintln!("{resp:?}");
}
@seanmonstar thanks for pointing out that this works! I think we can close this. I'm also happy to submit a documentation PR about this behavior if you think it's useful.
For completeness, here's the h2 server I tested against: https://gist.github.com/blinsay/06a69bef4764b23fb45e1c19b51c7091