
Enhancement: refine the reader interface

Open ZENOTME opened this issue 1 year ago • 13 comments

Hi, I find that in some cases our reader interface seems redundant to me.

E.g.

let table_scan = table
            .scan()
            .with_batch_size(Some(self.batch_size))
            .select(self.schema.names())
            .predict(predict)
            .build()
            .map_err(BatchError::Iceberg)?;
let file_scan_stream = table_scan.plan_files();

// Create a reader here. We have to pass the same info that was already given to table_scan.
let reader = ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone())
    .with_field_id(....)
    .with_predict(..);

for file_scan in file_scan_stream {
    let arrow_batch_stream = reader.read(file_scan);
}

I recommend we move the field_id and predicate info into file_scan. Configuring this info again in the reader is not friendly for the user and is prone to inconsistency.

ZENOTME avatar Jun 10 '24 05:06 ZENOTME
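As a rough sketch of the proposed direction (the field names, the single-argument reader constructor, and the read(task) call below are hypothetical, not the crate's current definitions), the idea is for the scan task to carry the plan-time information itself:

use futures::StreamExt;

// Hypothetical: the scan task carries the projection and predicate captured
// at plan time, so they never have to be handed to the reader a second time.
struct FileScanTask {
    data_file_path: String,
    project_field_ids: Vec<i32>,       // hypothetical field
    predicate: Option<BoundPredicate>, // hypothetical field
}

// The reader then only needs the FileIO; per-file details travel with the task.
let reader = ArrowReaderBuilder::new(file_io.clone()).build();
while let Some(task) = file_scan_stream.next().await {
    let arrow_batch_stream = reader.read(task?)?;
}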

cc @liurenjie1024 @sdd @Fokko @Xuanwo @viirya

ZENOTME avatar Jun 10 '24 05:06 ZENOTME

I sent a PR (#401) to draft the idea; feel free to tell me if there is anything that can be improved.

ZENOTME avatar Jun 11 '24 11:06 ZENOTME

Hi @ZENOTME, I think there already exists a to_arrow method here: https://github.com/apache/iceberg-rust/blob/15e61f23198c4cc5d320d631e22e2fbc02d167c8/crates/iceberg/src/scan.rs#L294

liurenjie1024 avatar Jun 13 '24 13:06 liurenjie1024
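For comparison, a minimal sketch of the to_arrow path mentioned above; the builder options follow the issue's first snippet and the column names are placeholders, so exact method availability may differ by version:

use futures::TryStreamExt;

// Projection, batch size, and any predicate are specified once on the scan;
// to_arrow drives the Arrow reader internally.
let mut batch_stream = table
    .scan()
    .select(vec!["col_a", "col_b"]) // placeholder column names
    .with_batch_size(Some(1024))
    .build()?
    .to_arrow()
    .await?;

while let Some(batch) = batch_stream.try_next().await? {
    // process each arrow RecordBatch
}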

But I agree that we should make the ArrowReaderBuilder's methods crate-private rather than a public API, since the current approach is error-prone.

liurenjie1024 avatar Jun 13 '24 13:06 liurenjie1024

Hi @ZENOTME, I think there already exists a to_arrow method here:

https://github.com/apache/iceberg-rust/blob/15e61f23198c4cc5d320d631e22e2fbc02d167c8/crates/iceberg/src/scan.rs#L294

Yes, but I think the benefit of this PR is more about the use case of a compute engine. to_arrow converts the scan into an arrow batch stream. What a compute engine expects, though, is to get the file scan tasks from the scan, split these tasks, and distribute them to different compute nodes so that they can read in parallel. I think that's one reason we provide https://github.com/apache/iceberg-rust/blob/15e61f23198c4cc5d320d631e22e2fbc02d167c8/crates/iceberg/src/scan.rs#L201.

For this use case, the user creates the reader and uses it to convert each file scan task into an arrow batch stream, rather than using to_arrow directly, like the following:

let reader = ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone())
    .with_field_id(....)
    .with_predict(..);

for file_scan in file_scan_stream {
    let arrow_batch_stream = reader.read(file_scan);
}

But for now, the reader is not friendly for this use case. It's redundant and prone to inconsistency to provide the field_id and predicate info to the reader, because these were already used to create the scan. A friendlier way is to carry this necessary info in the scan task, so that the reader is "stateless" and the inconsistency problem goes away, like the following:

let reader = ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());

for file_scan in file_scan_stream {
    let arrow_batch_stream = reader.read(file_scan);
}

ZENOTME avatar Jun 13 '24 14:06 ZENOTME
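A sketch of that distributed use case under the proposed "stateless" reader; the transport helpers (send_to_worker, receive_tasks) are placeholders, FileScanTask is assumed to be serde-(de)serializable, and the reader construction and read(task) call follow the interface proposed above rather than the current API:

use futures::TryStreamExt;

// Coordinator: plan once, then ship the per-file tasks to workers.
let tasks: Vec<FileScanTask> = table_scan.plan_files().await?.try_collect().await?;
for chunk in tasks.chunks(16) {
    // Placeholder transport; a real engine would use its own RPC/shuffle layer.
    send_to_worker(serde_json::to_vec(chunk)?)?;
}

// Worker: rebuild the tasks and read them with a reader that holds no plan state.
let tasks: Vec<FileScanTask> = serde_json::from_slice(&receive_tasks()?)?;
let reader = ArrowReaderBuilder::new(file_io.clone()).build();
for task in tasks {
    let arrow_batch_stream = reader.read(task)?;
}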

Thanks for the explanation, I'll take a look at the PR.

liurenjie1024 avatar Jun 13 '24 14:06 liurenjie1024

The drawback of the current implementation is that carrying the metadata in every scan task may add cost. One solution is to provide an abstraction that groups the tasks so they can share the metadata:

[diagram: tasks grouped so they share the metadata]

Another design is to separate the metadata from the scan tasks. We provide an interface that lets users extract the metadata from a scan so that they can create the reader from this metadata rather than constructing it themselves. (This way seems more flexible 🤔)

[diagram: metadata extracted from the scan and passed to the reader]

ZENOTME avatar Jun 13 '24 14:06 ZENOTME
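A hypothetical shape for the first design (grouping), where the shared fields live once per group rather than once per task; the type and the read_group method are illustrative only:

// Hypothetical types illustrating the grouping idea; not actual crate types.
struct FileScanTaskGroup {
    project_field_ids: Vec<i32>,
    predicate: Option<BoundPredicate>,
    tasks: Vec<FileScanTask>,
}

// The reader consumes a whole group, so each individual task stays small;
// read_group is likewise illustrative only.
for group in task_groups {
    let arrow_batch_stream = reader.read_group(group)?;
}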

Thanks for this @ZENOTME. The first diagram in your comment above is exactly what I'm currently working to implement as part of a service that exposes an Apache Arrow Flight interface. Each Ticket will contain a serialized struct with some request metadata (including the filter predicate) and a list of scan tasks.

sdd avatar Jun 14 '24 05:06 sdd
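A sketch of that setup with a hypothetical payload struct serialized into each Flight Ticket; serde derives on BoundPredicate and FileScanTask are assumed:

use serde::{Deserialize, Serialize};

// Hypothetical payload carried inside an Arrow Flight Ticket: the request-level
// metadata (e.g. the filter predicate) plus the scan tasks assigned to it.
#[derive(Serialize, Deserialize)]
struct TicketPayload {
    predicate: Option<BoundPredicate>,
    tasks: Vec<FileScanTask>,
}

// Server side: encode the payload into the opaque Ticket bytes.
let ticket = arrow_flight::Ticket {
    ticket: serde_json::to_vec(&payload)?.into(),
};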

Sorry, I don't quite get it: what is the "meta" here?

liurenjie1024 avatar Jun 15 '24 08:06 liurenjie1024

Sorry, I don't quite get it: what is the "meta" here?

E.g. field_id and predicate: the info that can be shared by a set of FileScanTasks. Maybe "metadata" is not the precise term here.

ZENOTME avatar Jun 15 '24 08:06 ZENOTME

Sorry, I don't quite get it: what is the "meta" here?

E.g. field_id and predicate: the info that can be shared by a set of FileScanTasks. Maybe "metadata" is not the precise term here.

I agree that they are the same across different tasks, but I don't quite get how to share them. In a distributed query engine like Spark, which distributes tasks to different hosts, one way to do that is to utilize broadcast in Spark, but that increases implementation complexity.

liurenjie1024 avatar Jun 15 '24 08:06 liurenjie1024

Sorry, I don't quite get it: what is the "meta" here?

E.g. field_id and predicate: the info that can be shared by a set of FileScanTasks. Maybe "metadata" is not the precise term here.

I agree that they are the same across different tasks, but I don't quite get how to share them. In a distributed query engine like Spark, which distributes tasks to different hosts, one way to do that is to utilize broadcast in Spark, but that increases implementation complexity.

In this model, it's the user's responsibility to share (distribute) the "metadata" to the different hosts. What we provide is a method to get the "metadata" from the scan, and the "metadata" itself is serializable/deserializable.

#[derive(Serialize, Deserialize)]
struct Metadata {
    field_ids: Vec<i32>,
    predicate: BoundPredicate,
}

// master node
let metadata = scan.meta();
send(metadata);

// worker node
let metadata = receive();
let reader = ArrowReaderBuilder::new().with_metadata(metadata);
let scan_tasks = receive();
for task in scan_tasks {
    reader.read(task);
}

one way to do that is to utilize broadcast in Spark

I'm not familiar with this, but I think that can be one way for Spark to distribute the "metadata". Different compute engines can handle it in different ways. It's flexible, but as you say, it increases implementation complexity.

ZENOTME avatar Jun 15 '24 08:06 ZENOTME

Different compute engines provide different capabilities, and I'm not sure it's worth this complexity, since including filters and projections in the plan usually doesn't add much overhead.

liurenjie1024 avatar Jun 15 '24 09:06 liurenjie1024

I think we can close this now, since there's no plan to continue?

xxchan avatar Feb 12 '25 08:02 xxchan

I think we can close this now, since there's no plan to continue?

Yes, let's close this; feel free to reopen it if necessary.

liurenjie1024 avatar Feb 12 '25 10:02 liurenjie1024