
Custom `sink` implementations

Open andyquinterom opened this issue 2 years ago • 2 comments

Purpose

I want to build an asynchronous HTTP server that uses polars under the hood to manipulate data frames. As far as I understand, the sink_* methods can only write to files, but I want to be able to stream to, for example, an HTTP response body.

Since libraries like hyper use a Sender (as in mpsc) to build a Body, I thought a way to write to any sender would be awesome.

My implementation

I exposed the FileType enum in order to not have to create different sink_* implementations; however, this could be changed.

I also created a SinkSender trait that defines all the methods necessary to create custom sink implementations.

Additionally, I created a SendError variant in PolarsError.

Example

This example reads a single iris.csv file, purely for testing purposes.

use bytes::Bytes;
use polars::prelude::*;
use std::{
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    thread,
};

#[derive(Debug)]
struct MySender(SyncSender<Bytes>);

impl SinkSender for MySender {
    fn sink_send(&self, buf: &[u8]) -> PolarsResult<usize> {
        self.0.send(Bytes::copy_from_slice(buf)).map_or_else(
            |e| Err(PolarsError::SendError(e.to_string().into())),
            |_| Ok(buf.len()),
        )
    }
    fn sink_flush(&self) -> PolarsResult<()> {
        Ok(())
    }
}

fn spawn_reader_thread(rx: Receiver<Bytes>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        while let Ok(buf) = rx.recv() {
            println!("{:?}", buf);
        }
    })
}

fn main() {
    // Capacity 0 makes this a rendezvous channel: each send blocks until the reader receives it.
    let (tx, rx) = sync_channel::<Bytes>(0);
    let iris = LazyCsvReader::new("iris.csv")
        .has_header(true)
        .finish()
        .unwrap();

    let reader_job = spawn_reader_thread(rx);

    iris.sink_sender(MySender(tx), FileType::Ipc(IpcWriterOptions::default()))
        .unwrap();

    reader_job.join().unwrap();
}

andyquinterom avatar Sep 25 '23 19:09 andyquinterom

Why the SinkSender trait at all? Why not just Write?

orlp avatar Sep 26 '23 09:09 orlp

Why the SinkSender trait at all? Why not just Write?

Using 'static + Write would require some big refactoring but I could try.
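For context, here is a minimal sketch of what the `Write`-based alternative could look like on the caller's side, using only the standard library. `ChannelWriter`, `write_rows`, and `stream_to_consumer` are hypothetical names made up for illustration; `write_rows` merely stands in for a sink that accepts any `'static + Write` value and is not an existing Polars API.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical adapter: forwards every written chunk over a bounded channel.
struct ChannelWriter {
    tx: SyncSender<Vec<u8>>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Copy the chunk and hand it to the receiver; a closed channel
        // surfaces as a BrokenPipe I/O error.
        self.tx
            .send(buf.to_vec())
            .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Nothing is buffered in the adapter itself.
        Ok(())
    }
}

/// Stand-in for a writer-based sink (assumption): any code that accepts `impl Write`.
fn write_rows<W: Write>(mut out: W) -> io::Result<()> {
    out.write_all(b"sepal_length,species\n")?;
    out.write_all(b"5.1,setosa\n")?;
    out.flush()
}

/// Runs the producer against the channel and returns what the consumer received.
fn stream_to_consumer() -> Vec<u8> {
    let (tx, rx): (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) = sync_channel(1);
    let consumer = thread::spawn(move || {
        let mut received = Vec::new();
        // The loop ends once the writer (and with it the sender) is dropped.
        for chunk in rx {
            received.extend_from_slice(&chunk);
        }
        received
    });
    // The writer is moved into `write_rows` and dropped when it returns.
    write_rows(ChannelWriter { tx }).expect("receiver should still be alive");
    consumer.join().expect("consumer thread panicked")
}

fn main() {
    let bytes = stream_to_consumer();
    println!("{}", String::from_utf8_lossy(&bytes));
}
```

With an adapter like this, the sender-specific logic lives entirely in user code, and the library side would only need to accept a generic writer.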

andyquinterom avatar Sep 29 '23 16:09 andyquinterom

Apologies for leaving this PR around so long without a review.

It is not completely clear to me what you're trying to achieve. Please make an issue detailing your request so we can discuss whether we want to include it in Polars.

Since this PR has gathered some conflicts by now and we're not sure yet if we want the functionality, I'll close it for now. Feel free to rebase and open a new PR if the related issue is accepted.

stinodego avatar Feb 13 '24 11:02 stinodego

I guess the idea is to provide an interface for people to implement their own sinks, e.g. for writing asynchronously to an HTTP socket rather than to disk (CSV, Parquet, ...). Does this not make sense in Polars? I'm new to the library but also looking for a way to connect my lazy evaluation pipeline to a consumer without having to materialize the entire dataframe. Essentially streaming, or a simple Python generator. Do you have some thoughts on this?

fabiannagel avatar Apr 17 '24 16:04 fabiannagel