Bytes no longer implements Extend<u8>
This means that in particular you can't use StreamExt::concat on a stream of Bytes. Is that change intentional or something that got lost in the rewrites?
I think it was accidental. @seanmonstar ?
Partly intentional, partly just punted. It was intentionally removed in the pass that removed all mutation of a Bytes. Should we just add in a simple Extend<u8> for Bytes?
Another idea is to encourage using some aggregate utility instead, which would aggregate into some BufList.
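For readers unfamiliar with the idea, here is a rough, std-only sketch of what aggregating into a chunk list could look like. The `BufList` type and its methods below are hypothetical names for illustration, not an existing `bytes` or `hyper` API; the point is only that chunks are kept as received and copied into one buffer only when a caller asks for contiguous bytes.

```rust
use std::collections::VecDeque;

/// Hypothetical chunk list: stores each chunk as received instead of
/// copying it into one contiguous buffer up front.
#[derive(Default)]
struct BufList {
    chunks: VecDeque<Vec<u8>>,
}

impl BufList {
    /// Appends a chunk without copying its bytes; empty chunks are skipped.
    fn push(&mut self, chunk: Vec<u8>) {
        if !chunk.is_empty() {
            self.chunks.push_back(chunk);
        }
    }

    /// Total number of bytes across all stored chunks.
    fn remaining(&self) -> usize {
        self.chunks.iter().map(Vec::len).sum()
    }

    /// Copies everything into one contiguous buffer, only on request.
    fn to_vec(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(self.remaining());
        for chunk in &self.chunks {
            out.extend_from_slice(chunk);
        }
        out
    }
}
```

The trade-off is that callers who really need a single contiguous buffer still pay for one copy at the end, but callers who only iterate over the chunks pay for none.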
If Bytes doesn't implement Extend<u8>, then code like this in the implementation of warp will no longer work:
body.try_concat().await
where body is a hyper Body, which is a Stream that produces Bytes. try_concat relies on the output of the stream being Extend<u8> for this to still compile.
Unless you can think of a suitable replacement for that use of try_concat?
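To make the dependency on `Extend` concrete, here is a synchronous, std-only analogy of the bound that a `concat`-style combinator places on the item type. `concat_chunks` and `Chunk` are made-up names for this sketch, and `Chunk` is only a stand-in for an immutable byte buffer, not the real `Bytes`. As far as I can tell the bound mirrors the one on `try_concat` (`Default + IntoIterator + Extend<Item>`), but treat that as an assumption.

```rust
/// Concatenates chunks the way a `concat`-style combinator does: start from
/// the first chunk (or `Default` if there is none) and `extend` it with
/// every later chunk.
fn concat_chunks<I, T>(chunks: I) -> T
where
    I: IntoIterator<Item = T>,
    T: Default + IntoIterator + Extend<<T as IntoIterator>::Item>,
{
    let mut iter = chunks.into_iter();
    let mut acc = iter.next().unwrap_or_default();
    for chunk in iter {
        acc.extend(chunk);
    }
    acc
}

/// Hypothetical stand-in for a byte buffer type (NOT `bytes::Bytes`).
#[derive(Default, Debug, PartialEq)]
struct Chunk(Vec<u8>);

impl IntoIterator for Chunk {
    type Item = u8;
    type IntoIter = std::vec::IntoIter<u8>;

    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}

// Removing this impl makes `concat_chunks` over `Chunk` fail to compile,
// which is the same kind of failure as `try_concat` on a stream of `Bytes`.
impl Extend<u8> for Chunk {
    fn extend<I: IntoIterator<Item = u8>>(&mut self, iter: I) {
        self.0.extend(iter);
    }
}
```

With the `Extend<u8>` impl in place, `concat_chunks(vec![Chunk(vec![1, 2]), Chunk(vec![3])])` type-checks; without it, the call is rejected at compile time.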
body.try_concat().await led me here too.
Without this I have:
let mut response = request.await?.into_body();
let mut v = Vec::default();
while let Some(bytes_result) = response.next().await {
    let bytes = bytes_result?;
    v.extend(bytes);
}
or maybe
let bytes = response_body
    .try_fold(Vec::default(), |mut v, bytes| async {
        v.extend(bytes);
        Ok(v)
    })
    .await?;
or worst of all,
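let response = request.await?.into_body();
// try_collect (from TryStreamExt) stops at the first error; plain collect
// would yield a Vec of Results, which `?` cannot be applied to.
let chunks: Vec<Bytes> = response.try_collect().await?;
let bytes: Vec<u8> = chunks.into_iter().flatten().collect();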
None of which I love.
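For comparison, the fold-based workaround has a direct synchronous counterpart in std, which may help show what it does: accumulate into one buffer and stop at the first error. This is only a sketch under that analogy; `concat_fallible` is a hypothetical name, and `Vec<u8>` stands in for the chunk type.

```rust
/// Folds fallible chunks into one buffer, stopping at the first error.
fn concat_fallible<I, E>(chunks: I) -> Result<Vec<u8>, E>
where
    I: IntoIterator<Item = Result<Vec<u8>, E>>,
{
    chunks.into_iter().try_fold(
        Vec::new(),
        |mut buf: Vec<u8>, chunk: Result<Vec<u8>, E>| -> Result<Vec<u8>, E> {
            // Copy the whole chunk at once instead of pushing byte by byte.
            buf.extend_from_slice(&chunk?);
            Ok(buf)
        },
    )
}
```

Chunks after the first `Err` are never inspected, which matches what one would expect from the `?` inside the async loop version.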
You could implement it directly on hyper::Body?
/// Returns a future of the concatenated raw bytes from the [`Body`](Body).
///
/// # Example
/// ```
/// # async {
/// # let body = hyper::Body::empty();
/// let bytes = body.concat().await?;
/// # Ok::<(), hyper::Error>(())
/// # };
/// ```
#[cfg(feature = "stream")]
pub async fn concat(mut self) -> crate::Result<Bytes> {
    let mut buf = BytesMut::new();
    while let Some(bytes) = self.next().await {
        buf.extend(bytes?);
    }
    Ok(buf.freeze())
}
Trying to update my code and I am hitting this bug:
https://github.com/cetra3/mpart-async/blob/541a2e7680d5a0f77f73940b651b408ead720f26/src/filestream.rs#L59
The workaround I had was as follows:
let bytes = FileStream::new("Cargo.toml")
    .try_fold(BytesMut::new(), |mut buf, bytes| {
        buf.extend_from_slice(&bytes);
        future::ready(Ok(buf))
    })
    .await?;
I can submit a PR for this if needed? I think it makes things a lot more ergonomic if we can use try_concat():
let bytes = FileStream::new("Cargo.toml")
    .try_concat()
    .await?;
Tokio will be providing a version of collect that supports Bytes. It is on master but not released yet.
~~PSA: https://docs.rs/tokio/0.2.22/tokio/stream/trait.FromStream.html is released.~~
Tokio 0.3 removed this impl again.
Tokio 0.3 removed the FromStream impls for the bytes types (https://github.com/tokio-rs/tokio/issues/2879).