futures-rs
The conversion between AsyncRead and Stream is asymmetric
As of futures 0.3, the conversion only goes in one direction: from Stream to AsyncRead via stream::TryStreamExt::into_async_read, but not the other way around. Is this a conscious decision? If so, why?
If not, then maybe a new io::AsyncReadExt::into_stream method is in order? A symmetric conversion between the two only makes sense to me.
What should into_stream yield? Vec<u8>s? It'd be pretty inefficient to allocate a new vector for every slice of incoming data. BytesMut would be a better option, but still significantly less performant than using the AsyncRead interface.
That would be terribly inefficient. But I suppose into_stream could take an extra buffer parameter, analogous to the one passed to AsyncRead::poll_read:
pub trait AsyncReadExt {
fn into_stream(self, buf: AsRef<[u8]>) -> IntoBytesStream<Self>;
}
impl<R: AsyncRead> Stream for IntoBytesStream<R> {
type Item = Result<AsRef<[u8]>, Error>;
}
I don't see how that would work: you'd only be able to yield the user-provided buffer once. It's not possible to yield references into the buffer due to the "detached" nature of Stream's items (http://smallcultfollowing.com/babysteps/blog/2019/12/10/async-interview-2-cramertj-part-2/ has more info on this).
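To make the "detached" point concrete, here is a minimal synchronous sketch using std's Iterator, whose items are detached from `&mut self` in the same way as Stream's. `OwnedChunks` and `collect_chunks` are hypothetical names made up for illustration, not part of futures or tokio. Because the item type cannot borrow from the adapter's own scratch buffer, each chunk has to be copied out into an owned value:

```rust
use std::io::Read;

/// Hypothetical blocking stand-in for the proposed adapter: it reads into
/// one reused scratch buffer and copies each filled prefix out.
struct OwnedChunks<R> {
    reader: R,
    scratch: [u8; 4], // deliberately tiny so the example yields several chunks
}

impl<R: Read> Iterator for OwnedChunks<R> {
    // `type Item = &[u8]` borrowing from `self.scratch` cannot be expressed:
    // `Item` carries no lifetime tied to the `&mut self` of `next`, so the
    // caller may keep earlier items alive while asking for later ones.
    type Item = std::io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.reader.read(&mut self.scratch) {
            Ok(0) => None, // end of input
            Ok(n) => Some(Ok(self.scratch[..n].to_vec())), // copy out an owned chunk
            Err(e) => Some(Err(e)),
        }
    }
}

/// Collects every chunk at once, which is only legal because items are owned.
fn collect_chunks(data: &[u8]) -> Vec<Vec<u8>> {
    let chunks = OwnedChunks { reader: data, scratch: [0; 4] };
    chunks
        .map(|c| c.expect("reading from a slice cannot fail"))
        .collect()
}
```

Holding all chunks in one Vec at the same time is exactly what a buffer-borrowing item type would have to forbid, which is why the copy (or a reference-counted buffer) is unavoidable with this trait shape.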
It's possible to do this using an implementation of tokio_util::codec::Decoder (which presumably allocates a large buffer under the hood, and then passes you small slices from it?):
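use std::io;
use bytes::{Bytes, BytesMut};
use tokio::io::AsyncRead; // tokio's AsyncRead, as required by tokio_util's FramedRead
use tokio_util::codec::{self, Decoder};
// Wraps a reader in a FramedRead that yields whatever bytes are available.
fn stream_for<R: AsyncRead + Send + Sized>(r: R) -> codec::FramedRead<R, IdentityCodec> {
    codec::FramedRead::new(r, IdentityCodec)
}
// A decoder that does no framing: every non-empty read becomes one item.
struct IdentityCodec;
impl Decoder for IdentityCodec {
    type Item = Bytes;
    type Error = io::Error;
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if buf.is_empty() {
            // Nothing buffered yet; ask FramedRead to read more.
            Ok(None)
        } else {
            // Hand out everything read so far as an immutable Bytes chunk.
            Ok(Some(buf.split().freeze()))
        }
    }
}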
But I guess that the relative downside of that when compared to what @cramertj is proposing is the reference counting in BytesMut vs being able to directly use references with this modified Stream type?
Some functions only accept a Stream argument, but we got an AsyncRead from somewhere. Can someone tell me a convenient way to convert it? Thanks.
The examples on https://cloudflare-ipfs.com/ipfs/QmURzbuoZkGnnPuKpzQHVBe8xDfgR3tcH9f7SkAivBnnJJ/async_compression/stream/index.html probably cover what you need.
tokio has ReaderStream which uses BytesMut:
const CAPACITY: usize = 4096;
pin_project! {
    pub struct ReaderStream<R> {
        #[pin]
        reader: Option<R>,
        buf: BytesMut,
    }
}
impl<R: AsyncRead> ReaderStream<R> {
    pub fn new(reader: R) -> Self {
        ReaderStream {
            reader: Some(reader),
            buf: BytesMut::new(),
        }
    }
}
impl<R: AsyncRead> Stream for ReaderStream<R> {
    type Item = std::io::Result<Bytes>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use crate::util::poll_read_buf;
        let mut this = self.as_mut().project();
        let reader = match this.reader.as_pin_mut() {
            Some(r) => r,
            None => return Poll::Ready(None),
        };
        if this.buf.capacity() == 0 {
            this.buf.reserve(CAPACITY);
        }
        match poll_read_buf(reader, cx, &mut this.buf) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => {
                self.project().reader.set(None);
                Poll::Ready(Some(Err(err)))
            }
            Poll::Ready(Ok(0)) => {
                self.project().reader.set(None);
                Poll::Ready(None)
            }
            Poll::Ready(Ok(_)) => {
                let chunk = this.buf.split();
                Poll::Ready(Some(Ok(chunk.freeze())))
            }
        }
    }
}
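The termination behaviour of the ReaderStream above (drop the reader on EOF or on the first error, then yield nothing more) can be sketched with std alone. This is a blocking analogue under stated assumptions, not tokio's or futures' API: `ReaderIter` and `FailAfter` are hypothetical names, std::io::Read stands in for AsyncRead, Iterator stands in for Stream, and a fresh Vec per chunk replaces the shared BytesMut allocation.

```rust
use std::io::{self, Read};

const CAPACITY: usize = 4096;

/// Hypothetical blocking counterpart of `ReaderStream`.
pub struct ReaderIter<R> {
    reader: Option<R>, // `None` once the reader hit EOF or returned an error
}

impl<R: Read> ReaderIter<R> {
    pub fn new(reader: R) -> Self {
        ReaderIter { reader: Some(reader) }
    }
}

impl<R: Read> Iterator for ReaderIter<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        // Already finished: keep returning `None`.
        let reader = self.reader.as_mut()?;
        let mut chunk = vec![0u8; CAPACITY];
        match reader.read(&mut chunk) {
            Ok(0) => {
                self.reader = None; // EOF: drop the reader
                None
            }
            Ok(n) => {
                chunk.truncate(n); // keep only the bytes actually read
                Some(Ok(chunk))
            }
            Err(err) => {
                self.reader = None; // report the error once, then stop
                Some(Err(err))
            }
        }
    }
}

/// Illustration-only reader: yields `data`, then fails on the next read.
struct FailAfter<'a> {
    data: &'a [u8],
}

impl Read for FailAfter<'_> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.data.is_empty() {
            return Err(io::Error::new(io::ErrorKind::Other, "boom"));
        }
        self.data.read(buf)
    }
}
```

Allocating a full CAPACITY-sized Vec per chunk is the cost the earlier comments worry about; the BytesMut version avoids it by splitting chunks off one larger reference-counted buffer.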
Hi, is there any plan to implement something like ReaderStream in futures? I'm willing to do the work.