Integrating Polars with DataFusion
Hey I'm working on implementing streaming functionality for Polars on top of some extensions to DataFusion. For this purpose I added streaming support for AnonymousScan in Polars:
pub struct RangeOperationScan {
    pub(crate) df_iter: Arc<Mutex<SendableRecordBatchStream>>,
}

impl AnonymousScan for RangeOperationScan {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn scan(&self, scan_opts: AnonymousScanArgs) -> PolarsResult<polars::prelude::DataFrame> {
        todo!("Only streaming is supported")
    }

    fn next_batch(
        &self,
        scan_opts: AnonymousScanArgs,
    ) -> PolarsResult<Option<polars::prelude::DataFrame>> {
        let mutex_stream = Arc::clone(&self.df_iter);
        thread::spawn(move || {
            let rt = Runtime::new().unwrap();
            let result = rt.block_on(mutex_stream.lock().unwrap().next()); // <-- I think here is the problem
            match result {
                Some(batch) => {
                    let rb = batch.unwrap();
                    let schema_polars = convert_arrow_rb_schema_to_polars_df_schema(&rb.schema())?;
                    let df = convert_arrow_rb_to_polars_df(&rb, &schema_polars)?;
                    Ok(Some(df))
                },
                None => Ok(None),
            }
        }).join().unwrap()
    }
}
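(Side note: assuming next_batch is never called from inside a Tokio context — in which case block_on would panic anyway — the extra thread::spawn/join round trip could probably be dropped and the poll done in place. A sketch of that variant, reusing the same helper converters; the error mapping is my addition. This is only a simplification, not a fix for the cancellation described further below.)

    fn next_batch(
        &self,
        _scan_opts: AnonymousScanArgs,
    ) -> PolarsResult<Option<polars::prelude::DataFrame>> {
        let rt = Runtime::new().unwrap();
        let mut stream = self.df_iter.lock().unwrap();
        match rt.block_on(stream.next()) {
            Some(batch) => {
                // Surface DataFusion errors instead of unwrapping inside a spawned thread.
                let rb = batch.map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
                let schema_polars = convert_arrow_rb_schema_to_polars_df_schema(&rb.schema())?;
                let df = convert_arrow_rb_to_polars_df(&rb, &schema_polars)?;
                Ok(Some(df))
            }
            None => Ok(None),
        }
    }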
And then, on the construction side:
fn lazy_range_operation_scan(
    py: Python<'_>,
    py_ctx: &PyBioSessionContext,
    df_path1: String,
    df_path2: String,
    range_options: RangeOptions,
) -> PyResult<PyLazyFrame> {
    py.allow_threads(|| {
        // some code removed
        let rt = Runtime::new().unwrap();
        let ctx = &py_ctx.ctx;
        let args = ScanArgsAnonymous {
            schema: Some(Arc::new(/* schema construction removed */)),
            ..ScanArgsAnonymous::default()
        };
        // some code removed
        let stream = rt.block_on(df?.execute_stream())?;
        let scan = RangeOperationScan {
            df_iter: Arc::new(Mutex::new(stream)),
        };
        let function = Arc::new(scan);
        let lf = LazyFrame::anonymous_scan(function, args).unwrap();
        Ok(lf.into())
    })
}
Everything works like a charm when DataFusion runs single-threaded (or multi-threaded via the Polars Rust API). If I try to run in multi-threaded mode from Python, I suspect the main thread quits, causing the DataFusion tasks to get cancelled:
thread '<unnamed>' panicked at src/scan.rs:97:36:
called `Result::unwrap()` on an `Err` value: External(Internal("Non Panic Task error: task 43 was cancelled"))
stack backtrace:
0: 0x34c592498 - <unknown>
1: 0x34c5b4834 - <unknown>
Not sure if it's possible to prevent that, given that the next_batch method is called externally by the Polars engine.
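For reference, here's a minimal sketch of the workaround I'm considering, assuming the cancellation happens because the runtime that produced the stream (the local rt in lazy_range_operation_scan, plus the per-call Runtime in next_batch) is dropped while DataFusion tasks spawned on it are still pending: keep a single runtime alive for the lifetime of the scan and always poll the stream on it. The runtime field, the poll_next helper, and the imports are my additions, not part of the code above.

use std::sync::{Arc, Mutex};

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use tokio::runtime::Runtime;

// Variant of the struct above that keeps the runtime alongside the stream.
pub struct RangeOperationScan {
    pub(crate) df_iter: Arc<Mutex<SendableRecordBatchStream>>,
    // Keep the runtime that produced the stream alive for as long as the scan
    // exists, so dropping local runtimes can no longer cancel its tasks.
    pub(crate) runtime: Arc<Runtime>,
}

// Construction side (inside lazy_range_operation_scan) would then become:
// let rt = Arc::new(Runtime::new().unwrap());
// let stream = rt.block_on(df?.execute_stream())?;
// let scan = RangeOperationScan {
//     df_iter: Arc::new(Mutex::new(stream)),
//     runtime: Arc::clone(&rt),
// };

impl RangeOperationScan {
    // Polling side (what next_batch would delegate to before converting the
    // batch): block on the owning runtime instead of spawning a thread and a
    // fresh Runtime per batch.
    fn poll_next(&self) -> Option<datafusion::error::Result<RecordBatch>> {
        let mut stream = self.df_iter.lock().unwrap();
        self.runtime.block_on(stream.next())
    }
}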
Sorry for the slow reply here. This doesn't really seem to be a PyO3-specific issue; can you please reduce this to a minimal repro using just PyO3 that more directly exhibits the behaviour you find problematic?
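A reduced example along these lines might be a useful shape to start from (no Polars or DataFusion, just PyO3 plus Tokio with the rt-multi-thread and time features; the function name is a placeholder): it shows a task spawned on one runtime being reported as cancelled once that runtime is dropped, which looks like the same failure mode as the panic above.

use pyo3::prelude::*;
use tokio::runtime::Runtime;

#[pyfunction]
fn repro(py: Python<'_>) -> PyResult<String> {
    py.allow_threads(|| {
        // Runtime that "owns" the work, mirroring the one created inside
        // lazy_range_operation_scan.
        let rt = Runtime::new().unwrap();
        let handle = rt.spawn(async {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            42u32
        });

        // Dropping the runtime here cancels its pending tasks, analogous to
        // the local `rt` going out of scope when the scan is constructed.
        drop(rt);

        // Awaiting the handle on a second runtime (as next_batch does with a
        // fresh Runtime) then observes the cancellation: `result` should be
        // an Err whose JoinError reports the task as cancelled.
        let rt2 = Runtime::new().unwrap();
        let result = rt2.block_on(handle);
        Ok(format!("{result:?}"))
    })
}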