rust-csv
Patterns for incrementally parsing chunked data
What version of the csv crate are you using?
name = "csv" version = "1.1.1"
Briefly describe the question, bug or feature request.
Hi, this is not a bug report, just a question about usage patterns. I'm dealing with streaming IO, and I've got my CSV parsing logic in a function that is repeatedly called with new chunks of data. I've encountered the following problems:
- Repeatedly constructing new Readers from the input slices "resets" the parsing: all info about headers etc. is lost.
- On exhausting the end of the input slice, I'd rather revert parsing to the last complete record and return the unparsed input to be chained as a reader with the next chunk, instead of trying to parse the final record and possibly missing the end of it.
So my question is: are the current APIs intended to accommodate this "chunked, incremental" use case and I'm just failing to find the correct knobs and patterns, or am I trying to use the csv crate for something it wasn't originally meant for? Would it be possible to add APIs or documentation that would make this easier?
Include a complete program demonstrating a problem.
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=0b76f5fd956649247bdc99d471ba8b1b
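In outline, the program amounts to the following pattern (a condensed sketch, not the exact playground code; parse_chunk is a made-up name): every call builds a brand-new csv::Reader over the leftover bytes chained with the new chunk, so header handling restarts each time and a record cut off at a chunk boundary either loses its tail or trips the length check.
use std::io::Read;

fn parse_chunk(unparsed: &[u8], chunk: &[u8]) -> Vec<csv::ByteRecord> {
    // A fresh Reader per call: header state from previous calls is gone.
    let input = unparsed.chain(chunk);
    println!("Parse chunk: {:?}", input);
    let mut rdr = csv::Reader::from_reader(input);
    let mut records = Vec::new();
    for record in rdr.byte_records() {
        // A record cut off at the end of the chunk is still parsed, just
        // truncated, which is what later triggers the UnequalLengths error.
        let record = record.unwrap();
        println!("Got record: {:?}", record);
        records.push(record);
    }
    // There is no obvious way to recover the unparsed tail here, hence the
    // empty "Unparsed: []" lines in the output below.
    records
}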
What is the observed behavior of the code above?
Parse chunk: Chain { t: [], u: [99, 111, 108, 95, 97, 44, 99, 111, 108, 95, 98, 44, 99, 111, 108, 95, 99, 10, 48, 97, 97, 97, 97, 44, 48, 98, 98, 98, 98, 44, 48, 99, 99, 99, 99, 10, 49, 97, 97, 97, 97, 44, 49, 98, 98, 98, 98, 44, 49, 99, 99] }
Got record: ByteRecord(["0aaaa", "0bbbb", "0cccc"])
Got record: ByteRecord(["1aaaa", "1bbbb", "1cc"])
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [99, 99, 10] }
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [50, 97, 97, 97, 97, 44, 50, 98, 98, 98, 98] }
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [44, 50, 99, 99, 99, 99, 10] }
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [51, 97, 97, 97, 97, 44, 51, 98, 98, 98, 98, 44, 51, 99, 99, 99, 99, 10, 52, 97, 97, 97, 97, 44, 52, 98, 98, 98, 98, 44, 52, 99, 99, 99, 99, 10, 53, 97, 97, 97, 97, 44, 53, 98, 98] }
Got record: ByteRecord(["4aaaa", "4bbbb", "4cccc"])
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Error(UnequalLengths { pos: Some(Position { byte: 36, line: 3, record: 2 }), expected_len: 3, len: 2 })', src/libcore/result.rs:1084:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
What is the expected or desired behavior of the code above?
Parse chunk: Chain { t: [], u: [99, 111, 108, 95, 97, 44, 99, 111, 108, 95, 98, 44, 99, 111, 108, 95, 99, 10, 48, 97, 97, 97, 97, 44, 48, 98, 98, 98, 98, 44, 48, 99, 99, 99, 99, 10, 49, 97, 97, 97, 97, 44, 49, 98, 98, 98, 98, 44, 49, 99, 99] }
Got record: ByteRecord(["0aaaa", "0bbbb", "0cccc"])
Done. Unparsed: [49, 97, 97, 97, 97, 44, 49, 98, 98, 98, 98, 44, 49, 99, 99]
Parse chunk: Chain { t: [49, 97, 97, 97, 97, 44, 49, 98, 98, 98, 98, 44, 49, 99, 99], u: [99, 99, 10] }
Got record: ByteRecord(["1aaaa", "1bbbb", "1cccc"])
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [50, 97, 97, 97, 97, 44, 50, 98, 98, 98, 98] }
Done. Unparsed: [50, 97, 97, 97, 97, 44, 50, 98, 98, 98, 98]
Parse chunk: Chain { t: [50, 97, 97, 97, 97, 44, 50, 98, 98, 98, 98], u: [44, 50, 99, 99, 99, 99, 10] }
Got record: ByteRecord(["2aaaa", "2bbbb", "2cccc"])
Done. Unparsed: []
Parse chunk: Chain { t: [], u: [51, 97, 97, 97, 97, 44, 51, 98, 98, 98, 98, 44, 51, 99, 99, 99, 99, 10, 52, 97, 97, 97, 97, 44, 52, 98, 98, 98, 98, 44, 52, 99, 99, 99, 99, 10, 53, 97, 97, 97, 97, 44, 53, 98, 98] }
Got record: ByteRecord(["3aaaa", "3bbbb", "3cccc"])
Got record: ByteRecord(["4aaaa", "4bbbb", "4cccc"])
Done. Unparsed: [53, 97, 97, 97, 97, 44, 53, 98, 98]
Parse chunk: Chain { t: [53, 97, 97, 97, 97, 44, 53, 98, 98], u: [98, 98, 44, 53, 99, 99, 99, 99, 10] }
Got record: ByteRecord(["5aaaa", "5bbbb", "5cccc"])
Done. Unparsed: []
Note that if I were dealing with sync IO, most libraries would provide a Reader abstraction that doesn't have this problem. However, I'm developing in a space where you don't have that luxury at the moment, so manually juggling chunks is needed – but it would be nice if it were a bit more ergonomic.
Some ideas for helpful APIs for this use case:
A setting for ReaderBuilder that makes it parse only records known to be complete
It would parse up to the record terminator and not treat EOF as a terminator. Something like:
csv::ReaderBuilder::new().only_complete(true).from_reader(chunk);
This would prevent "torn" parses and allow determining the unparsed remainder using .position().
A method on Reader to get a mutable reference to the inner reader
This would allow the user to create a type that wraps the chunk and implements the Read trait, and to construct a csv::Reader from that. Once the chunk is exhausted, the user could get a mutable reference to the inner reader to feed it more data. This way the csv::Reader could be constructed only once, unlike in the example in the original post.
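For illustration, the kind of wrapper this has in mind might look like the sketch below (ChunkBuf and feed are made-up names; the missing piece is a way to reach the wrapper again once it has been moved into the csv::Reader):
use std::collections::VecDeque;
use std::io::{self, Read};

// Hypothetical chunk buffer: bytes are pushed in as chunks arrive and handed
// back out through the Read impl.
#[derive(Default)]
struct ChunkBuf {
    buf: VecDeque<u8>,
}

impl ChunkBuf {
    // Append the next chunk of input.
    fn feed(&mut self, chunk: &[u8]) {
        self.buf.extend(chunk.iter().copied());
    }
}

impl Read for ChunkBuf {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        let n = out.len().min(self.buf.len());
        for (slot, byte) in out.iter_mut().zip(self.buf.drain(..n)) {
            *slot = byte;
        }
        // Ok(0) looks like end-of-data to csv::Reader, which is why refilling
        // the buffer requires getting a mutable reference back to it.
        Ok(n)
    }
}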
A method on Reader to get the unparsed bytes
If it becomes possible to restrict parsing to complete records, the user might want to store the unparsed bytes (like in the main example in the original post) to be parsed with the next chunk.
I believe this can be solved by dropping a level lower and using csv_core::Reader.
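A minimal sketch of what that lower-level approach can look like (untested; ChunkParser and the buffer sizes are made up for illustration, and real code would also handle headers): csv_core::Reader keeps its parser state across calls to read_record, so a record split across two chunks is simply completed when the next chunk is fed in, and nothing has to be handed back to the caller.
use csv_core::{ReadRecordResult, Reader};

struct ChunkParser {
    rdr: Reader,
    out: Vec<u8>,     // field bytes of the record currently being assembled
    ends: Vec<usize>, // end offset of each field within `out`
    outlen: usize,
    endlen: usize,
}

impl ChunkParser {
    fn new() -> ChunkParser {
        ChunkParser {
            rdr: Reader::new(),
            out: vec![0; 8 * 1024],
            ends: vec![0; 256],
            outlen: 0,
            endlen: 0,
        }
    }

    // Parse every record completed by `chunk`; call once more with an empty
    // chunk and `is_last = true` to flush a final record that has no trailing
    // record terminator.
    fn feed(&mut self, mut chunk: &[u8], is_last: bool) {
        loop {
            if chunk.is_empty() && !is_last {
                // An empty input slice tells csv_core "end of data", which is
                // exactly what must not happen between chunks.
                return;
            }
            let (res, nin, nout, nend) = self.rdr.read_record(
                chunk,
                &mut self.out[self.outlen..],
                &mut self.ends[self.endlen..],
            );
            chunk = &chunk[nin..];
            self.outlen += nout;
            self.endlen += nend;
            match res {
                // Chunk exhausted mid-record: state is kept for the next call.
                ReadRecordResult::InputEmpty => continue,
                ReadRecordResult::OutputFull => {
                    let n = self.out.len();
                    self.out.resize(n * 2, 0);
                }
                ReadRecordResult::OutputEndsFull => {
                    let n = self.ends.len();
                    self.ends.resize(n * 2, 0);
                }
                ReadRecordResult::Record => {
                    // Field i spans out[ends[i - 1]..ends[i]].
                    let mut start = 0;
                    for &end in &self.ends[..self.endlen] {
                        println!("field: {:?}", &self.out[start..end]);
                        start = end;
                    }
                    self.outlen = 0;
                    self.endlen = 0;
                }
                ReadRecordResult::End => return,
            }
        }
    }
}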
A better asynchronous story for csv and IO streams would be great for the higher-level csv crate, but I think your proposal is solved by using csv_core.
Indeed, I came to that realisation myself too. However, csv provides some very nice conveniences around csv_core. I actually started implementing a similarly high-level library, but intended for the streaming use case. It reuses some csv code, so I have to think about licensing and attribution issues (and there might be some value in upstreaming the work if @BurntSushi deems it worth it, but I'll iterate on the code in a separate repo for the time being). I think I'll be able to publish that at some later point.
Sorry for the late response. I was on hiatus and it will be a bit of time before I make the maintenance rounds on this crate.
So my question is: are the current APIs intended to accommodate this "chunked, incremental" use case and I'm just failing to find the correct knobs and patterns, or am I trying to use the csv crate for something it wasn't originally meant for?
I think @martingallagher has it right. This is what csv-core is for. The csv crate was definitely not designed for incremental parsing. I'm not sure that's a use case I want to support at this time. I could be swayed though.
As a person with absolutely no honor or dignity, this is what I did:
use std::io::Read;
use tokio::{
    io::{AsyncRead, AsyncReadExt},
    runtime::Handle,
};

pub struct BastardRead<R> {
    pub inner: R,
    pub handle: Handle,
}

impl<R: AsyncRead + Unpin> Read for BastardRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.handle.block_on(self.inner.read(buf))
    }

    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
        self.handle.block_on(self.inner.read_to_end(buf))
    }

    fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> {
        self.handle.block_on(self.inner.read_to_string(buf))
    }

    fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
        self.handle.block_on(self.inner.read_exact(buf)).map(|_| ())
    }
}
and then proceeded to make use of it equally immorally:
let mut res = self
    .inner
    .post(url)
    .query(&[("org", self.org.as_ref())])
    .header(reqwest::header::CONTENT_TYPE, "application/vnd.flux")
    .header(reqwest::header::AUTHORIZATION, &self.token)
    .body(query)
    .send()
    .await?
    .error_for_status()?;

let (mut send_tx, send_rx) = mpsc::channel(1024);
let handle = tokio::runtime::Handle::current();

let csv_task = spawn_blocking(move || {
    let csv = csv::Reader::from_reader(BastardRead {
        inner: stream_reader(send_rx.map(Ok)),
        handle: handle.clone(),
    });
    for row in csv.into_deserialize() {
        handle.block_on(output.send(row?))?;
    }
    Result::<_, anyhow::Error>::Ok(())
});

while let Some(bytes) = res.chunk().await? {
    send_tx.send(bytes).await?;
}
drop(send_tx);

csv_task.await??;
Instant serde. Instant profit. Lifelong shame.
Your comment is an instant classic. Thanks for the laughs.
@petar-dambovaliev no, but, liek, think about it.
Did I benchmark it? No.
Why bound the send channel to 1024? Who knows.
Why have one at all instead of using the stream feature of reqwest? Absolutely no reason!
Why am I using into_deserialize even though I am not returning it? Also no reason.
So if anyone is actually going to do this silly thing again, they should probably at least do:
let mut res = self
    .inner
    .post(url)
    .query(&[("org", self.org.as_ref())])
    .header(reqwest::header::CONTENT_TYPE, "application/vnd.flux")
    .header(reqwest::header::AUTHORIZATION, &self.token)
    .body(query)
    .send()
    .await?
    .error_for_status()?;

let handle = tokio::runtime::Handle::current();

spawn_blocking(move || {
    let csv = csv::Reader::from_reader(BastardRead {
        inner: stream_reader(
            res.bytes_stream()
                // might be able to `TryStream::map_err` directly 🤔
                .map(|r| r.map_err(|e| io::Error::new(ErrorKind::Other, e))),
        ),
        handle: handle.clone(),
    });
    for row in csv.into_deserialize() {
        handle.block_on(output.send(row?))?;
    }
    Result::<_, anyhow::Error>::Ok(())
})
.await??;
One could then still downcast the csv::ErrorKind::Io(e) with io::ErrorKind::Other to reqwest::Error, but since we're just jumbling everything into anyhow, I didn't add that.
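Roughly, that downcast would look like this (an untested sketch, not code from this thread):
fn as_reqwest_error(err: &csv::Error) -> Option<&reqwest::Error> {
    match err.kind() {
        // The wrapped reader stuffed the reqwest::Error into an io::Error
        // with ErrorKind::Other, so dig it back out.
        csv::ErrorKind::Io(io_err) if io_err.kind() == std::io::ErrorKind::Other => {
            io_err.get_ref()?.downcast_ref::<reqwest::Error>()
        }
        _ => None,
    }
}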
Also worth investigating is whether block_in_place would be an appropriate alternative to the immediately-awaited spawn_blocking (now that we aren't shoveling Bytes by hand in the task).
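For reference, that variant would look roughly like the following (untested; it reuses BastardRead, stream_reader, and output from the snippets above, and block_in_place panics on the current-thread runtime flavor, so it only fits the multi-threaded runtime):
let handle = tokio::runtime::Handle::current();

tokio::task::block_in_place(move || -> Result<(), anyhow::Error> {
    let csv = csv::Reader::from_reader(BastardRead {
        inner: stream_reader(
            res.bytes_stream()
                .map(|r| r.map_err(|e| io::Error::new(ErrorKind::Other, e))),
        ),
        handle: handle.clone(),
    });
    for row in csv.into_deserialize() {
        // Handle::block_on is fine here because we are inside block_in_place.
        handle.block_on(output.send(row?))?;
    }
    Ok(())
})?;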