
Patterns for incrementally parsing chunked data

Open golddranks opened this issue 5 years ago • 8 comments

What version of the csv crate are you using?

name = "csv" version = "1.1.1"

Briefly describe the question, bug or feature request.

Hi, this is not a bug report, just a question about usage patterns. I'm dealing with streaming IO, and my CSV parsing logic lives in a function that is called repeatedly with new chunks of data. I've run into the following problems:

  1. Repeatedly constructing new Readers from the input slices "resets" the parsing: all info about headers etc. is lost.
  2. When the input slice is exhausted, I'd rather have parsing stop at the last complete record and get back the unparsed remainder to chain in front of the next chunk, instead of the parser attempting the final record and possibly missing the end of it.

So my question is: are the current APIs intended to accommodate this "chunked, incremental" use case and I'm just failing to find the right knobs and patterns, or am I trying to use the csv crate for something it wasn't originally meant for? Would it be possible to add APIs or documentation that make this easier?

Include a complete program demonstrating a problem.

https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=0b76f5fd956649247bdc99d471ba8b1b

What is the observed behavior of the code above?

Parse chunk: Chain { t: [], u: [99, 111, 108, 95, 97, 44, 99, 111, 108, 95, 98, 44, 99, 111, 108, 95, 99, 10, 48, 97, 97, 97, 97, 44, 48, 98, 98, 98, 98, 44, 48, 99, 99, 99, 99, 10, 49, 97, 97, 97, 97, 44, 49, 98, 98, 98, 98, 44, 49, 99, 99] }
Got record: ByteRecord(["0aaaa", "0bbbb", "0cccc"])
Got record: ByteRecord(["1aaaa", "1bbbb", "1cc"])
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [99, 99, 10] }
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [50, 97, 97, 97, 97, 44, 50, 98, 98, 98, 98] }
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [44, 50, 99, 99, 99, 99, 10] }
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [51, 97, 97, 97, 97, 44, 51, 98, 98, 98, 98, 44, 51, 99, 99, 99, 99, 10, 52, 97, 97, 97, 97, 44, 52, 98, 98, 98, 98, 44, 52, 99, 99, 99, 99, 10, 53, 97, 97, 97, 97, 44, 53, 98, 98] }
Got record: ByteRecord(["4aaaa", "4bbbb", "4cccc"])
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Error(UnequalLengths { pos: Some(Position { byte: 36, line: 3, record: 2 }), expected_len: 3, len: 2 })', src/libcore/result.rs:1084:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.

What is the expected or desired behavior of the code above?

Parse chunk: Chain { t: [], u: [99, 111, 108, 95, 97, 44, 99, 111, 108, 95, 98, 44, 99, 111, 108, 95, 99, 10, 48, 97, 97, 97, 97, 44, 48, 98, 98, 98, 98, 44, 48, 99, 99, 99, 99, 10, 49, 97, 97, 97, 97, 44, 49, 98, 98, 98, 98, 44, 49, 99, 99] }
Got record: ByteRecord(["0aaaa", "0bbbb", "0cccc"])
Done. Unparsed: [49, 97, 97, 97, 97, 44, 49, 98, 98, 98, 98, 44, 49, 99, 99]
Parse chunk: Chain { t: [49, 97, 97, 97, 97, 44, 49, 98, 98, 98, 98, 44, 49, 99, 99], u: [99, 99, 10] }
Got record: ByteRecord(["1aaaa", "1bbbb", "1cccc"])
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [50, 97, 97, 97, 97, 44, 50, 98, 98, 98, 98] }
Done. Unparsed: [50, 97, 97, 97, 97, 44, 50, 98, 98, 98, 98]
Parse chunk: Chain { t: [50, 97, 97, 97, 97, 44, 50, 98, 98, 98, 98], u: [44, 50, 99, 99, 99, 99, 10] }
Got record: ByteRecord(["2aaaa", "2bbbb", "2cccc"])
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [51, 97, 97, 97, 97, 44, 51, 98, 98, 98, 98, 44, 51, 99, 99, 99, 99, 10, 52, 97, 97, 97, 97, 44, 52, 98, 98, 98, 98, 44, 52, 99, 99, 99, 99, 10, 53, 97, 97, 97, 97, 44, 53, 98, 98] }
Got record: ByteRecord(["3aaaa", "3bbbb", "3cccc"])
Got record: ByteRecord(["4aaaa", "4bbbb", "4cccc"])
Done. Unparsed: [53, 97, 97, 97, 97, 44, 53, 98, 98]
Parse chunk: Chain { t: [53, 97, 97, 97, 97, 44, 53, 98, 98], u: [98, 98, 44, 53, 99, 99, 99, 99, 10] }
Got record: ByteRecord(["5aaaa", "5bbbb", "5cccc"])
Done. Unparsed: []

golddranks avatar Oct 18 '19 06:10 golddranks

Note that if I were dealing with sync IO, most libraries would provide a Reader abstraction that doesn't have this problem. However, I'm developing in a space where you don't have that luxury at the moment, so manually juggling chunks is necessary – it would just be nice if that were a bit more ergonomic.

golddranks avatar Oct 18 '19 06:10 golddranks

Some ideas for helpful APIs for this use case:

A setting on ReaderBuilder that makes it parse only records known to be complete

It would parse up to the record terminator and not treat an EOF as a terminator. Something like:

csv::ReaderBuilder::new().only_complete(true).from_reader(chunk);

This would prevent "torn" parses and allow the unparsed remainder of the chunk to be determined via .position().
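
Roughly what I have in mind, as a purely hypothetical sketch (only_complete doesn't exist today, while .position() already does; chunk is assumed to be a &[u8]):

let mut rdr = csv::ReaderBuilder::new()
    .only_complete(true) // proposed: never treat EOF as a record terminator
    .from_reader(chunk);
let mut record = csv::ByteRecord::new();
while rdr.read_byte_record(&mut record)? {
    // ... handle the record ...
}
// Everything after the last complete record would be carried over and
// chained in front of the next chunk.
let consumed = rdr.position().byte() as usize;
let unparsed = &chunk[consumed..];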

A method on Reader to get a mutable reference to the inner reader

This would let the user create a type that wraps the chunk and implements the Read trait, and construct the csv::Reader from that. Once the chunk is exhausted, the user could get a mutable reference to the inner reader and feed it more data. That way the csv::Reader would only need to be constructed once, unlike in the example in the original post.
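
A rough sketch of such a wrapper, assuming the proposed get_mut() (or something reachable through it) existed; the ChunkFeeder name and its feed() method are made up for illustration:

use std::collections::VecDeque;
use std::io::{self, Read};

// Hypothetical refillable chunk buffer: csv::Reader::from_reader(ChunkFeeder::new())
// would be constructed once, and the proposed get_mut() would expose feed().
pub struct ChunkFeeder {
    buf: VecDeque<u8>,
}

impl ChunkFeeder {
    pub fn new() -> Self {
        ChunkFeeder { buf: VecDeque::new() }
    }

    // Append the next chunk of input.
    pub fn feed(&mut self, chunk: &[u8]) {
        self.buf.extend(chunk.iter().copied());
    }
}

impl Read for ChunkFeeder {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        // Hand out whatever is buffered; an empty read looks like EOF to csv,
        // which is exactly why the caller needs a way to reach back in and
        // feed more data between chunks.
        let n = out.len().min(self.buf.len());
        for (dst, byte) in out.iter_mut().zip(self.buf.drain(..n)) {
            *dst = byte;
        }
        Ok(n)
    }
}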

A method on Reader to get the unparsed bytes

If it becomes possible to restrict parsing to complete records, the user might also want to retrieve the unparsed bytes (as in the example in the original post) so they can be parsed together with the next chunk.

golddranks avatar Oct 18 '19 06:10 golddranks

I believe this can be solved by dropping a level lower and using csv_core::Reader.

A better asynchronous/streaming story would be great for the higher-level csv crate, but I think your proposal is already covered by csv_core.
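
For example, a minimal (untested) sketch of chunked parsing on top of csv_core's push-style read_field API; the Chunked type, the scratch buffer size and the callback are illustrative, not part of csv_core:

use csv_core::{ReadFieldResult, Reader};

// The csv_core Reader keeps its parse state between calls, so a record that
// straddles a chunk boundary is resumed transparently with the next chunk.
struct Chunked {
    rdr: Reader,
    field: Vec<u8>,       // bytes of the field currently being parsed
    record: Vec<Vec<u8>>, // completed fields of the current record
}

impl Chunked {
    fn new() -> Self {
        Chunked { rdr: Reader::new(), field: Vec::new(), record: Vec::new() }
    }

    // Feed one chunk and invoke `on_record` for every record completed in it.
    fn feed(&mut self, mut chunk: &[u8], on_record: &mut dyn FnMut(&[Vec<u8>])) {
        let mut out = [0u8; 1024]; // scratch buffer, size chosen arbitrarily
        loop {
            let (res, nin, nout) = self.rdr.read_field(chunk, &mut out);
            chunk = &chunk[nin..];
            self.field.extend_from_slice(&out[..nout]);
            match res {
                // Chunk exhausted mid-field or mid-record: wait for more data.
                ReadFieldResult::InputEmpty => return,
                // Scratch buffer filled: loop and keep copying this field.
                ReadFieldResult::OutputFull => {}
                ReadFieldResult::Field { record_end } => {
                    self.record.push(std::mem::take(&mut self.field));
                    if record_end {
                        on_record(&self.record);
                        self.record.clear();
                    }
                    // An empty input slice tells csv_core the data has ended,
                    // so don't call it again until the next chunk arrives.
                    if chunk.is_empty() {
                        return;
                    }
                }
                ReadFieldResult::End => return,
            }
        }
    }
}

At the actual end of the stream, feed would be called once with an empty chunk, since csv_core treats an empty input slice as the end of the data and then yields the final field/record.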

martingallagher avatar Oct 26 '19 09:10 martingallagher

Indeed, I came to that realisation myself too. However, csv provides some very nice conveniences around csv_core. I actually started implementing a similarly high-level library but intended for the streaming use case. It reuses some csv code so I have to think about licensing and attribution issues (and there might be some value in upstreaming the work if @BurntSushi deems it worth it, but I'll iterate on the code in a separate repo for the time being). I think I'll be able to publish that at some later point.

golddranks avatar Oct 28 '19 06:10 golddranks

Sorry for the late response. I was on hiatus and it will be a bit of time before I make the maintenance rounds on this crate.

So my question is: are the current APIs intended to accommodate this "chunked, incremental" use case and I'm just failing to find the right knobs and patterns, or am I trying to use the csv crate for something it wasn't originally meant for?

I think @martingallagher has it right. This is what csv-core is for. The csv crate was definitely not designed for incremental parsing. I'm not sure that's a use case I want to support at this time. I could be swayed though.

BurntSushi avatar Feb 14 '20 18:02 BurntSushi

As a person with absolutely no honor or dignity, this is what I did:

use std::io::Read;
use tokio::{
    io::{AsyncRead, AsyncReadExt},
    runtime::Handle,
};
pub struct BastardRead<R> {
    pub inner: R,
    pub handle: Handle,
}

impl<R: AsyncRead + Unpin> Read for BastardRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.handle.block_on(self.inner.read(buf))
    }

    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
        self.handle.block_on(self.inner.read_to_end(buf))
    }

    fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> {
        self.handle.block_on(self.inner.read_to_string(buf))
    }

    fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
        self.handle.block_on(self.inner.read_exact(buf)).map(|_| ())
    }
}

and then proceeded to make use of it equally immorally:

let mut res = self
    .inner
    .post(url)
    .query(&[("org", self.org.as_ref())])
    .header(reqwest::header::CONTENT_TYPE, "application/vnd.flux")
    .header(reqwest::header::AUTHORIZATION, &self.token)
    .body(query)
    .send()
    .await?
    .error_for_status()?;
let (mut send_tx, send_rx) = mpsc::channel(1024);
let handle = tokio::runtime::Handle::current();
let csv_task = spawn_blocking(move || {
    let csv = csv::Reader::from_reader(BastardRead {
        inner: stream_reader(send_rx.map(Ok)),
        handle: handle.clone(),
    });
    for row in csv.into_deserialize() {
        handle.block_on(output.send(row?))?;
    }
    Result::<_, anyhow::Error>::Ok(())
});

while let Some(bytes) = res.chunk().await? {
    send_tx.send(bytes).await?;
}
drop(send_tx);
csv_task.await??;

Instant serde. Instant profit. Lifelong shame.

qm3ster avatar Dec 02 '20 12:12 qm3ster

As a person with absolutely no honor or dignity, this is what I did: […] Instant serde. Instant profit. Lifelong shame.

Your comment is an instant classic. Thanks for the laughs.

petar-dambovaliev avatar Jan 29 '22 14:01 petar-dambovaliev

@petar-dambovaliev no, but, like, think about it. Did I benchmark it? No. Why bound the send channel to 1024? Who knows. Why have one at all instead of using the stream feature of reqwest? Absolutely no reason! Why am I using into_deserialize even though I'm not returning it? Also no reason. So if anyone is actually going to do this silly thing again, they should probably at least do:

let mut res = self
    .inner
    .post(url)
    .query(&[("org", self.org.as_ref())])
    .header(reqwest::header::CONTENT_TYPE, "application/vnd.flux")
    .header(reqwest::header::AUTHORIZATION, &self.token)
    .body(query)
    .send()
    .await?
    .error_for_status()?;
let handle = tokio::runtime::Handle::current();
spawn_blocking(move || {
    let csv = csv::Reader::from_reader(BastardRead {
        inner: stream_reader(
            res.bytes_stream()
                // might be able to `TryStream::map_err` directly 🤔
                .map(|r| r.map_err(|e| io::Error::new(ErrorKind::Other, e))),
        ),
        handle: handle.clone(),
    });
    for row in csv.into_deserialize() {
        handle.block_on(output.send(row?))?;
    }
    Result::<_, anyhow::Error>::Ok(())
}).await??;

One could then still downcast the io::Error with io::ErrorKind::Other inside csv::ErrorKind::Io(e) back to a reqwest::Error, but since we're jumbling everything into anyhow anyway, I didn't add that. Also worth investigating is whether block_in_place would be an appropriate alternative to the immediately-awaited spawn_blocking (now that we aren't shoveling Bytes by hand in the task).
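
For reference, an untested sketch of that block_in_place variant (multi-threaded runtime only; BastardRead, stream_reader, res and output are the same assumed pieces as above):

let handle = tokio::runtime::Handle::current();
tokio::task::block_in_place(|| {
    let csv = csv::Reader::from_reader(BastardRead {
        inner: stream_reader(
            res.bytes_stream()
                .map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e))),
        ),
        handle: handle.clone(),
    });
    for row in csv.into_deserialize() {
        // still blocking on the async send from inside the blocking section
        handle.block_on(output.send(row?))?;
    }
    Result::<_, anyhow::Error>::Ok(())
})?;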

qm3ster avatar Jan 29 '22 21:01 qm3ster