async-compression
proposal: Export codec and `Encode`/`Decode` trait
We are using our own runtime in databend, which:
- Runs async tasks on an async runtime (tokio here), for example, IO tasks.
- Runs sync tasks on a sync runtime, for example, CPU-bound tasks like decompression.
So we want to control the underlying behavior of async-compression:
- Make IO happen on the async runtime: poll the futures
- Make decode happen on the sync runtime
Thus, we need direct access to the Encode/Decode traits in codec so we can build our own XzDecoder.
Does this idea make sense to you? I'm willing to start a PR for it.
Also address: https://github.com/Nemo157/async-compression/issues/141
Maybe expose decode in XzDecoder?
impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
    pub fn decode(
        self: Pin<&mut Self>,
        input: &mut PartialBuffer<impl AsRef<[u8]>>,
        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
    ) -> Result<bool> {
        // `input` is already a mutable reference, so pass it through directly.
        self.decoder.decode(input, output)
    }
}
Or, provide into_decoder():
    pub fn into_decoder(self) -> D {
        self.decoder
    }
I am (slowly) working on a big refactor that will expose a lot of the internal building blocks, including the codec stuff. I'm hoping to have enough time to actually get something public in the next few weeks.
That's cool! If there is anything I can help with, just ping me back.
@Xuanwo I was thinking about your usecase a bit more, and it seems like you would need the codec traits to be async; so you can spawn the synchronous work into your sync-runtime and return a handle to get the result?
Yep, similar. In databend, we have our own processors like the following:
https://github.com/datafuselabs/databend/blob/2f9e425b55e2e28761c5a9b1e3fb11131fd7fa8e/query/src/pipelines/new/processors/processor.rs#L25-L49
pub enum Event {
    NeedData,
    NeedConsume,
    Sync,
    Async,
    Finished,
}

// The design is inspired by ClickHouse processors
#[async_trait::async_trait]
pub trait Processor: Send {
    fn name(&self) -> &'static str;

    fn event(&mut self) -> Result<Event>;

    // Synchronous work.
    fn process(&mut self) -> Result<()> {
        Err(ErrorCode::UnImplement("Unimplemented process."))
    }

    // Asynchronous work.
    async fn async_process(&mut self) -> Result<()> {
        Err(ErrorCode::UnImplement("Unimplemented async_process."))
    }
}
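A dependency-free, sync-only sketch of how such an event loop might be driven (names like `Counter` and `run` are illustrative only; the real databend trait is async and returns `Result`):

```rust
// Hedged sketch: a minimal, sync-only stand-in for the Processor/Event
// scheme above. It only illustrates the "emit an event, dispatch work" loop.
#[derive(Debug)]
enum Event {
    Sync,     // run process() on the sync runtime
    Finished, // nothing left to do
}

trait Processor {
    fn name(&self) -> &'static str;
    fn event(&mut self) -> Event;
    fn process(&mut self); // synchronous work
}

// Illustrative processor: "processes" three times, then finishes.
struct Counter {
    n: u32,
}

impl Processor for Counter {
    fn name(&self) -> &'static str {
        "counter"
    }
    fn event(&mut self) -> Event {
        if self.n < 3 { Event::Sync } else { Event::Finished }
    }
    fn process(&mut self) {
        self.n += 1;
    }
}

// Driver: poll for the next event and dispatch until Finished.
fn run(p: &mut impl Processor) {
    loop {
        match p.event() {
            Event::Sync => p.process(),
            Event::Finished => break,
        }
    }
}

fn main() {
    let mut p = Counter { n: 0 };
    run(&mut p);
    assert_eq!(p.n, 3);
    println!("{} finished after {} steps", p.name(), p.n);
}
```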
Every task will emit a new event, and the processor decides whether to execute it in a blocking way or an async way.
In our case (decompress), we have an async IO task and a sync decompress task. In my current plan, I will:
- Implement a decoder wrapper that has an AsyncRead and an async-compression::codec::XzDecoder
- Emit an async IO task to read a buffer from the reader
- Emit a decompress task to make XzDecoder decode this buffer (via PartialBuffer)
- Then we will parse the decompressed data into csv/parquet/json...
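That IO/decode split can be simulated with std threads and a channel. A hedged sketch (in databend this would be a tokio task plus a sync worker, and the identity "decode" below stands in for codec::XzDecoder):

```rust
// Hedged sketch: splitting an "IO task" and a "sync decompress task" using
// std threads and a channel. The identity "decode" stands in for a real
// codec::XzDecoder::decode call.
use std::sync::mpsc;
use std::thread;

fn pipeline(chunks: Vec<Vec<u8>>) -> Vec<u8> {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    // "IO task": produces buffers (stands in for fill_buf on an AsyncBufRead).
    let io = thread::spawn(move || {
        for chunk in chunks {
            tx.send(chunk).unwrap();
        }
        // tx is dropped here, which ends the decode loop below.
    });

    // "Decode task": consumes each buffer on a sync worker.
    let decode = thread::spawn(move || {
        let mut out = Vec::new();
        for buf in rx {
            out.extend_from_slice(&buf); // identity "decode"
        }
        out
    });

    io.join().unwrap();
    decode.join().unwrap()
}

fn main() {
    let chunks = vec![b"hello ".to_vec(), b"world".to_vec()];
    assert_eq!(pipeline(chunks), b"hello world".to_vec());
}
```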
In conclusion, what we need is (based on the current codebase; better designs or ideas are always welcome):
- the codec::Decode trait (no changes needed so far)
- the codec::XxxDecoder structs (we will call decode directly)
Hi, @Nemo157, I got a demo here.
Code
use std::io::Result;
use std::pin::Pin;

use futures::io::{AsyncBufRead, AsyncBufReadExt};
use log::debug;

// `Decode` and `PartialBuffer` come from the patched async-compression
// branch that exposes the codec internals.
use async_compression::codec::Decode;
use async_compression::util::PartialBuffer;

#[derive(Debug)]
enum State {
    Reading,
    Decoding,
    Finishing,
    Done,
}

#[derive(Debug)]
struct Reader<R: AsyncBufRead + Unpin, D: Decode> {
    reader: R,
    decoder: D,
    multiple_members: bool,
}

impl<R: AsyncBufRead + Unpin, D: Decode> Reader<R, D> {
    pub fn new(reader: R, decoder: D) -> Self {
        Self {
            reader,
            decoder,
            multiple_members: false,
        }
    }

    pub async fn fill_buf(&mut self) -> Result<&[u8]> {
        self.reader.fill_buf().await
    }

    pub fn decode(&mut self, input: &[u8], output: &mut [u8]) -> Result<(State, usize)> {
        // If input is empty, the inner reader must have reached EOF; return directly.
        if input.is_empty() {
            debug!("input is empty, return directly");
            // Avoid attempting to reinitialise the decoder if the reader
            // has returned EOF.
            self.multiple_members = false;
            return Ok((State::Finishing, 0));
        }

        let mut input = PartialBuffer::new(input);
        let mut output = PartialBuffer::new(output);

        let done = self.decoder.decode(&mut input, &mut output)?;

        let len = input.written().len();
        debug!("advance reader with amt {}", len);
        Pin::new(&mut self.reader).consume(len);

        if done {
            Ok((State::Finishing, output.written().len()))
        } else {
            Ok((State::Reading, output.written().len()))
        }
    }

    pub fn finish(&mut self, output: &mut [u8]) -> Result<(State, usize)> {
        let mut output = PartialBuffer::new(output);

        let done = self.decoder.finish(&mut output)?;

        if done {
            if self.multiple_members {
                self.decoder.reinit()?;
                Ok((State::Reading, output.written().len()))
            } else {
                Ok((State::Done, output.written().len()))
            }
        } else {
            Ok((State::Finishing, output.written().len()))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use async_compression::codec::ZlibDecoder as ZlibCodec;
    use flate2::write::ZlibEncoder;
    use flate2::Compression;
    use futures::io::{BufReader, Cursor};
    use log::debug;
    use rand::prelude::*;
    use std::io::Result;
    use std::io::Write;

    #[tokio::test]
    async fn decode_zlib() -> Result<()> {
        env_logger::init();

        let mut rng = ThreadRng::default();
        let mut content = vec![0; 16 * 1024 * 1024];
        rng.fill_bytes(&mut content);
        debug!("raw_content size: {}", content.len());

        let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
        e.write_all(&content)?;
        let compressed_content = e.finish()?;
        debug!("compressed_content size: {}", compressed_content.len());

        let br = BufReader::with_capacity(64 * 1024, Cursor::new(compressed_content));
        let mut cr = Reader::new(br, ZlibCodec::new());

        let mut result = vec![0; 16 * 1024 * 1024];
        let mut cnt = 0;
        let mut state = State::Reading;
        let mut buf = Vec::new();

        loop {
            let (_, output) = result.split_at_mut(cnt);

            match state {
                State::Reading => {
                    debug!("start reading");
                    buf = cr.fill_buf().await?.to_vec();
                    debug!("read data: {}", buf.len());
                    state = State::Decoding;
                }
                State::Decoding => {
                    debug!("start decoding from buf {} to output {}", buf.len(), cnt);
                    let (decode_state, written) = cr.decode(&buf, output)?;
                    debug!("decoded from buf {} as output {}", buf.len(), written);
                    state = decode_state;
                    cnt += written;
                }
                State::Finishing => {
                    debug!("start finishing to output {}", cnt);
                    let (finish_state, written) = cr.finish(output)?;
                    debug!("finished from buf {} as output {}", buf.len(), written);
                    state = finish_state;
                    cnt += written;
                }
                State::Done => {
                    debug!("done");
                    break;
                }
            }
        }

        assert_eq!(result, content);
        Ok(())
    }
}
The lesson learnt from this demo: based on the current design, we will need to export the following things (only for the decoder):
- the Decode trait
- every algo under codec
- PartialBuffer (I don't know if we can replace it by tokio::io::ReadBuf)
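For readers who haven't seen PartialBuffer, a minimal sketch of the idea (this is not the crate's actual API, just an illustration of the written/unwritten bookkeeping the codec traits rely on):

```rust
// Hedged sketch: a PartialBuffer-like wrapper. The real type lives in
// async-compression's internals; only the written/unwritten split is
// illustrated here.
struct PartialBuf<B> {
    buf: B,
    index: usize, // bytes before `index` have been consumed or produced
}

impl<B: AsRef<[u8]>> PartialBuf<B> {
    fn new(buf: B) -> Self {
        Self { buf, index: 0 }
    }

    // The part the codec has already read from (input) or filled (output).
    fn written(&self) -> &[u8] {
        &self.buf.as_ref()[..self.index]
    }

    // The part still available to the codec.
    fn unwritten(&self) -> &[u8] {
        &self.buf.as_ref()[self.index..]
    }

    // Record that `amt` more bytes were consumed or produced.
    fn advance(&mut self, amt: usize) {
        self.index += amt;
    }
}

fn main() {
    let mut input = PartialBuf::new(b"abcdef".to_vec());
    input.advance(4); // pretend the decoder consumed 4 bytes
    assert_eq!(input.written(), b"abcd");
    assert_eq!(input.unwritten(), b"ef");
}
```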
The real change: split IO from decode and let the user do it.
PartialBuffer (I don't know if we can replace it by tokio::io::ReadBuf)
Yeah, I was thinking ReadBuf would hopefully be a replacement for it, at least for the output side; I have a very old test branch using it. I'm just waiting on it stabilizing in std before doing anything more with it.
Here is my progress, please take a look:
- async-compression side: https://github.com/Xuanwo/async-compression/tree/public_api
- my use-case: https://github.com/datafuselabs/opendal/pull/289