
Add an indexable variant of Arrow.Stream

Open bkamins opened this issue 2 years ago • 7 comments

In a distributed-computing context it would be nice to have a vector-like variant of the Arrow.Stream iterator. The idea is to be able to split the processing of a single large Arrow file containing multiple record batches across multiple worker processes. Looking at the source code, this should be possible to do in a relatively efficient way.

@quinnj - what do you think?

bkamins avatar Oct 30 '22 11:10 bkamins

I don't think this is possible. The Arrow file format is a series of FlatBuffer messages that are not indexed and therefore have to be iterated over. More concretely, the BatchIterator doesn't support random access.

baumgold avatar Nov 01 '22 18:11 baumgold

My idea was that the constructor of such indexable object could do the indexing you mention. I assume that the whole file would have to be scanned, but maybe it could be done in a cheap way, i.e. without having to read/interpret all the data stored in the file.

bkamins avatar Nov 01 '22 19:11 bkamins

Yeah, we could probably add support for this. Maybe with a lazy::Bool=true keyword argument; lazy=false would eagerly iterate messages and store the positions so they could be randomly accessed while lazy=true gives the current behavior where each iteration only consumes one message.
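[Editorial note] The eager `lazy=false` pass described above could look roughly like the following sketch. It does not use Arrow's actual FlatBuffer message framing; instead it uses a toy format where each message carries a plain `Int32` length prefix. The function names (`write_messages`, `index_messages`, `read_message`) are hypothetical, but the idea matches the proposal: scan once to record byte positions without interpreting payloads, then seek for random access.

```julia
# Toy stream: each message is an Int32 length prefix followed by the payload.
function write_messages(io::IO, payloads)
    for p in payloads
        write(io, Int32(sizeof(p)))
        write(io, p)
    end
end

# Eager indexing pass: record the (offset, length) of every message
# without reading or interpreting the payloads themselves.
function index_messages(io::IO)
    index = Tuple{Int,Int}[]
    seekstart(io)
    while !eof(io)
        len = read(io, Int32)
        push!(index, (position(io), Int(len)))
        skip(io, len)          # jump over the payload without reading it
    end
    return index
end

# Random access: seek straight to message i using the prebuilt index.
function read_message(io::IO, index, i)
    offset, len = index[i]
    seek(io, offset)
    return read(io, len)
end

io = IOBuffer()
write_messages(io, [b"first", b"second batch", b"third"])
idx = index_messages(io)
String(read_message(io, idx, 2))   # == "second batch"
```

With such an index in hand, record batches could be handed out to worker processes by position rather than consumed strictly in order.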

Curious, though, because a workflow you can already use without much difficulty is:

```julia
for record_batch in Arrow.Stream(...)
    Distributed.@spawn begin
        # do stuff with record_batch
    end
end
```

what are the alternative workflows where that doesn't work for you?

quinnj avatar Nov 02 '22 20:11 quinnj

What you propose works, but I thought that with this approach parallelism would not be achieved (i.e. that Arrow.Stream parses the values before moving on to the next record batch). If it just skips ahead, then the issue can be closed.

bkamins avatar Nov 02 '22 20:11 bkamins

Ah, you're correct; we do all the message processing in the Arrow.Stream iterate method. Ok, yeah, we should provide an alternative here.

quinnj avatar Nov 02 '22 20:11 quinnj

> Ah, you're correct; we do all the message processing in the Arrow.Stream iterate method. Ok, yeah, we should provide an alternative here.

This would be a great improvement, as it would also allow predicate pushdown at the RecordBatch level based on Message-level metadata, opening up the ability to operate on a single RecordBatch without decompressing all the RecordBatches in a file. This is an important feature for me, so I'll try to spend some time building it without breaking too much.

> My idea was that the constructor of such indexable object could do the indexing you mention. I assume that the whole file would have to be scanned, but maybe it could be done in a cheap way, i.e. without having to read/interpret all the data stored in the file.

If the data uses the "IPC File Format" then the footer (link) should contain all the information we need to construct this index. This should be more performant than scanning the whole file, but is certainly an optimization as scanning should also be supported.
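[Editorial note] Per the Arrow spec, a file in the IPC File Format ends with the footer FlatBuffer, followed by an `Int32` footer length, followed by the 6-byte magic `"ARROW1"`. So the footer's bytes can be located by reading the file tail alone, with no scan of the record batches. The sketch below assumes this layout; `footer_bytes` is a hypothetical helper, and actually decoding the footer (which lists the RecordBatch offsets and lengths) would still require a FlatBuffers reader.

```julia
# Trailing magic bytes defined by the Arrow IPC File Format.
const ARROW_MAGIC = b"ARROW1"

# Return the raw footer FlatBuffer bytes by reading backwards from the
# end of the file: <footer> <Int32 footer length> <"ARROW1">.
function footer_bytes(io::IO)
    seekend(io)
    filesize = position(io)
    # Check the trailing magic.
    seek(io, filesize - length(ARROW_MAGIC))
    read(io, length(ARROW_MAGIC)) == ARROW_MAGIC ||
        error("not an Arrow IPC file")
    # The Int32 footer length sits just before the magic.
    seek(io, filesize - length(ARROW_MAGIC) - 4)
    footer_len = read(io, Int32)
    # The footer FlatBuffer precedes its own length field.
    seek(io, filesize - length(ARROW_MAGIC) - 4 - footer_len)
    return read(io, footer_len)
end
```

This is why footer-based indexing should beat a full scan: only the last few bytes plus the footer itself are touched, regardless of how many record batches the file holds.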

baumgold avatar Nov 09 '22 20:11 baumgold

> My idea was that the constructor of such indexable object could do the indexing you mention. I assume that the whole file would have to be scanned, but maybe it could be done in a cheap way, i.e. without having to read/interpret all the data stored in the file.

I implemented this minus the indexing. Thoughts?

JoaoAparicio avatar Apr 03 '23 00:04 JoaoAparicio