Add BytesIO support to `scan_csv`
Problem Description
Firstly, thank you for making this fantastic library 😀
I have the following use case:
```python
import zstandard
import polars as pl

with open(path, 'rb') as f:
    dctx = zstandard.ZstdDecompressor()
    with dctx.stream_reader(f) as reader:
        df = pl.read_csv(reader, sep='\t')
```
Here `path` is the path of a Zstandard-compressed TSV file. I'm working with bioinformatics data, and bioinformaticians love to generate massive CSVs/TSVs and then compress them.
I would like to use `scan_csv` to read the decompressed `BytesIO` stream instead and take advantage of all the cool lazy evaluation features to reduce memory usage. Alternatively, it would be great if `scan_csv` supported Zstandard-compressed file paths directly.
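For illustration only, a sketch of the requested behavior; `scan_csv` does not accept file-like objects today, so the reader-accepting call below is hypothetical:

```python
import zstandard
import polars as pl

with open(path, 'rb') as f:
    dctx = zstandard.ZstdDecompressor()
    with dctx.stream_reader(f) as reader:
        # hypothetical: scan_csv accepting a file-like object
        lf = pl.scan_csv(reader, sep='\t')
        # projection pushdown would then read only the needed columns
        df = lf.select(["chrom", "pos"]).collect()  # column names illustrative
```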
Thanks for your time!
Out of curiosity, is this Zstandard compressed TSV file public data or is it from an internal pipeline? Never saw any Zstandard compressed TSV files in the wild myself unfortunately.
The file I'm trying to work with is a variant information file from plink2 (.pvar.zst or .bim.zst). plink is a popular toolkit used in genome wide association studies and related areas.
Some public data are available here.
+1 to this feature request; in the meantime, you can use `pandas.read_csv` or `pyarrow.csv.open_csv` to get some of this behavior.
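A minimal sketch of the pyarrow route, reusing the zstandard streaming decompressor from above (file name, delimiter, and per-batch processing are illustrative):

```python
import pyarrow.csv as pacsv
import zstandard

# stream-decompress and parse the TSV in record batches rather than all at once
with open("data.tsv.zst", "rb") as f:
    dctx = zstandard.ZstdDecompressor()
    with dctx.stream_reader(f) as reader:
        batches = pacsv.open_csv(
            reader, parse_options=pacsv.ParseOptions(delimiter="\t")
        )
        for batch in batches:  # each item is a pyarrow.RecordBatch
            ...                # process incrementally with bounded memory
```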
Why go via pandas? `pl.read_csv` accepts a `BytesIO`?
This would also help with https://github.com/pola-rs/polars/issues/7514
I managed to get `read_csv` to work using `with xopen("data.csv.zst", "rb") as f`, see #7287
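For reference, a minimal sketch of that workaround; note it still goes through the eager `read_csv`, so the whole decompressed file ends up in memory:

```python
import polars as pl
from xopen import xopen

# xopen transparently decompresses .zst (and .gz, .bz2, .xz) inputs
with xopen("data.csv.zst", "rb") as f:
    df = pl.read_csv(f)
```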
@ghuls at Nextstrain we've started to zst compress everything, so yeah, there are now metadata.tsv.zst's in the wild :)
> @ritchie46: Why go via pandas? `pl.read_csv` accepts a `BytesIO`?
See #9266 for why `pl.read_csv` doesn't work in this case; unless I'm doing something wrong, `read_csv` reads the whole file into memory. I haven't found a way to emulate pandas' `usecols` behaviour for zstd-compressed CSVs with polars.
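For comparison, the pandas behaviour being referred to, assuming pandas >= 1.4 with the `zstandard` package installed (file and column names are illustrative):

```python
import pandas as pd

# pandas decompresses the zstd stream itself and only materializes the
# requested columns, even though it still scans the whole file
df = pd.read_csv(
    "data.tsv.zst",
    sep="\t",
    usecols=["chrom", "pos"],
    compression="zstd",
)
```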
You can use `parquet-fromcsv` in the meantime to convert compressed CSV/TSV files to parquet and use `pl.scan_parquet` on them:
https://github.com/pola-rs/polars/issues/9283#issuecomment-1594512827
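Once converted, the lazy API works as usual; a minimal sketch (file and column names are illustrative):

```python
import polars as pl

# after parquet-fromcsv has produced data.parquet from the compressed TSV,
# scan_parquet gives projection and predicate pushdown
lf = pl.scan_parquet("data.parquet")
df = lf.select(["chrom", "pos"]).collect()
```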
Looking through the code, it seems the limitation eventually devolves to `LazyScanBuilder::scan_csv()`, which accepts an `Into<std::path::PathBuf>`. It opens that path and converts it into a `File`. After a bit of back and forth this gets turned into a `ReaderBytes`. Later it builds a `LogicalPlan::Scan`, which takes a `PathBuf`.
`LogicalPlan::Scan` eventually becomes an `ALogicalPlan::Scan`, I think, and there's code in `physical_plan/planner/lp.rs` which creates a `CsvExec` from that, which in turn creates a `CsvReader`... specifically for CSVs. An `ALogicalPlan::Scan` can also be parquet etc., so the options are either splitting the work across multiple PRs, with an intermediate stage where #10413 is not yet done (but closer), or doing it all in one big change, which seems worse.
There are likely other code paths that interact with `(A)LogicalPlan::Scan`.
`CsvReader` works off of anything implementing the `MmapBytesReader` trait (as does `ParquetReader`).
Specifically for the `BytesIO` case, to support `MmapBytesReader` one would call `getbuffer()`, which at least prevents resizing. One could then use the buffer API to extract a pointer to the underlying data, and multiple threads can read from it without holding the GIL, so long as a reference is additionally kept to the `BytesIO` object (or Python buffer object) so it isn't GCed.
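A small demonstration of the resize-pinning behaviour of `getbuffer()` mentioned above:

```python
import io

data = io.BytesIO(b"a,b\n1,2\n")
view = data.getbuffer()   # exports a memoryview over the internal buffer

data.seek(0, io.SEEK_END)
try:
    data.write(b"3,4\n")  # growing the buffer while an export exists...
except BufferError as exc:
    print(exc)            # ...fails: the object cannot be re-sized

view.release()            # once the export is released,
data.write(b"3,4\n")      # the BytesIO can grow again
```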
My first thought for an implementation, based on the above, is replacing every `PathBuf` in the various types (`LogicalPlan::Scan` etc.) with:
```rust
enum FileSource {
    Path(PathBuf),
    Reader(Box<dyn MmapBytesReader>),
}
```
But... that's sort of limiting. The more general solution is making the `CsvStreamer` not assume the CSV fits in memory. In particular, if the csv `CoreReader` could be modified to take a `GetReaderBytes` trait implementor as input instead of `ReaderBytes`, it could fetch `ReaderBytes` at a given position; `ReaderBytes` can implement `GetReaderBytes` for itself, so much of the code would stay the same.
```rust
trait GetReaderBytes {
    fn read(&self, offset: usize, length: usize) -> ReaderBytes;
}
```
> You can use `parquet-fromcsv` in the meantime to convert compressed CSV/TSV files to parquet and use `pl.scan_parquet` on them: #9283 (comment)
I think I'm understanding correctly that this is a recommendation to use a rust library. Any advice for the less-cool among us who are still working pretty exclusively from a python environment?
It is a command line tool, but part of the Rust `arrow` library.
Awesome, thanks for the correction. Yeah, CLI tools work for my use case.
+1 to this. Implementing this proposal would allow for scanning memory-mapped files that need additional processing, e.g. parsing custom headers.