
doc: RecordBatchReceiverStreamBuilder::spawn_blocking does not abort threads

Open tv42 opened this issue 1 year ago • 3 comments

Describe the bug

The docs at https://docs.rs/datafusion/latest/datafusion/physical_plan/stream/struct.RecordBatchReceiverStreamBuilder.html#method.spawn_blocking say:

Spawn a blocking task that will be aborted if this builder (or the stream built from it) are dropped

No abort happens, and I don't think a running thread can be aborted safely: std::thread::Thread has no abort method (https://doc.rust-lang.org/std/thread/struct.Thread.html).

What actually happens is that tx.blocking_send and similar calls start returning errors once the stream (which owns the receiver) is dropped, and the caller-provided closure is responsible for returning when that happens.
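A well-behaved producer therefore has to treat a send error as its stop signal. The sketch below shows that pattern using std's `sync_channel` as a stand-in for `tokio::sync::mpsc` (an assumption made so the example needs no external crates); like `blocking_send`, `SyncSender::send` returns an error once the receiver has been dropped. The names `produce` and `consume_then_drop` are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Hypothetical producer loop: keeps sending until `send` fails, which
/// happens once the receiving side has been dropped. Returns how many
/// values were sent successfully before that.
fn produce(tx: SyncSender<u64>) -> usize {
    let mut sent = 0;
    for value in 0u64.. {
        if tx.send(value).is_err() {
            // Receiver is gone. Nothing aborts this thread, so it must return on its own.
            break;
        }
        sent += 1;
    }
    sent
}

/// Receives `take` values, drops the receiver, then waits for the producer.
fn consume_then_drop(take: usize) -> usize {
    // Capacity 1, mirroring the channel size used in the repro.
    let (tx, rx) = sync_channel(1);
    let producer = thread::spawn(move || produce(tx));
    for _ in 0..take {
        rx.recv().expect("producer stopped early");
    }
    drop(rx);
    // join() returns only because `produce` noticed the send error and exited.
    producer.join().expect("producer panicked")
}

fn main() {
    let sent = consume_then_drop(3);
    println!("producer exited after {sent} successful sends");
}
```

In the repro from this issue, the equivalent change is to return `Ok(())` from the `Err(_sent)` arm instead of only logging the error.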

It's probably worth checking whether this is also true for the async variant (spawn), or whether dropping the stream actually aborts the tokio task at its next await point.

https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html#method.blocking_send

To Reproduce

use datafusion::arrow::array::{ArrayRef, UInt64Array};
use datafusion::arrow::datatypes::{DataType, Field, SchemaBuilder};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;

fn make_dummy_record_batch() -> Result<RecordBatch, DataFusionError> {
    let a: ArrayRef = Arc::new(UInt64Array::from(vec![1, 2]));
    let record_batch = RecordBatch::try_from_iter(vec![("a", a)])
        .map_err(|error| DataFusionError::External(Box::new(error)))?;
    Ok(record_batch)
}

#[tokio::main]
async fn main() {
    let schema = {
        let mut builder = SchemaBuilder::new();
        builder.push(Field::new("a", DataType::UInt64, false));
        builder.finish()
    };
    let mut stream = {
        let mut builder = RecordBatchReceiverStreamBuilder::new(Arc::new(schema), 1);
        {
            let tx = builder.tx();
            builder.spawn_blocking(move || loop {
                let record_batch = make_dummy_record_batch()?;
                match tx.blocking_send(Ok(record_batch)) {
                    Ok(()) => (),
                    Err(_sent) => {
                        // If we ignore this error, nothing "aborts" us.
                        eprintln!("ignoring send error for demonstration purposes");
                    }
                };
                println!("tick");
                std::thread::sleep(Duration::from_millis(300));
            });
        }
        builder.build()
    };

    for i in 0..3 {
        let record_batch = stream.next().await.unwrap().unwrap();
        println!("Record Batch #{i}");
        print_batches(&[record_batch]).unwrap();
    }

    // Docs claim this will abort the producer.
    drop(stream);

    // If it were aborted, we should stop seeing "tick" messages any moment now!
    tokio::time::sleep(Duration::new(5, 0)).await;
    println!("is it still going? bailing out.");
    std::process::exit(0);
}

Expected behavior

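The current behavior makes sense, but it should be documented: dropping the stream does not stop a blocking task that is already running, so the closure must return once tx.blocking_send starts failing.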

tv42, Feb 07 '24 20:02

It seems to me that a blocking task is actually aborted / cancelled on drop, but once a blocking task starts, it can't be interrupted.

For example, if the task hasn't started yet, it will never start. However, once the blocking task is running, it runs to completion because it never yields control back to tokio (via await).
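To illustrate the "can't be interrupted once started" point: an OS thread that is already running cannot be stopped from outside, so a long-running blocking closure ends early only if its own code checks for cancellation. The sketch below uses plain std threads and a hypothetical `AtomicBool` flag as an analogy for dropping the stream; it is not how tokio or DataFusion implement cancellation. `blocking_job` and `run_job` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical long-running blocking job. It performs `steps` units of work
/// and stops early only if `check_cancel` is true and the flag has been set.
fn blocking_job(cancel: Arc<AtomicBool>, steps: u32, check_cancel: bool) -> u32 {
    let mut done = 0;
    for _ in 0..steps {
        if check_cancel && cancel.load(Ordering::Relaxed) {
            break; // cooperative exit: the job noticed the request itself
        }
        thread::sleep(Duration::from_millis(10)); // simulated blocking work
        done += 1;
    }
    done
}

/// Starts the job, requests cancellation after about 50 ms, and returns how
/// many steps the job completed. Setting the flag stands in for dropping the
/// stream (an analogy only); nothing forcibly stops the thread.
fn run_job(check_cancel: bool) -> u32 {
    let cancel = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&cancel);
    let handle = thread::spawn(move || blocking_job(flag, 50, check_cancel));
    thread::sleep(Duration::from_millis(50));
    cancel.store(true, Ordering::Relaxed);
    handle.join().expect("job panicked")
}

fn main() {
    // Ignores the flag: runs all 50 steps even though cancellation was requested.
    println!("without checks: {} steps", run_job(false));
    // Checks the flag: stops a few steps after the request.
    println!("with checks: {} steps", run_job(true));
}
```

The job that ignores the flag behaves like a blocking closure that ignores send errors: it keeps running after the "drop".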

alamb, Feb 08 '24 13:02

I think summarizing the discussion on this ticket in the doc comments would make a good first issue.

alamb, Feb 08 '24 13:02

Hi @alamb, I'm new to this project and would like to take this issue as my starting point.

Could you confirm that the task is to refine the documentation so it accurately describes the current behavior of the code?

zhenglin-charlie-li, Feb 15 '24 19:02