tokio
`tee` adaptor for `AsyncRead`
Is your feature request related to a problem? Please describe.
As bytes were being streamed through, I needed a way to feed a copy of the bytes into a hashing function.
Describe the solution you'd like
What I ended up doing was basically implementing this crate for `AsyncRead`.
Here is the full code:
use futures::ready;
use std::{
    io::Result,
    pin::Pin,
    task::{Context, Poll},
};
use tokio::io::{AsyncRead, ReadBuf};

// an adapter that lets you peek/snoop on the data as it is being streamed
pub struct TeeReader<R: AsyncRead + Unpin, F: FnMut(&[u8])> {
    reader: R,
    f: F,
}

impl<R: AsyncRead + Unpin, F: FnMut(&[u8])> Unpin for TeeReader<R, F> {}

impl<R: AsyncRead + Unpin, F: FnMut(&[u8])> TeeReader<R, F> {
    /// Returns a TeeReader which can be used as AsyncRead whose
    /// reads forwards onto the supplied reader, but performs a supplied closure
    /// on the content of that buffer at every moment of the read
    pub fn new(reader: R, f: F) -> TeeReader<R, F> {
        TeeReader { reader: reader, f }
    }

    /// Consumes the `TeeReader`, returning the wrapped reader
    pub fn into_inner(self) -> R {
        self.reader
    }
}

impl<R: AsyncRead + Unpin, F: FnMut(&[u8])> AsyncRead for TeeReader<R, F> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        ready!(Pin::new(&mut self.reader).poll_read(cx, buf))?;
        (self.f)(&buf.filled());
        Poll::Ready(Ok(()))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    #[test]
    fn tee() {
        let mut reader = "It's over 9000!".as_bytes();
        let mut altout: Vec<u8> = Vec::new();
        let mut teeout = Vec::new();
        {
            let mut tee = TeeReader::new(&mut reader, |bytes| altout.extend(bytes));
            let _ = tee.read_to_end(&mut teeout);
        }
        assert_eq!(teeout, altout);
    }
}
Would something like this be suitable in `tokio_util` or maybe `futures_util`?
It could be reasonable to put something like it in tokio-util.
Any objections to me tackling this? I think I see a couple of bugs in @arifd's example code, but I can fix them up as well as implement this for `AsyncWrite`.
I've put together two commits in #5033 that provide wrappers for `AsyncRead` and `AsyncWrite`; the bug I thought I saw in the example code was there (if the reader doesn't empty the buffer between calls to `poll_read`, then data would get presented to the closure more than once, and `read_to_end` wasn't being `.await`ed in the test case).
This can be closed now that #5033 has landed - it looks like I fouled up the linking of PR to issue, so it didn't happen automatically.