doc: RecordBatchReceiverStreamBuilder::spawn_blocking does not abort threads
Describe the bug
The docs at https://docs.rs/datafusion/latest/datafusion/physical_plan/stream/struct.RecordBatchReceiverStreamBuilder.html#method.spawn_blocking say:
Spawn a blocking task that will be aborted if this builder (or the stream built from it) are dropped
No abort actually happens, and I don't think one can be done safely: std::thread::Thread has no abort method (https://doc.rust-lang.org/std/thread/struct.Thread.html).
What actually happens is that tx.blocking_send and similar methods start returning errors, and the caller-provided closure is responsible for returning when that happens.
It's probably worth checking whether the same is true for the async variant (spawn), or whether dropping actually aborts the tokio task at its next await point.
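The contract described above can be sketched without DataFusion or tokio. This is a minimal illustration under stated assumptions: it uses std::sync::mpsc::sync_channel as a stand-in for the tokio channel behind builder.tx(), because SyncSender::send, like Sender::blocking_send, returns an error once the receiver has been dropped. The produce function is hypothetical. Dropping the receiver does not stop the thread; the thread returns only because produce checks the send result.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

// Hypothetical stand-in for the closure passed to `spawn_blocking`.
// Returns how many values the channel accepted before the receiver went away.
fn produce(tx: SyncSender<u64>) -> u64 {
    let mut accepted = 0;
    loop {
        // Like tokio's `blocking_send`, `send` returns `Err` once the receiver
        // has been dropped. Nothing stops this thread from the outside;
        // returning here is the producer's own responsibility.
        if tx.send(accepted).is_err() {
            return accepted;
        }
        accepted += 1;
    }
}

fn main() {
    // Capacity 1, mirroring the capacity passed to the builder in the reproduction.
    let (tx, rx) = sync_channel(1);
    let producer = thread::spawn(move || produce(tx));

    // Read three values, then drop the receiver (the analogue of dropping the stream).
    for _ in 0..3 {
        rx.recv().unwrap();
    }
    drop(rx);

    // The thread finishes only because `produce` checks the send result.
    let accepted = producer.join().unwrap();
    println!("producer returned after {accepted} accepted sends");
}
```

If the error check in produce were replaced by ignoring the error (as the reproduction below does on purpose), the loop would spin forever after the receiver is gone.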
https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html#method.blocking_send
To Reproduce
use datafusion::arrow::array::{ArrayRef, UInt64Array};
use datafusion::arrow::datatypes::{DataType, Field, SchemaBuilder};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;

fn make_dummy_record_batch() -> Result<RecordBatch, DataFusionError> {
    let a: ArrayRef = Arc::new(UInt64Array::from(vec![1, 2]));
    let record_batch = RecordBatch::try_from_iter(vec![("a", a)])
        .map_err(|error| DataFusionError::External(Box::new(error)))?;
    Ok(record_batch)
}

#[tokio::main]
async fn main() {
    let schema = {
        let mut builder = SchemaBuilder::new();
        builder.push(Field::new("a", DataType::UInt64, false));
        builder.finish()
    };

    let mut stream = {
        let mut builder = RecordBatchReceiverStreamBuilder::new(Arc::new(schema), 1);
        {
            let tx = builder.tx();
            builder.spawn_blocking(move || loop {
                let record_batch = make_dummy_record_batch()?;
                match tx.blocking_send(Ok(record_batch)) {
                    Ok(()) => (),
                    Err(_sent) => {
                        // If we ignore this error, nothing "aborts" us.
                        eprintln!("ignoring send error for demonstration purposes");
                    }
                };
                println!("tick");
                std::thread::sleep(Duration::from_millis(300));
            });
        }
        builder.build()
    };

    for i in 0..3 {
        let record_batch = stream.next().await.unwrap().unwrap();
        println!("Record Batch #{i}");
        print_batches(&[record_batch]).unwrap();
    }

    // Docs claim this will abort the producer.
    drop(stream);

    // If it were aborted, we should stop seeing "tick" messages any moment now!
    tokio::time::sleep(Duration::from_secs(5)).await;
    println!("is it still going? bailing out.");
    std::process::exit(0);
}
Expected behavior
The current behavior makes sense, but the docs should describe it accurately.
It seems to me that a blocking task is actually aborted (cancelled) on drop, but once it starts running it can't be interrupted.
For example, if the task hasn't started yet, it will never start. However, once the blocking task is running, it runs to completion, because it never yields control back to tokio (via .await).
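To make the point above concrete: tokio's documentation for JoinHandle::abort likewise notes that a spawn_blocking task that is already running cannot be aborted, while one that has not started yet may be prevented from starting. Since a running thread cannot be interrupted from outside, the only way to stop it early is cooperatively. The sketch below is a hypothetical illustration, not DataFusion's implementation: the worker function and the shared AtomicBool flag are assumptions introduced here, and the flag is checked only between units of work, much as an async task only observes an abort at an .await point.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical blocking worker. `cancelled` is an assumed, caller-owned flag;
// it is not part of DataFusion's or tokio's API.
fn worker(cancelled: Arc<AtomicBool>) -> u32 {
    let mut steps = 0;
    // The flag is only checked between steps, so a step that has already
    // started always runs to completion. If cancellation was requested before
    // the worker started, it does no work at all.
    while !cancelled.load(Ordering::Relaxed) {
        thread::sleep(Duration::from_millis(10)); // stand-in for blocking work
        steps += 1;
    }
    steps
}

fn main() {
    // Cancelled before starting: the worker returns without doing any steps.
    let pre_cancelled = Arc::new(AtomicBool::new(true));
    assert_eq!(worker(pre_cancelled), 0);

    // Cancelled while running: the worker stops at its next check.
    let cancelled = Arc::new(AtomicBool::new(false));
    let handle = {
        let cancelled = Arc::clone(&cancelled);
        thread::spawn(move || worker(cancelled))
    };
    thread::sleep(Duration::from_millis(50));
    cancelled.store(true, Ordering::Relaxed);
    let steps = handle.join().unwrap();
    println!("worker completed {steps} steps before observing cancellation");
}
```

In the DataFusion case, the closed channel plays the role of this flag: the error returned by tx.blocking_send is the signal the closure should check before doing more work.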
I think summarizing the discussion on this ticket into the doc comments would make a good first issue.
Hi @alamb, I'm a newcomer to this project and would like to take on this issue as my starting point.
To clarify: is the task to refine the documentation so that it accurately describes the current behavior of the code?