tokio icon indicating copy to clipboard operation
tokio copied to clipboard

`tee` adaptor for `AsnycRead`

Open arifd opened this issue 2 years ago • 3 comments

Is your feature request related to a problem? Please describe. As bytes were being streamed through, I needed a way to feed a copy of the bytes into a hashing function.

Describe the solution you'd like What I ended up doing was basically implement this crate for AsyncRead

Here is the full code:

use futures::ready;
use std::{
    io::Result,
    pin::Pin,
    task::{Context, Poll},
};
use tokio::io::{AsyncRead, ReadBuf};

// an adapter that lets you peek/snoop on the data as it is being streamed
pub struct TeeReader<R: AsyncRead + Unpin, F: FnMut(&[u8])> {
    reader: R,
    f: F,
}

impl<R: AsyncRead + Unpin, F: FnMut(&[u8])> Unpin for TeeReader<R, F> {}

impl<R: AsyncRead + Unpin, F: FnMut(&[u8])> TeeReader<R, F> {
    /// Returns a TeeReader which can be used as AsyncRead whose
    /// reads forwards onto the supplied reader, but performs a supplied closure
    /// on the content of that buffer at every moment of the read
    pub fn new(reader: R, f: F) -> TeeReader<R, F> {
        TeeReader { reader: reader, f }
    }

    /// Consumes the `TeeReader`, returning the wrapped reader
    pub fn into_inner(self) -> R {
        self.reader
    }
}

impl<R: AsyncRead + Unpin, F: FnMut(&[u8])> AsyncRead for TeeReader<R, F> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        ready!(Pin::new(&mut self.reader).poll_read(cx, buf))?;
        (self.f)(&buf.filled());
        Poll::Ready(Ok(()))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    #[test]
    fn tee() {
        let mut reader = "It's over 9000!".as_bytes();
        let mut altout: Vec<u8> = Vec::new();
        let mut teeout = Vec::new();
        {
            let mut tee = TeeReader::new(&mut reader, |bytes| altout.extend(bytes));
            let _ = tee.read_to_end(&mut teeout);
        }
        assert_eq!(teeout, altout);
    }
}

Would something like this be suitable in tokio_util or maybe futures_util?

arifd avatar Mar 27 '22 16:03 arifd

It could be reasonable to put something like it in tokio-util.

Darksonn avatar Mar 31 '22 12:03 Darksonn

Any objections to me tackling this? I think I see a couple of bugs in @arifd's example code, but I can fix them up as well as implement this for AsyncWrite.

farnz avatar Sep 19 '22 22:09 farnz

I've put together two commits in #5033 that provide wrappers for AsyncRead and AsyncWrite; the bug I thought I saw in the example code was there (if the reader doesn't empty the buffer between calls to poll_read, then data would get presented to the closure more than once, and read_to_end wasn't being .awaited in the test case).

farnz avatar Sep 20 '22 10:09 farnz

This can be closed now that #5033 has landed - it looks like I fouled up the linking of PR to issue, so it didn't happen automatically.

farnz avatar Sep 29 '22 09:09 farnz