
Integrating Polars with Datafusion

mwiewior opened this issue 1 year ago · 1 comment

Hey, I'm working on implementing streaming functionality for Polars on top of some extensions to DataFusion. For this purpose, I added streaming support for AnonymousScan in Polars:

pub struct RangeOperationScan {
    pub(crate) df_iter: Arc<Mutex<SendableRecordBatchStream>>,
}


impl AnonymousScan for RangeOperationScan {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn scan(&self, scan_opts: AnonymousScanArgs) -> PolarsResult<polars::prelude::DataFrame> {
        todo!("Only streaming is supported")
    }

    fn next_batch(
        &self,
        scan_opts: AnonymousScanArgs,
    ) -> PolarsResult<Option<polars::prelude::DataFrame>> {
        let mutex_stream = Arc::clone(&self.df_iter);
        thread::spawn(move || {
            let rt = Runtime::new().unwrap();
            let result = rt.block_on(mutex_stream.lock().unwrap().next()); // <-- I think here is the problem
            match result {
                Some(batch) => {
                    let rb = batch.unwrap();
                    let schema_polars = convert_arrow_rb_schema_to_polars_df_schema(&rb.schema())?;
                    let df = convert_arrow_rb_to_polars_df(&rb, &schema_polars)?;
                    Ok(Some(df))
                },
                None => Ok(None),
            }
        }).join().unwrap()
    }
}
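
As an aside that is not part of the original code: a minimal sketch of an alternative next_batch, assuming the scan owns the runtime that produced the stream. The RangeOperationScanOwned struct, its runtime field, and next_batch_inner are hypothetical names; the idea is just to block on one shared runtime instead of spawning a thread and building a fresh Runtime per batch:

use std::sync::{Arc, Mutex};

use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use polars::prelude::{DataFrame, PolarsResult};
use tokio::runtime::Runtime;

// Hypothetical variant: the scan keeps the runtime alive and reuses it,
// rather than creating a new Runtime on every call to next_batch.
pub struct RangeOperationScanOwned {
    pub(crate) runtime: Arc<Runtime>,
    pub(crate) df_iter: Arc<Mutex<SendableRecordBatchStream>>,
}

impl RangeOperationScanOwned {
    fn next_batch_inner(&self) -> PolarsResult<Option<DataFrame>> {
        // Polars calls next_batch from its own worker threads, so blocking on
        // the shared runtime here works as long as this thread is not already
        // inside an async context.
        let mut stream = self.df_iter.lock().unwrap();
        match self.runtime.block_on(stream.next()) {
            Some(batch) => {
                let rb = batch.unwrap();
                // conversion helpers are the user-defined ones from the snippet above
                let schema_polars = convert_arrow_rb_schema_to_polars_df_schema(&rb.schema())?;
                let df = convert_arrow_rb_to_polars_df(&rb, &schema_polars)?;
                Ok(Some(df))
            }
            None => Ok(None),
        }
    }
}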
        

and then:

fn lazy_range_operation_scan(
    py: Python<'_>,
    py_ctx: &PyBioSessionContext,
    df_path1: String,
    df_path2: String,
    range_options: RangeOptions,
) -> PyResult<PyLazyFrame> {
    py.allow_threads(|| {
        
// some code removed
      
        let rt = Runtime::new().unwrap();
        let ctx = &py_ctx.ctx;

        let args = ScanArgsAnonymous {
            schema: Some(Arc::new()),
            ..ScanArgsAnonymous::default()
        };
      
   // some code removed 
        let stream = rt.block_on(df?.execute_stream())?;
        let scan = RangeOperationScan {
            df_iter: Arc::new(Mutex::new(stream)),
        };
        let function = Arc::new(scan);
        let lf = LazyFrame::anonymous_scan(function, args).unwrap();
        Ok(lf.into())
    })
}
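
On the construction side, a matching sketch (again hypothetical, and assuming RangeOperationScanOwned implements AnonymousScan the same way as the original scan) would hand the same Arc<Runtime> that executes the stream over to the scan, so the runtime is not dropped when this function returns:

        // Hypothetical: share one long-lived runtime between stream creation
        // and the per-batch polling done later inside next_batch.
        let rt = Arc::new(Runtime::new().unwrap());
        let stream = rt.block_on(df?.execute_stream())?;
        let scan = RangeOperationScanOwned {
            runtime: Arc::clone(&rt),
            df_iter: Arc::new(Mutex::new(stream)),
        };
        let function = Arc::new(scan);
        let lf = LazyFrame::anonymous_scan(function, args).unwrap();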

Everything works like a charm when DataFusion runs single-threaded (or multi-threaded via the Polars Rust API). If I try to run in multi-threaded mode from Python, I suspect the main thread quits, causing the DataFusion tasks to be cancelled:

thread '<unnamed>' panicked at src/scan.rs:97:36:
called `Result::unwrap()` on an `Err` value: External(Internal("Non Panic Task error: task 43 was cancelled"))
stack backtrace:
   0:        0x34c592498 - <unknown>
   1:        0x34c5b4834 - <unknown>

I'm not sure whether it's possible to prevent that, given that the next_batch method is called externally by the Polars engine.

mwiewior · Jan 08 '25 15:01

Sorry for the slow reply here. This doesn't really seem to be a PyO3-specific issue; can you please reduce this to a minimal repro using just PyO3 that exhibits the behaviour you find problematic?

davidhewitt · Apr 08 '25 21:04
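
For reference, a reduction along the lines davidhewitt asks for could start from a skeleton like the one below. It is a hypothetical sketch (the spawn_and_block and repro names and the trivial future are placeholders, not code from this thread) that just combines allow_threads, a spawned thread, and a Tokio runtime the way the snippets above do:

use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use std::thread;
use tokio::runtime::Runtime;

/// Release the GIL, spawn a thread that builds its own Tokio runtime,
/// block on a trivial future there, and join the thread.
#[pyfunction]
fn spawn_and_block(py: Python<'_>) -> PyResult<u64> {
    py.allow_threads(|| {
        thread::spawn(|| {
            let rt = Runtime::new().unwrap();
            rt.block_on(async { 42u64 })
        })
        .join()
        .map_err(|_| PyRuntimeError::new_err("worker thread panicked"))
    })
}

#[pymodule]
fn repro(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(spawn_and_block, m)?)?;
    Ok(())
}

Calling repro.spawn_and_block() from several Python threads at once would then show whether the thread-plus-runtime pattern itself misbehaves without Polars or DataFusion in the picture.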