async-compression
`ZstdEncoder` should not panic if `tokio::io::AsyncWrite::flush` is called after `tokio::io::AsyncWrite::shutdown`
I'm hitting a `Flush after shutdown` panic, ultimately here:
https://github.com/Nullus157/async-compression/blob/ece5584ce59a683c38e94f9957969e1bdb08b665/src/tokio/write/generic/encoder.rs#L97
Repro
Here's a minimal example, which reliably panics on my machine.
use async_compression::tokio::write::ZstdEncoder; // 0.4.3
use bytes::Bytes; // 1.5.0
use futures::{stream, StreamExt as _}; // 0.3.28
use std::io; // 1.72.0
use tokio::fs::File; // 1.32.0
use tokio_util::codec::{BytesCodec, FramedWrite}; // 0.7.8

async fn _main() -> io::Result<()> {
    let file = File::create("/dev/null").await?;
    let zstd_encoder = ZstdEncoder::new(file);
    let bytes_sink = FramedWrite::new(zstd_encoder, BytesCodec::new());
    stream::empty::<io::Result<Bytes>>()
        .forward(bytes_sink)
        .await
}

#[tokio::main]
async fn main() -> io::Result<()> {
    _main().await
}
Explanation
Here's the sequence of events:
  ┌───────────────────────────────────┐ ┌──────────────────────────────────┬────────────────────────────────┐
  │ file::poll_flush -> Ready(Ok(())) │ │ file::poll_write -> Ready(Ok(9)) │ file::poll_shutdown -> Pending │
 ┌┴───────────────────────────────────┴┬┴──────────────────────────────────┴────────────────────────────────┴┐ ┌─────────────BOOM
 │ zstd::poll_flush -> Ready(Ok(()))   │ zstd::poll_shutdown -> Pending                                      │ │ zstd::poll_flush
┌┴─────────────────────────────────────┴─────────────────────────────────────────────────────────────────────┴┬┴─────────────BOOM
│ sink::poll_close -> Pending                                                                                 │ sink::poll_close
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────┴──────────────BOOM
- The sink needs to `close`
  - It `flush`es the encoder, which `flush`es the file (writing the zstd header to disk)
  - It `shutdown`s the encoder, which `shutdown`s the file, but the file returns `Pending`.
- The sink tries to `close` again, when the file is ready.
  - It `flush`es the encoder again, which panics
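The sequence above can be reproduced with a small std-only model. This is only a sketch: `SlowFile`, `StrictEncoder`, and the free function `poll_close` are hypothetical stand-ins (not the real `tokio` or `async_compression` types), the `Context` argument is dropped for brevity, and the model returns an error where the real encoder panics.

```rust
use std::task::Poll;

/// Hypothetical stand-in for the file: shutdown is `Pending` once, then ready.
struct SlowFile {
    shutdown_polls: u32,
}

impl SlowFile {
    fn poll_flush(&mut self) -> Poll<()> {
        Poll::Ready(())
    }

    fn poll_shutdown(&mut self) -> Poll<()> {
        self.shutdown_polls += 1;
        if self.shutdown_polls == 1 {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

#[derive(Debug, PartialEq)]
enum State {
    Encoding,
    Finishing,
    Done,
}

/// Simplified model of the encoder: flush is rejected once shutdown has begun.
struct StrictEncoder {
    inner: SlowFile,
    state: State,
}

impl StrictEncoder {
    fn poll_flush(&mut self) -> Poll<Result<(), &'static str>> {
        if self.state != State::Encoding {
            // The real encoder panics at this point; the model reports an error.
            return Poll::Ready(Err("Flush after shutdown"));
        }
        self.inner.poll_flush().map(Ok)
    }

    fn poll_shutdown(&mut self) -> Poll<Result<(), &'static str>> {
        self.state = State::Finishing;
        match self.inner.poll_shutdown() {
            Poll::Ready(()) => {
                self.state = State::Done;
                Poll::Ready(Ok(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Models a sink whose close does "flush, then shutdown" on every poll.
fn poll_close(enc: &mut StrictEncoder) -> Poll<Result<(), &'static str>> {
    match enc.poll_flush() {
        Poll::Ready(Ok(())) => {}
        other => return other,
    }
    enc.poll_shutdown()
}
```

Polling `poll_close` twice on a fresh `StrictEncoder` yields `Pending` first and then the "Flush after shutdown" error, matching the two `sink::poll_close` columns in the diagram.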
Discussion
The bug is basically an interaction between the above code and this code in `tokio_util`:
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    ready!(self.as_mut().poll_flush(cx))?;
    ready!(self.project().inner.poll_shutdown(cx))?;
    Poll::Ready(Ok(()))
}
A fix could be either:
1. `tokio_util::codec` keeps track of whether it has already flushed the inner writer in `poll_close` (so it does not flush it twice)
2. Remove the panic in `async_compression`, or enhance its state machine to address the above.
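As a rough illustration of option 1, the sink could remember that the flush half of `close` has already completed. The sketch below is a std-only model under that assumption: `CountingWriter`, `Sink`, and the `flushed_for_close` flag are hypothetical names, not anything in `tokio_util`, and the `Context` argument is omitted.

```rust
use std::task::Poll;

/// Hypothetical writer that counts flushes; shutdown is `Pending` once.
#[derive(Default)]
struct CountingWriter {
    flushes: u32,
    shutdown_polls: u32,
}

impl CountingWriter {
    fn poll_flush(&mut self) -> Poll<()> {
        self.flushes += 1;
        Poll::Ready(())
    }

    fn poll_shutdown(&mut self) -> Poll<()> {
        self.shutdown_polls += 1;
        if self.shutdown_polls == 1 {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

/// Hypothetical sink that records whether the flush step of close is done.
#[derive(Default)]
struct Sink {
    inner: CountingWriter,
    flushed_for_close: bool,
}

impl Sink {
    fn poll_close(&mut self) -> Poll<()> {
        // Flush only until it has completed once; later polls skip this step.
        if !self.flushed_for_close {
            if self.inner.poll_flush().is_pending() {
                return Poll::Pending;
            }
            self.flushed_for_close = true;
        }
        self.inner.poll_shutdown()
    }
}
```

With this shape, a second `poll_close` goes straight to `poll_shutdown`, so the inner writer never sees a flush after shutdown has started.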
I think the right fix is (2):
The documentation for `AsyncWrite` doesn't say that you're not allowed to flush after calling shutdown. In fact, I think implementors should be prepared to handle such a case, at least until shutdown returns `Poll::Ready`.
Take the following from the docs:
> Invocation of a shutdown implies an invocation of flush. Once this method returns Ready it implies that a flush successfully happened before the shutdown happened. That is, callers don’t need to call flush before calling shutdown. They can rely that by calling shutdown any pending buffered data will be written out.
So following the API, I could write a simple `Transparent<T: AsyncWrite>` wrapper:
// Imports assumed for this snippet (pin-project-lite and tokio).
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::AsyncWrite;

pin_project! {
    struct Transparent<T> {
        #[pin]
        inner: T
    }
}

impl<T> AsyncWrite for Transparent<T>
where
    T: AsyncWrite,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.project().inner.poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.project().inner.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut this = self.project();
        // Flush first, then shut down; if shutdown is Pending, the next poll flushes again.
        ready!(this.inner.as_mut().poll_flush(cx))?;
        this.inner.poll_shutdown(cx)
    }
}

impl<T> Transparent<T> {
    fn new(inner: T) -> Self {
        Self { inner }
    }
}
which, of course, panics if it contains a `ZstdEncoder`.
In fact, with the appropriate interleaving of `Poll::Pending`, `ZstdEncoder::new(ZstdEncoder::new(...))` will panic.
I'm pretty sure this affects all tokio codecs in this crate.
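For comparison, here is one way option 2 might look, as a std-only model rather than a patch against the crate. `SlowFile` and `LenientEncoder` are hypothetical stand-ins, the `Context` argument is omitted, and the choice to forward a late flush to the inner writer is an assumption about what a tolerant encoder could reasonably do.

```rust
use std::task::Poll;

/// Hypothetical stand-in for the file: shutdown is `Pending` once, then ready.
#[derive(Default)]
struct SlowFile {
    flushes: u32,
    shutdown_polls: u32,
}

impl SlowFile {
    fn poll_flush(&mut self) -> Poll<()> {
        self.flushes += 1;
        Poll::Ready(())
    }

    fn poll_shutdown(&mut self) -> Poll<()> {
        self.shutdown_polls += 1;
        if self.shutdown_polls == 1 {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    Encoding,
    Finishing,
    Done,
}

/// Simplified encoder model that tolerates flush after shutdown has begun.
struct LenientEncoder {
    inner: SlowFile,
    state: State,
}

impl LenientEncoder {
    fn poll_flush(&mut self) -> Poll<()> {
        match self.state {
            // Normal case (a real encoder would also flush the compressor here).
            State::Encoding => self.inner.poll_flush(),
            // Shutdown already started: nothing new to encode, so forward the
            // flush to the inner writer instead of panicking.
            State::Finishing | State::Done => self.inner.poll_flush(),
        }
    }

    fn poll_shutdown(&mut self) -> Poll<()> {
        if self.state == State::Done {
            return Poll::Ready(());
        }
        self.state = State::Finishing;
        if self.inner.poll_shutdown().is_pending() {
            return Poll::Pending;
        }
        self.state = State::Done;
        Poll::Ready(())
    }
}

/// Models a sink whose close does "flush, then shutdown" on every poll.
fn poll_close(enc: &mut LenientEncoder) -> Poll<()> {
    if enc.poll_flush().is_pending() {
        return Poll::Pending;
    }
    enc.poll_shutdown()
}
```

In this model the second `poll_close` flushes again while the encoder is in `Finishing`, and the close still completes.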