async-compression

`ZstdEncoder` should not panic if `tokio::io::AsyncWrite::flush` is called after `tokio::io::AsyncWrite::shutdown`

Open • aatifsyed opened this issue 10 months ago • 0 comments

I'm hitting a "Flush after shutdown" panic, which ultimately comes from here: https://github.com/Nullus157/async-compression/blob/ece5584ce59a683c38e94f9957969e1bdb08b665/src/tokio/write/generic/encoder.rs#L97

Repro

Here's a minimal example that reliably panics on my machine.

use async_compression::tokio::write::ZstdEncoder; // 0.4.3
use bytes::Bytes; // 1.5.0
use futures::{stream, StreamExt as _}; // 0.3.28
use std::io; // 1.72.0
use tokio::fs::File; // 1.32.0
use tokio_util::codec::{BytesCodec, FramedWrite}; // 0.7.8

async fn _main() -> io::Result<()> {
    let file = File::create("/dev/null").await?;
    let zstd_encoder = ZstdEncoder::new(file);
    let bytes_sink = FramedWrite::new(zstd_encoder, BytesCodec::new());
    stream::empty::<io::Result<Bytes>>()
        .forward(bytes_sink)
        .await
}

#[tokio::main]
async fn main() -> io::Result<()> {
    _main().await
}

Explanation

Here's the sequence of events:

  ┌───────────────────────────────────┐ ┌──────────────────────────────────┬────────────────────────────────┐ 
  │ file::poll_flush -> Ready(Ok(())) │ │ file::poll_write -> Ready(Ok(9)) │ file::poll_shutdown -> Pending │ 
 ┌┴───────────────────────────────────┴┬┴──────────────────────────────────┴────────────────────────────────┴┐ ┌─────────────BOOM
 │  zstd::poll_flush -> Ready(Ok(()))  │                    zstd::poll_shutdown -> Pending                   │ │ zstd::poll_flush
┌┴─────────────────────────────────────┴─────────────────────────────────────────────────────────────────────┴┬┴─────────────BOOM
│                                      sink::poll_close -> Pending                                            │  sink::poll_close
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────┴──────────────BOOM
  • The sink needs to close.
  • It flushes the encoder, which flushes the file.
  • It shuts down the encoder, which writes to the file (9 bytes) and then shuts the file down, but the file returns Pending.
  • When the file is ready, the sink tries to close again.
  • It flushes the encoder again, which panics.
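To make the sequence above concrete without tokio or a real file, here is a simplified, std-only model of it. `PollWrite`, `SlowFile`, `PanickyEncoder`, `State` and `poll_close` are hypothetical names for this sketch: the trait drops `Pin` and `Context` from `tokio::io::AsyncWrite`, and the encoder only models the panicking state check, not async-compression's actual code.

```rust
use std::io;
use std::task::{ready, Poll};

/// Simplified stand-in for `tokio::io::AsyncWrite` (hypothetical): no `Pin`
/// or `Context`, just the poll results, so the call order is easy to follow.
trait PollWrite {
    fn poll_flush(&mut self) -> Poll<io::Result<()>>;
    fn poll_shutdown(&mut self) -> Poll<io::Result<()>>;
}

/// Hypothetical stand-in for the `File`: its first `poll_shutdown` is `Pending`.
struct SlowFile {
    shutdown_calls: u32,
}

impl PollWrite for SlowFile {
    fn poll_flush(&mut self) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(&mut self) -> Poll<io::Result<()>> {
        self.shutdown_calls += 1;
        if self.shutdown_calls == 1 {
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }
}

#[derive(Debug, PartialEq)]
enum State {
    Encoding,
    Finishing,
    Done,
}

/// Hypothetical model of the encoder's state machine: once shutdown has
/// started, `poll_flush` panics, mirroring the linked line in encoder.rs.
struct PanickyEncoder<W> {
    inner: W,
    state: State,
}

impl<W: PollWrite> PollWrite for PanickyEncoder<W> {
    fn poll_flush(&mut self) -> Poll<io::Result<()>> {
        if self.state != State::Encoding {
            panic!("Flush after shutdown");
        }
        self.inner.poll_flush()
    }
    fn poll_shutdown(&mut self) -> Poll<io::Result<()>> {
        // Writing the compressed epilogue to `inner` is omitted from this model.
        self.state = State::Finishing;
        ready!(self.inner.poll_shutdown())?;
        self.state = State::Done;
        Poll::Ready(Ok(()))
    }
}

/// Same shape as tokio_util's `poll_close`: flush, then shut down, on every call.
fn poll_close<W: PollWrite>(w: &mut W) -> Poll<io::Result<()>> {
    ready!(w.poll_flush())?;
    w.poll_shutdown()
}
```

Calling `poll_close` once returns `Pending` because the file's shutdown is not finished. Calling it again re-enters `poll_flush` on the encoder and hits the panic.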

Discussion

The bug is an interaction between the encoder code linked above and this poll_close implementation in tokio_util's codec module, which flushes before every shutdown attempt:

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        ready!(self.as_mut().poll_flush(cx))?;
        ready!(self.project().inner.poll_shutdown(cx))?;

        Poll::Ready(Ok(()))
    }


A fix could be either:

  1. tokio_util::codec keeps track of whether it has already flushed the inner writer in poll_close (so it doesn't flush it twice).
  2. Remove the panic in async_compression, or enhance its state machine to handle the sequence above.
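A rough sketch of fix (1), in a simplified, std-only model: the trait drops `Pin` and `Context` from `tokio::io::AsyncWrite`, and `PollWrite`, `StrictWriter` and `CloseState` are hypothetical names, not tokio_util's API. The close logic records that the flush already succeeded, so a re-polled close only retries the shutdown.

```rust
use std::io;
use std::task::{ready, Poll};

/// Simplified stand-in for `tokio::io::AsyncWrite` (hypothetical, no `Pin`/`Context`).
trait PollWrite {
    fn poll_flush(&mut self) -> Poll<io::Result<()>>;
    fn poll_shutdown(&mut self) -> Poll<io::Result<()>>;
}

/// Hypothetical writer that behaves like the encoder in this issue: its first
/// `poll_shutdown` returns `Pending`, and any flush after shutdown panics.
#[derive(Default)]
struct StrictWriter {
    shutdown_calls: u32,
    flush_calls: u32,
}

impl PollWrite for StrictWriter {
    fn poll_flush(&mut self) -> Poll<io::Result<()>> {
        assert_eq!(self.shutdown_calls, 0, "Flush after shutdown");
        self.flush_calls += 1;
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(&mut self) -> Poll<io::Result<()>> {
        self.shutdown_calls += 1;
        if self.shutdown_calls == 1 {
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }
}

/// Hypothetical close state for fix (1): remembers that the flush succeeded,
/// so a re-polled close goes straight back to `poll_shutdown`.
#[derive(Default)]
struct CloseState {
    flushed: bool,
}

impl CloseState {
    fn poll_close<W: PollWrite>(&mut self, w: &mut W) -> Poll<io::Result<()>> {
        if !self.flushed {
            ready!(w.poll_flush())?;
            self.flushed = true;
        }
        w.poll_shutdown()
    }
}
```

With `StrictWriter`, two calls to `CloseState::poll_close` yield `Pending` and then `Ready(Ok(()))`, with exactly one flush and no panic.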

I think the right fix is (2):

The documentation for AsyncWrite doesn't say that you're not allowed to flush after calling shutdown. In fact, I think implementors should be prepared to handle such a case, at least until shutdown returns Poll::Ready.

Take the following from the docs:

Invocation of a shutdown implies an invocation of flush. Once this method returns Ready it implies that a flush successfully happened before the shutdown happened. That is, callers don’t need to call flush before calling shutdown. They can rely that by calling shutdown any pending buffered data will be written out.

So, following the API, I could write a simple Transparent<T: AsyncWrite> wrapper:

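use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::AsyncWrite;

pin_project! {
    struct Transparent<T> {
        #[pin]
        inner: T,
    }
}

impl<T> AsyncWrite for Transparent<T>
where
    T: AsyncWrite,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Plain pass-through to the wrapped writer.
        self.project().inner.poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.project().inner.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut this = self.project();
        // Redundant per the docs quoted above, but allowed. If the inner
        // shutdown returns Pending, the next call flushes the inner writer
        // again, after its shutdown has already started.
        ready!(this.inner.as_mut().poll_flush(cx))?;
        this.inner.poll_shutdown(cx)
    }
}

impl<T> Transparent<T> {
    fn new(inner: T) -> Self {
        Self { inner }
    }
}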

which, of course, panics if it wraps a ZstdEncoder and the encoder's shutdown returns Poll::Pending.

In fact, with the appropriate interleaving of Poll::Pending, ZstdEncoder::new(ZstdEncoder::new(...)) will panic.

I'm fairly sure this affects all the tokio codecs in this crate, not just ZstdEncoder.
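For comparison, a rough sketch of fix (2) in a simplified, std-only model: the trait drops `Pin` and `Context` from `tokio::io::AsyncWrite`, and `PollWrite`, `SlowFile`, `Encoder`, `State` and `poll_close` are hypothetical names, not async-compression's code. The encoder's `poll_flush` forwards to the inner writer while shutdown is in progress and becomes a no-op once it has finished, instead of panicking.

```rust
use std::io;
use std::task::{ready, Poll};

/// Simplified stand-in for `tokio::io::AsyncWrite` (hypothetical, no `Pin`/`Context`).
trait PollWrite {
    fn poll_flush(&mut self) -> Poll<io::Result<()>>;
    fn poll_shutdown(&mut self) -> Poll<io::Result<()>>;
}

/// Hypothetical stand-in for the `File`: its first `poll_shutdown` is `Pending`.
#[derive(Default)]
struct SlowFile {
    shutdown_calls: u32,
}

impl PollWrite for SlowFile {
    fn poll_flush(&mut self) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(&mut self) -> Poll<io::Result<()>> {
        self.shutdown_calls += 1;
        if self.shutdown_calls == 1 {
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }
}

enum State {
    Encoding,
    /// Epilogue handed to `inner`; waiting for `inner.poll_shutdown`.
    Finishing,
    Done,
}

/// Hypothetical encoder whose state machine tolerates flush after shutdown.
struct Encoder<W> {
    inner: W,
    state: State,
}

impl<W> Encoder<W> {
    fn new(inner: W) -> Self {
        Encoder { inner, state: State::Encoding }
    }
}

impl<W: PollWrite> PollWrite for Encoder<W> {
    fn poll_flush(&mut self) -> Poll<io::Result<()>> {
        match self.state {
            // Before or during shutdown, flushing output that has already been
            // produced is safe, so forward it instead of panicking.
            State::Encoding | State::Finishing => self.inner.poll_flush(),
            // The inner writer is fully shut down: nothing is left to flush.
            State::Done => Poll::Ready(Ok(())),
        }
    }

    fn poll_shutdown(&mut self) -> Poll<io::Result<()>> {
        if let State::Encoding = self.state {
            // A real encoder would write its epilogue to `inner` here;
            // this model only records the transition.
            self.state = State::Finishing;
        }
        if let State::Finishing = self.state {
            ready!(self.inner.poll_shutdown())?;
            self.state = State::Done;
        }
        Poll::Ready(Ok(()))
    }
}

/// Same shape as tokio_util's `poll_close`: flush, then shut down, on every call.
fn poll_close<W: PollWrite>(w: &mut W) -> Poll<io::Result<()>> {
    ready!(w.poll_flush())?;
    w.poll_shutdown()
}
```

Driven by the same flush-then-shutdown `poll_close`, this returns `Pending` on the first call and `Ready(Ok(()))` on the second, with no panic. Whether a flush after a completed shutdown should succeed or return an error is a design choice; this sketch picks success.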

aatifsyed, Sep 14 '23 22:09