proposal: Export codec and `Encode`/`Decode` trait

Open Xuanwo opened this issue 3 years ago • 8 comments

In databend we use our own runtime, which:

  • Runs async tasks on an async runtime (tokio here), for example IO tasks.
  • Runs sync tasks on a sync runtime, for example CPU-bound tasks such as decompression.

So we want to control the underlying behavior of async-compression:

  • Make IO happen on the async runtime: poll the futures
  • Make decoding happen on the sync runtime

To do that, we need direct access to the Encode/Decode traits in codec so that we can build our own XzDecoder.
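
One way to picture the split described above is the minimal sketch below. It uses only the standard library: one thread stands in for the async runtime and another for the sync runtime. `toy_decode` and `decode_on_worker` are hypothetical names, and the run-length format is invented for illustration. None of this is async-compression API.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a codec: expands (count, byte) pairs.
/// Assumes every chunk has an even length; a real codec keeps state across chunks.
fn toy_decode(chunk: &[u8]) -> Vec<u8> {
    chunk
        .chunks_exact(2)
        .flat_map(|pair| std::iter::repeat(pair[1]).take(pair[0] as usize))
        .collect()
}

/// Moves bytes on one thread and decodes on another, linked by channels.
fn decode_on_worker(chunks: Vec<Vec<u8>>) -> Vec<u8> {
    let (to_worker, from_io) = mpsc::channel::<Vec<u8>>();
    let (to_caller, from_worker) = mpsc::channel::<Vec<u8>>();

    // Stand-in for the sync runtime: does only CPU-bound decoding.
    let worker = thread::spawn(move || {
        for chunk in from_io {
            to_caller.send(toy_decode(&chunk)).expect("caller hung up");
        }
    });

    // Stand-in for the async runtime: only moves bytes, never decodes.
    let io = thread::spawn(move || {
        for chunk in chunks {
            to_worker.send(chunk).expect("worker hung up");
        }
        // `to_worker` is dropped here, which ends the worker's loop.
    });

    // Channels are FIFO, so decoded chunks arrive in input order.
    let out: Vec<u8> = from_worker.iter().flatten().collect();
    io.join().unwrap();
    worker.join().unwrap();
    out
}
```

Calling `decode_on_worker(vec![vec![3, b'a'], vec![2, b'b', 1, b'c']])` yields the bytes of `aaabbc`. The point is only that the decode call never runs on the thread that performs IO.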

Does this idea make sense to you? I'm willing to start a PR for it.


This would also address: https://github.com/Nemo157/async-compression/issues/141

Xuanwo avatar May 20 '22 05:05 Xuanwo

Maybe expose decode in XzDecoder?

impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
    pub fn decode(
        self: Pin<&mut Self>,
        input: &mut PartialBuffer<impl AsRef<[u8]>>,
        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>
    ) -> Result<bool> {
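        // `input` is already a `&mut` reference, so pass it through unchanged.
        // Note: reaching `decoder` via `Pin<&mut Self>` also needs a pin projection or `Self: Unpin`.
        self.decoder.decode(input, output)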
    }
}

Or, provide into_decoder():

pub fn into_decoder(self) -> D {
    self.decoder
}
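
To compare the two shapes proposed above, here is a minimal sketch with a hypothetical `Wrapper` type that is not the crate's `Decoder`. `decoder_mut` and `into_parts` are illustrative names. The sketch assumes that a caller taking the codec out usually wants the reader back as well, since returning only the codec would drop the reader.

```rust
/// Hypothetical wrapper pairing a reader with a codec.
pub struct Wrapper<R, D> {
    reader: R,
    decoder: D,
}

impl<R, D> Wrapper<R, D> {
    pub fn new(reader: R, decoder: D) -> Self {
        Self { reader, decoder }
    }

    /// Borrows the codec so the caller can drive it directly
    /// while the wrapper keeps owning the reader.
    pub fn decoder_mut(&mut self) -> &mut D {
        &mut self.decoder
    }

    /// Consumes the wrapper and hands back both halves,
    /// so the reader is not dropped along the way.
    pub fn into_parts(self) -> (R, D) {
        (self.reader, self.decoder)
    }
}
```

With borrowing, the wrapper stays usable afterwards. With consuming, the caller takes over both the IO and the codec state.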

Xuanwo avatar May 20 '22 09:05 Xuanwo

I am (slowly) working on a big refactor that will expose a lot of the internal building blocks, including the codec stuff. I'm hoping to have enough time to actually get something public in the next few weeks.

Nemo157 avatar May 20 '22 10:05 Nemo157

That's cool! If there is anything I can help with, just ping me back.

Xuanwo avatar May 20 '22 11:05 Xuanwo

@Xuanwo I was thinking about your use case a bit more, and it seems like you would need the codec traits to be async, so that you can spawn the synchronous work into your sync runtime and return a handle to get the result?

Nemo157 avatar May 23 '22 11:05 Nemo157

so you can spawn the synchronous work into your sync-runtime and return a handle to get the result?

Yep, similar. In databend, we have our own processors like the following:

https://github.com/datafuselabs/databend/blob/2f9e425b55e2e28761c5a9b1e3fb11131fd7fa8e/query/src/pipelines/new/processors/processor.rs#L25-L49

pub enum Event {
    NeedData,
    NeedConsume,
    Sync,
    Async,
    Finished,
}

// The design is inspired by ClickHouse processors
#[async_trait::async_trait]
pub trait Processor: Send {
    fn name(&self) -> &'static str;

    fn event(&mut self) -> Result<Event>;

    // Synchronous work.
    fn process(&mut self) -> Result<()> {
        Err(ErrorCode::UnImplement("Unimplemented process."))
    }

    // Asynchronous work.
    async fn async_process(&mut self) -> Result<()> {
        Err(ErrorCode::UnImplement("Unimplemented async_process."))
    }
}

Every task emits a new event, and the processor decides whether to execute it in a blocking way or in an async way.
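
The dispatch described above can be sketched roughly as follows, using only the standard library. The `Step` trait, the `Decompress` struct and the `drive` function are hypothetical simplifications, not databend code. `io_process` is a synchronous stand-in for `async_process`, and the "decode" merely uppercases ASCII bytes.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum Event {
    Sync,
    Async,
    Finished,
}

/// Simplified, hypothetical version of the processor idea.
trait Step {
    fn event(&mut self) -> Event;
    fn process(&mut self); // CPU-bound work
    fn io_process(&mut self); // stand-in for `async_process`
}

struct Decompress {
    source: VecDeque<Vec<u8>>, // chunks still to be "read"
    loaded: Option<Vec<u8>>,   // chunk waiting to be decoded
    output: Vec<u8>,
}

impl Step for Decompress {
    fn event(&mut self) -> Event {
        if self.loaded.is_some() {
            Event::Sync // a buffer is ready: decode it
        } else if !self.source.is_empty() {
            Event::Async // nothing buffered: read more
        } else {
            Event::Finished
        }
    }

    fn process(&mut self) {
        if let Some(chunk) = self.loaded.take() {
            // Placeholder for real decoding.
            self.output.extend(chunk.iter().map(|b| b.to_ascii_uppercase()));
        }
    }

    fn io_process(&mut self) {
        self.loaded = self.source.pop_front();
    }
}

/// Asks the step what it needs next and records where each piece ran.
fn drive(step: &mut dyn Step) -> Vec<&'static str> {
    let mut trace = Vec::new();
    loop {
        match step.event() {
            Event::Sync => {
                trace.push("sync");
                step.process();
            }
            Event::Async => {
                trace.push("async");
                step.io_process();
            }
            Event::Finished => return trace,
        }
    }
}
```

For two input chunks the trace alternates between "async" and "sync". A real scheduler would send each kind of step to a different executor instead of recording a label.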

In our case (decompression), we have an async IO task and a sync decompress task. My current plan is to:

  • Implement a decoder wrapper that holds an AsyncRead and an async_compression::codec::XzDecoder
  • Emit an async IO task to read a buffer from the reader
  • Emit a decompress task to make XzDecoder decode this buffer (via PartialBuffer)
  • Then parse the decompressed data into csv/parquet/json...

In conclusion, what we need (based on the current codebase; better designs or ideas are always welcome):

  • codec::Decode trait (no changes needed so far)
  • codec::XxxDecoder structs (we will call decode directly)

Xuanwo avatar May 23 '22 11:05 Xuanwo

Hi @Nemo157, I've got a demo here.

Code
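// Imports this demo relies on. The `async_compression` paths are assumptions:
// they only resolve if `codec` and `util` are exported, which the published
// crate does not do at the time of this thread.
use async_compression::codec::Decode;
use async_compression::util::PartialBuffer;
use futures::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use log::debug;
use std::io::Result;
use std::pin::Pin;

#[derive(Debug)]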
enum State {
    Reading,
    Decoding,
    Finishing,
    Done,
}

#[derive(Debug)]
struct Reader<R: AsyncBufRead + Unpin, D: Decode> {
    reader: R,
    decoder: D,
    multiple_members: bool,
}

impl<R: AsyncBufRead + Unpin, D: Decode> Reader<R, D> {
    pub fn new(reader: R, decoder: D) -> Self {
        Self {
            reader,
            decoder,
            multiple_members: false,
        }
    }

    pub async fn fill_buf(&mut self) -> Result<&[u8]> {
        self.reader.fill_buf().await
    }

    pub fn decode(&mut self, input: &[u8], output: &mut [u8]) -> Result<(State, usize)> {
        // If input is empty, inner reader must reach EOF, return directly.
        if input.is_empty() {
            debug!("input is empty, return directly");
            // Avoid attempting to reinitialise the decoder if the reader
            // has returned EOF.
            self.multiple_members = false;
            return Ok((State::Finishing, 0));
        }

        let mut input = PartialBuffer::new(input);
        let mut output = PartialBuffer::new(output);
        let done = self.decoder.decode(&mut input, &mut output)?;
        let len = input.written().len();
        debug!("advance reader with amt {}", len);
        Pin::new(&mut self.reader).consume(len);

        if done {
            Ok((State::Finishing, output.written().len()))
        } else {
            Ok((State::Reading, output.written().len()))
        }
    }

    pub fn finish(&mut self, output: &mut [u8]) -> Result<(State, usize)> {
        let mut output = PartialBuffer::new(output);
        let done = self.decoder.finish(&mut output)?;
        if done {
            if self.multiple_members {
                self.decoder.reinit()?;
                Ok((State::Reading, output.written().len()))
            } else {
                Ok((State::Done, output.written().len()))
            }
        } else {
            Ok((State::Finishing, output.written().len()))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use async_compression::codec::{Decode, ZlibDecoder as ZlibCodec};
    use flate2::write::ZlibEncoder;
    use flate2::Compression;
    use futures::io::Cursor;
    use log::debug;
    use rand::prelude::*;
    use std::io::Result;
    use std::io::Write;

    // The payload is zlib-compressed, so the test is named accordingly.
    #[tokio::test]
    async fn decode_zlib() -> Result<()> {
        env_logger::init();

        let mut rng = ThreadRng::default();
        let mut content = vec![0; 16 * 1024 * 1024];
        rng.fill_bytes(&mut content);
        debug!("raw_content size: {}", content.len());

        let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
        e.write_all(&content)?;
        let compressed_content = e.finish()?;
        debug!("compressed_content size: {}", compressed_content.len());

        let br = BufReader::with_capacity(64 * 1024, Cursor::new(compressed_content));
        let mut cr = Reader::new(br, ZlibCodec::new());

        let mut result = vec![0; 16 * 1024 * 1024];
        let mut cnt = 0;
        let mut state = State::Reading;
        let mut buf = Vec::new();
        loop {
            let (_, output) = result.split_at_mut(cnt);

            match state {
                State::Reading => {
                    debug!("start reading");
                    buf = cr.fill_buf().await?.to_vec();
                    debug!("read data: {}", buf.len());
                    state = State::Decoding
                }
                State::Decoding => {
                    debug!("start decoding from buf {} to output {}", buf.len(), cnt);
                    let (decode_state, written) = cr.decode(&buf, output)?;
                    debug!("decoded from buf {} as output {}", buf.len(), written);
                    state = decode_state;
                    cnt += written;
                },
                State::Finishing => {
                    debug!("start finishing to output {}", cnt);
                    let (finish_state, written) = cr.finish(output)?;
                    debug!("finished from buf {} as output {}", buf.len(), written);
                    state = finish_state;
                    cnt += written;
                }
                State::Done => {
                    debug!("done");
                    break;
                }
            }
        }

        assert_eq!(result, content);

        Ok(())
    }
}

The lesson learnt from this demo: with the current design, the following would need to be exported (for the decoder only):

  • Decode trait
  • every algorithm under codec
  • PartialBuffer (I don't know if we can replace it with tokio::io::ReadBuf)

The real change: split IO from decoding and let the user drive both.
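
For readers unfamiliar with the buffer type mentioned above, here is a minimal sketch of the idea: a buffer plus a cursor separating the processed part from the rest. `Partial` and `copy_step` are hypothetical stand-ins written for this note. The method names mirror how the demo uses `written()`, but this is not the crate's implementation.

```rust
/// Hypothetical stand-in: a buffer and an index marking how far it has been processed.
struct Partial<B> {
    buffer: B,
    index: usize,
}

impl<B: AsRef<[u8]>> Partial<B> {
    fn new(buffer: B) -> Self {
        Self { buffer, index: 0 }
    }

    /// Bytes already consumed (for input) or produced (for output).
    fn written(&self) -> &[u8] {
        &self.buffer.as_ref()[..self.index]
    }

    /// Bytes still to be consumed, or space still free.
    fn unwritten(&self) -> &[u8] {
        &self.buffer.as_ref()[self.index..]
    }

    fn advance(&mut self, amount: usize) {
        self.index += amount;
    }
}

impl<B: AsRef<[u8]> + AsMut<[u8]>> Partial<B> {
    fn unwritten_mut(&mut self) -> &mut [u8] {
        &mut self.buffer.as_mut()[self.index..]
    }
}

/// Copies as many bytes as fit and advances both cursors.
/// A real codec would transform the bytes instead of copying them.
fn copy_step(input: &mut Partial<&[u8]>, output: &mut Partial<&mut [u8]>) -> usize {
    let n = input.unwritten().len().min(output.unwritten().len());
    output.unwritten_mut()[..n].copy_from_slice(&input.unwritten()[..n]);
    input.advance(n);
    output.advance(n);
    n
}
```

After one step, `input.written().len()` tells the caller how many bytes to `consume` from the reader, and `output.written().len()` how many decoded bytes are available. The demo's `decode` method reads the same two values.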

Xuanwo avatar May 26 '22 08:05 Xuanwo

PartialBuffer (I don't know if we can replace it by tokio::io::ReadBuf)

Yeah, I was thinking ReadBuf would hopefully be a replacement for it, at least for the output side; I have a very old test branch using it. I'm just waiting on it stabilizing in std before doing anything more with it.

Nemo157 avatar May 26 '22 10:05 Nemo157

Here is my progress, please take a look:

  • async-compression side: https://github.com/Xuanwo/async-compression/tree/public_api
  • my use case: https://github.com/datafuselabs/opendal/pull/289

Xuanwo avatar May 26 '22 14:05 Xuanwo