futures-rs icon indicating copy to clipboard operation
futures-rs copied to clipboard

The conversion between AsyncRead and Stream is asymmetric

Open edwardw opened this issue 5 years ago • 8 comments
trafficstars

As of futures-0.3, the conversion only goes one direction from Stream to AsyncRead through stream::TryStreamExt::into_async_read but not the other way around. Is this a conscious decision? If so why?

If not, then maybe a new io::AsyncReadExt::into_stream method is in order? A symmetric conversion between the two only makes sense to me.

edwardw avatar Dec 14 '19 17:12 edwardw

What should to_stream yield? Vec<u8>s? It'd be pretty inefficient to allocate a new vector for every slice of incoming data. BytesMut would be a better option, but still significantly less performant than using the AsyncRead interface.

cramertj avatar Dec 18 '19 00:12 cramertj

That would be terribly inefficient. But I suppose into_stream can take an extra parameter that is the buffer analogous to the AsyncRead::poll_read:

pub trait AsyncReadExt {
    fn into_stream(self, buf: AsRef<[u8]>) -> IntoBytesStream<Self>;
}

impl<R: AsyncRead> Stream for IntoBytesStream<R> {
    type Item = Result<AsRef<[u8]>, Error>;
}

edwardw avatar Dec 18 '19 08:12 edwardw

I don't see how that would work-- you'd only be able to yield the user-provided buffer once. It's not possible to yield references into the buffer due to the "detatched" nature of Stream's items (http://smallcultfollowing.com/babysteps/blog/2019/12/10/async-interview-2-cramertj-part-2/ has more info on this).

cramertj avatar Dec 18 '19 18:12 cramertj

It's possible to do this using an implementation of tokio_util::codec::Decoder (which presumably allocates a large buffer under the hood, and then passes you small slices from it?):

fn stream_for<R: AsyncRead + Send + Sized>(r: R) -> codec::FramedRead<R, IdentityCodec> {
    codec::FramedRead::new(r, IdentityCodec)
}

struct IdentityCodec;

impl Decoder for IdentityCodec {
    type Item = Bytes;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if buf.len() == 0 {
            Ok(None)
        } else {
            Ok(Some(buf.split().freeze()))
        }
    }
}

But I guess that the relative downside of that when compared to what @cramertj is proposing is the reference counting in BytesMut vs being able to directly use references with this modified Stream type?

stuhood avatar May 03 '20 04:05 stuhood

some function only accept Stream argument, but we got AsyncRead from somewhere, can someone tell me how to convert it convenient? thx

remones avatar Jan 13 '21 02:01 remones

The examples on https://cloudflare-ipfs.com/ipfs/QmURzbuoZkGnnPuKpzQHVBe8xDfgR3tcH9f7SkAivBnnJJ/async_compression/stream/index.html probably cover what you need.

Nemo157 avatar Jan 13 '21 08:01 Nemo157

tokio has ReaderStream which uses BytesMut:

const CAPACITY: usize = 4096;

pin_project! {
    pub struct ReaderStream<R> {
        #[pin]
        reader: Option<R>,
        buf: BytesMut,
    }
}

impl<R: AsyncRead> ReaderStream<R> {
    pub fn new(reader: R) -> Self {
        ReaderStream {
            reader: Some(reader),
            buf: BytesMut::new(),
        }
    }
}

impl<R: AsyncRead> Stream for ReaderStream<R> {
    type Item = std::io::Result<Bytes>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use crate::util::poll_read_buf;

        let mut this = self.as_mut().project();

        let reader = match this.reader.as_pin_mut() {
            Some(r) => r,
            None => return Poll::Ready(None),
        };

        if this.buf.capacity() == 0 {
            this.buf.reserve(CAPACITY);
        }

        match poll_read_buf(reader, cx, &mut this.buf) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => {
                self.project().reader.set(None);
                Poll::Ready(Some(Err(err)))
            }
            Poll::Ready(Ok(0)) => {
                self.project().reader.set(None);
                Poll::Ready(None)
            }
            Poll::Ready(Ok(_)) => {
                let chunk = this.buf.split();
                Poll::Ready(Some(Ok(chunk.freeze())))
            }
        }
    }
}

ibraheemdev avatar May 08 '21 05:05 ibraheemdev

Hi, is there any plan to implement something like ReaderStream in futures? I'm willing to do the work.

Xuanwo avatar Jan 10 '22 07:01 Xuanwo