polars
Custom `sink` implementations
Purpose
I want to build an asynchronous HTTP server that uses polars under the hood to manipulate some data frames. From my understanding, the sink_* methods only allow writing to files, but I want to be able to stream to an HTTP response body, for example.
Since libraries like hyper use a Sender (as in mpsc) to build a Body, I thought creating a way to write to any sender would be awesome.
My implementation
I exposed the FileType enum in order to not have to create different sink_* implementations; however, this could be changed.
I also created a SinkSender trait that declares all the methods necessary to create custom sink implementations.
Additionally, I created a SendError variant in PolarsError.
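For readers who have not opened the diff, here is a rough sketch of the shape the trait seems to have, inferred only from the two methods implemented in the example below. It is not copied from the PR: `SinkResult` is a stand-in for `PolarsResult` so the snippet compiles without polars, and `MemorySender` and `write_chunks` are hypothetical names used purely for illustration. The real trait may carry extra bounds or methods.

```rust
use std::sync::Mutex;

/// Stand-in for `PolarsResult` (assumption), so the sketch has no dependencies.
type SinkResult<T> = Result<T, String>;

/// Shape inferred from the example: one method to push bytes, one to flush.
trait SinkSender {
    fn sink_send(&self, buf: &[u8]) -> SinkResult<usize>;
    fn sink_flush(&self) -> SinkResult<()>;
}

/// Hypothetical sender that simply accumulates the bytes in memory.
struct MemorySender(Mutex<Vec<u8>>);

impl SinkSender for MemorySender {
    fn sink_send(&self, buf: &[u8]) -> SinkResult<usize> {
        // `&self` is shared, so interior mutability is needed to store data.
        let mut guard = self.0.lock().map_err(|e| e.to_string())?;
        guard.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn sink_flush(&self) -> SinkResult<()> {
        // Nothing is buffered outside the Vec, so flushing is a no-op.
        Ok(())
    }
}

/// Hypothetical driver standing in for the writer side of a sink:
/// it pushes every chunk through the trait and flushes at the end.
fn write_chunks(sender: &dyn SinkSender, chunks: &[&[u8]]) -> SinkResult<usize> {
    let mut total = 0;
    for chunk in chunks {
        total += sender.sink_send(chunk)?;
    }
    sender.sink_flush()?;
    Ok(total)
}
```

Because both methods take `&self`, an implementor that wraps a channel sender needs no locking, while one that owns a buffer (as above) does.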
Example
This example reads a single iris.csv file, which I added just for testing purposes.
    use bytes::Bytes;
    use polars::prelude::*;
    use std::{
        sync::mpsc::{sync_channel, Receiver, SyncSender},
        thread,
    };

    #[derive(Debug)]
    struct MySender(SyncSender<Bytes>);

    impl SinkSender for MySender {
        fn sink_send(&self, buf: &[u8]) -> PolarsResult<usize> {
            self.0.send(Bytes::copy_from_slice(buf)).map_or_else(
                |e| Err(PolarsError::SendError(e.to_string().into())),
                |_| Ok(buf.len()),
            )
        }

        fn sink_flush(&self) -> PolarsResult<()> {
            Ok(())
        }
    }

    fn spawn_reader_thread(rx: Receiver<Bytes>) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            while let Ok(buf) = rx.recv() {
                println!("{:?}", buf);
            }
        })
    }

    fn main() {
        let (tx, rx) = sync_channel::<Bytes>(0);
        let iris = LazyCsvReader::new("iris.csv")
            .has_header(true)
            .finish()
            .unwrap();
        let reader_job = spawn_reader_thread(rx);
        iris.sink_sender(MySender(tx), FileType::Ipc(IpcWriterOptions::default()))
            .unwrap();
        reader_job.join().unwrap();
    }
Why the SinkSender trait at all? Why not just Write?
Using `'static + Write` would require some big refactoring, but I could try.
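To make the `Write` alternative concrete, here is a minimal sketch of how a channel sender could be wrapped in a `std::io::Write` implementation using only the standard library. `ChannelWriter` and `stream_through_channel` are hypothetical names, and `Vec<u8>` stands in for `Bytes` to avoid the extra dependency. Whether the sink machinery in this PR could accept such a writer is exactly the refactoring question above, so treat this as an illustration of the idea rather than something that plugs into polars as is.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Hypothetical adapter that exposes a channel sender as an `io::Write`.
struct ChannelWriter(SyncSender<Vec<u8>>);

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Each write becomes one message; a hung-up receiver maps to BrokenPipe.
        self.0
            .send(buf.to_vec())
            .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Messages are handed over immediately, so there is nothing to flush.
        Ok(())
    }
}

/// Writes `chunks` from a producer thread and collects them on the caller's
/// thread, mimicking a sink feeding a streaming consumer.
fn stream_through_channel(chunks: Vec<Vec<u8>>) -> Vec<u8> {
    // Capacity 0 gives a rendezvous channel, as in the example above.
    let (tx, rx) = sync_channel::<Vec<u8>>(0);

    let producer = thread::spawn(move || {
        let mut writer = ChannelWriter(tx);
        for chunk in &chunks {
            writer.write_all(chunk).expect("receiver hung up");
        }
        // `writer` (and the sender inside it) is dropped here,
        // which ends the receiving loop below.
    });

    let mut received = Vec::new();
    for chunk in rx {
        received.extend_from_slice(&chunk);
    }
    producer.join().expect("producer thread panicked");
    received
}
```

The adapter is `Send + 'static` because it owns its sender, which is presumably the property a threaded sink would need from a generic writer.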
Apologies for leaving this PR around so long without a review.
It is not completely clear to me what you're trying to achieve. Please make an issue detailing your request so we can discuss whether we want to include it in Polars.
Since this PR has gathered some conflicts by now and we're not sure yet if we want the functionality, I'll close it for now. Feel free to rebase and open a new PR if the related issue is accepted.
I guess the idea is to provide an interface for people to implement their own sink implementations, e.g. for writing asynchronously to an HTTP socket rather than to disk (CSV, Parquet, ...). Does this not make sense in Polars? I'm new to the library but also looking for a way to connect my lazy evaluation pipeline to a consumer without having to materialize the entire dataframe: streaming, or essentially a simple Python generator. Do you have some thoughts on this?