async-compression
`ZstdEncoder` should not panic if `tokio::io::AsyncWrite::flush` is called after `tokio::io::AsyncWrite::shutdown`
I'm hitting a `Flush after shutdown` panic, ultimately here:
https://github.com/Nullus157/async-compression/blob/ece5584ce59a683c38e94f9957969e1bdb08b665/src/tokio/write/generic/encoder.rs#L97
Repro
Here's a minimal example, which reliably panics on my machine.
```rust
use async_compression::tokio::write::ZstdEncoder; // 0.4.3
use bytes::Bytes; // 1.5.0
use futures::{stream, StreamExt as _}; // 0.3.28
use std::io; // 1.72.0
use tokio::fs::File; // 1.32.0
use tokio_util::codec::{BytesCodec, FramedWrite}; // 0.7.8

async fn _main() -> io::Result<()> {
    let file = File::create("/dev/null").await?;
    let zstd_encoder = ZstdEncoder::new(file);
    let bytes_sink = FramedWrite::new(zstd_encoder, BytesCodec::new());
    stream::empty::<io::Result<Bytes>>()
        .forward(bytes_sink)
        .await
}

#[tokio::main]
async fn main() -> io::Result<()> {
    _main().await
}
```
Explanation
Here's the sequence of events:
```text
sink::poll_close            -> Pending
├── zstd::poll_flush        -> Ready(Ok(()))
│   └── file::poll_flush    -> Ready(Ok(()))
└── zstd::poll_shutdown     -> Pending
    ├── file::poll_write    -> Ready(Ok(9))
    └── file::poll_shutdown -> Pending
sink::poll_close            (polled again once the file is ready)
└── zstd::poll_flush        -> BOOM
```
- The sink needs to `close`.
- It calls `flush` on the encoder, which calls `flush` on the file (writing the zstd header to disk).
- It calls `shutdown` on the encoder, which calls `shutdown` on the file, but the file returns `Pending`.
- Once the file is ready, the sink calls `close` again.
- It calls `flush` on the encoder again, which panics.
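To make the sequence concrete, here is a minimal std-only model of it. It is a simplified sketch, not the crate's code: there are no `Pin`, `Context`, or wakers, and `ToyEncoder`, `inner_shutdown_calls`, and the free `poll_close` function are hypothetical names. The encoder panics on a flush once shutdown has started (modeled on the panic reported here), and its inner file returns `Pending` on the first shutdown.

```rust
use std::io;
use std::panic;
use std::task::Poll;

/// Hypothetical, simplified model of the encoder in the trace above.
/// Its inner file returns `Pending` from the first `poll_shutdown`.
struct ToyEncoder {
    shutdown_started: bool,
    inner_shutdown_calls: u32,
}

impl ToyEncoder {
    fn poll_flush(&mut self) -> Poll<io::Result<()>> {
        // Modeled on the "Flush after shutdown" panic in this report.
        if self.shutdown_started {
            panic!("Flush after shutdown");
        }
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(&mut self) -> Poll<io::Result<()>> {
        self.shutdown_started = true;
        self.inner_shutdown_calls += 1;
        if self.inner_shutdown_calls == 1 { Poll::Pending } else { Poll::Ready(Ok(())) }
    }
}

/// Mirrors the sink's close sequence above: flush, then shut down, on every call.
fn poll_close(enc: &mut ToyEncoder) -> Poll<io::Result<()>> {
    match enc.poll_flush() {
        Poll::Ready(Ok(())) => enc.poll_shutdown(),
        other => other,
    }
}

fn main() {
    let mut enc = ToyEncoder { shutdown_started: false, inner_shutdown_calls: 0 };
    // First close: the flush succeeds, the shutdown is Pending.
    assert!(poll_close(&mut enc).is_pending());
    // Second close (after the wakeup): the repeated flush panics.
    let second = panic::catch_unwind(panic::AssertUnwindSafe(|| poll_close(&mut enc)));
    assert!(second.is_err());
}
```

The panic message is printed to stderr when `catch_unwind` catches it; that is expected.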
Discussion
The bug is an interaction between the encoder code linked above and this code in `tokio_util`:
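```rust
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    // Runs on every call, including a re-poll after `poll_shutdown` returned `Pending`,
    // so the inner writer is flushed again after its shutdown has already started.
    ready!(self.as_mut().poll_flush(cx))?;
    ready!(self.project().inner.poll_shutdown(cx))?;
    Poll::Ready(Ok(()))
}
```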
A fix could be either:

1. `tokio_util::codec` keeps track of whether it has already flushed the inner writer in `poll_close` (so it doesn't flush it twice), or
2. `async_compression` removes the panic, or enhances its state machine to handle the above.
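Option (1) could look roughly like the following minimal model. It is a sketch under stated assumptions, not `tokio_util`'s actual code: `StrictEncoder`, `ClosingSink`, and `flushed_for_close` are hypothetical names, there is no `Pin`/`Context`, and the sink's own write buffer is ignored. The sink sets a flag only after the pre-close flush has completed, so a re-polled close goes straight to `poll_shutdown`.

```rust
use std::io;
use std::task::Poll;

/// Hypothetical toy encoder that, like the one in this report, panics when
/// flushed after shutdown has started. Its first `poll_shutdown` is `Pending`.
struct StrictEncoder {
    shutdown_started: bool,
    shutdown_calls: u32,
}

impl StrictEncoder {
    fn poll_flush(&mut self) -> Poll<io::Result<()>> {
        assert!(!self.shutdown_started, "Flush after shutdown");
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(&mut self) -> Poll<io::Result<()>> {
        self.shutdown_started = true;
        self.shutdown_calls += 1;
        if self.shutdown_calls == 1 { Poll::Pending } else { Poll::Ready(Ok(())) }
    }
}

/// Hypothetical sink that remembers the pre-close flush already completed.
struct ClosingSink {
    inner: StrictEncoder,
    flushed_for_close: bool,
}

impl ClosingSink {
    fn poll_close(&mut self) -> Poll<io::Result<()>> {
        if !self.flushed_for_close {
            match self.inner.poll_flush() {
                // Only remember the flush once it has actually finished.
                Poll::Ready(Ok(())) => self.flushed_for_close = true,
                other => return other,
            }
        }
        self.inner.poll_shutdown()
    }
}

fn main() {
    let encoder = StrictEncoder { shutdown_started: false, shutdown_calls: 0 };
    let mut sink = ClosingSink { inner: encoder, flushed_for_close: false };
    assert!(sink.poll_close().is_pending()); // flush ok, shutdown Pending
    assert!(matches!(sink.poll_close(), Poll::Ready(Ok(())))); // no second flush, no panic
    assert_eq!(sink.inner.shutdown_calls, 2);
}
```

This only fixes the `FramedWrite` side: any other caller that flushes after a pending shutdown, such as the `Transparent` wrapper below, would still hit the panic.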
I think the right fix is (2):
The documentation for `AsyncWrite` doesn't say that you're not allowed to flush after calling `shutdown`. In fact, I think implementors should be prepared to handle that case, at least until `shutdown` returns `Poll::Ready`.
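An encoder that follows this reading could look roughly like the minimal sketch below. It assumes a simplified model (no `Pin`, `Context`, or wakers) and uses hypothetical types `ToyEncoder` and `FlakyInner`, plus a made-up `b"END"` trailer standing in for the real end-of-frame bytes; it is not async-compression's encoder. The point is the state machine: `poll_flush` is valid in every state, and `poll_shutdown` queues the trailer exactly once, so the flush-then-shutdown loop from `poll_close` completes without panicking.

```rust
use std::io;
use std::task::Poll;

/// Hypothetical inner writer: the first `poll_shutdown` returns `Pending`.
#[derive(Default)]
struct FlakyInner {
    written: Vec<u8>,
    shutdown_calls: u32,
}

impl FlakyInner {
    fn poll_write(&mut self, buf: &[u8]) -> Poll<io::Result<usize>> {
        self.written.extend_from_slice(buf);
        Poll::Ready(Ok(buf.len()))
    }
    fn poll_flush(&mut self) -> Poll<io::Result<()>> { Poll::Ready(Ok(())) }
    fn poll_shutdown(&mut self) -> Poll<io::Result<()>> {
        self.shutdown_calls += 1;
        if self.shutdown_calls == 1 { Poll::Pending } else { Poll::Ready(Ok(())) }
    }
}

#[derive(Debug, PartialEq)]
enum State { Encoding, Finishing, Done }

/// Hypothetical toy encoder; `buffered` holds output not yet written to `inner`.
struct ToyEncoder {
    inner: FlakyInner,
    buffered: Vec<u8>,
    state: State,
}

impl ToyEncoder {
    fn new(inner: FlakyInner) -> Self { Self { inner, buffered: Vec::new(), state: State::Encoding } }

    /// Hands all buffered output to `inner`.
    fn poll_drain(&mut self) -> Poll<io::Result<()>> {
        while !self.buffered.is_empty() {
            match self.inner.poll_write(&self.buffered) {
                Poll::Ready(Ok(0)) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
                Poll::Ready(Ok(n)) => { self.buffered.drain(..n); }
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => return Poll::Pending,
            }
        }
        Poll::Ready(Ok(()))
    }

    /// Valid in every state: flushing during or after shutdown never panics.
    fn poll_flush(&mut self) -> Poll<io::Result<()>> {
        // Fully shut down: nothing left to flush, and the inner writer is closed.
        if self.state == State::Done { return Poll::Ready(Ok(())); }
        match self.poll_drain() {
            Poll::Ready(Ok(())) => self.inner.poll_flush(),
            other => other,
        }
    }

    fn poll_shutdown(&mut self) -> Poll<io::Result<()>> {
        if self.state == State::Encoding {
            // Queue the (placeholder) end-of-stream bytes exactly once.
            self.buffered.extend_from_slice(b"END");
            self.state = State::Finishing;
        }
        if self.state == State::Finishing {
            match self.poll_drain() {
                Poll::Ready(Ok(())) => {}
                other => return other,
            }
            match self.inner.poll_shutdown() {
                Poll::Ready(Ok(())) => self.state = State::Done,
                other => return other,
            }
        }
        Poll::Ready(Ok(()))
    }
}

/// Same shape as `tokio_util`'s `poll_close`: flush, then shut down, on every call.
fn poll_close(enc: &mut ToyEncoder) -> Poll<io::Result<()>> {
    match enc.poll_flush() {
        Poll::Ready(Ok(())) => enc.poll_shutdown(),
        other => other,
    }
}

fn main() {
    let mut enc = ToyEncoder::new(FlakyInner::default());
    assert!(poll_close(&mut enc).is_pending()); // inner shutdown not ready yet
    assert!(matches!(poll_close(&mut enc), Poll::Ready(Ok(())))); // re-poll: no panic
    assert_eq!(enc.state, State::Done);
    assert_eq!(enc.inner.written, b"END".to_vec());
}
```

The `Done` early return is a design choice in this sketch: once the inner writer has been shut down, a further flush simply reports success rather than touching it again.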
Take the following from the docs:
> Invocation of a shutdown implies an invocation of flush. Once this method returns Ready it implies that a flush successfully happened before the shutdown happened. That is, callers don’t need to call flush before calling shutdown. They can rely that by calling shutdown any pending buffered data will be written out.
So, following the API, I could write a simple `Transparent<T: AsyncWrite>` wrapper:
```rust
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::AsyncWrite;

pin_project! {
    struct Transparent<T> {
        #[pin]
        inner: T,
    }
}

impl<T> AsyncWrite for Transparent<T>
where
    T: AsyncWrite,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.project().inner.poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.project().inner.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut this = self.project();
        // Flush first, then shut down. On a re-poll after `Pending`, this flushes
        // `inner` again even though its shutdown has already started.
        ready!(this.inner.as_mut().poll_flush(cx))?;
        this.inner.poll_shutdown(cx)
    }
}

impl<T> Transparent<T> {
    fn new(inner: T) -> Self {
        Self { inner }
    }
}
```
which, of course, panics if it wraps a `ZstdEncoder` whose `poll_shutdown` returns `Pending`.
In fact, with the appropriate interleaving of `Poll::Pending`, `ZstdEncoder::new(ZstdEncoder::new(...))` will panic.
I'm pretty sure this affects all tokio codecs in this crate.