hyper
hyper copied to clipboard
Create a body channel implementation that knows when buf is written
It is very common to want a body type that allows for a push-based API, and knowledge of when those bytes have been written to the socket. We can provide a channel type that wraps the user's B: Buf in one that alerts the channel when Buf::advance has reached the end.
At work, we run into this issue, where we wanted to get callback on body chunks and then on end of streaming.
We implemented custom Stream wrappers for both cases and wrapped them with Body::wrap_stream. I think it suffers from additional heap allocations, as hyper boxes the stream you pass.
Either way, I'd be more than happy to tackle this. One thing I'd like to sort out first, is do we go with alerting via channel (if so, can you elaborate a bit more how you see it), or providing a callback the way we did.
Also, what should be sent by the channel/available in the callback. We did &Bytes so that we could call chunk.len().
I think there's 2 parts to figure out, the easier one is the Buf wrapper:
struct Alert<B: Buf> {
inner: B,
signal: SignalThingy,
}
impl<B: Buf> Buf for Alert {
fn advance(&mut self, cnt: usize) {
self.inner.advance(cnt);
if !self.inner.has_remaining() {
self.signal.thingy();
}
}
}
The harder part is what is the signal thingy? Is it like a tokio::sync::Notify, or similar kind of channel-looking thing?
Alright, there is a fundamental flaw in the bytes::Buf design that makes it impossible to implement this feature.
There is no method that will guarantee that the buffer was actually consumed. One is allowed to call Buf::chunk just for peaking, so the logic to trigger notification can't go there. We are not guaranteed that somebody will call buf.advance(0) after sending the whole buffer.
Relying on a call to Buf::remaining() that returns 0 might be possible, but still doesn't guarantee anything, as one could do:
let buf = Bytes::from_static(b"abc");
loop {
let chunk_size = buf.chunk().len();
send(buf.chunk());
if chunk_size == buf.remaining() {
break;
}
}
Notice that buf.remaining() is not guaranteed to return 0 (actually, it will only do so for empty buffer).
I could write such snippets for each of Buf methods and prove that none of them give us a definite EOS.
We would require something like:
trait ConsumingBuf {
fn next_chunk(&mut self) -> &[u8] {
let chunk = self.inner.chunk();
self.inner.advance(chunk.len());
if self.inner.remaining() == 0 {
self.signal.thingy()
}
}
}
(where inner implements Buf) to guarantee we notify if-and-only-if buffer was consumed.
It's true there's no guarantee, but hyper does make sure to always advance to the end. The thing with readiness-based IO is that we need to peek, because there might not be enough room in the socket's write buffer to send it all.