arrow-julia
Add an indexable variant of Arrow.Stream
In a distributed computing context it would be nice to have a vector-like (indexable) variant of the `Arrow.Stream` iterator. The idea is to be able to split the processing of a single large Arrow file with multiple record batches across multiple worker processes. Looking at the source code, this should be doable in a relatively efficient way.
@quinnj - what do you think?
I don't think this is possible. The Arrow file format is a series of FlatBuffer messages that are not indexed and therefore have to be iterated over. More concretely, the `BatchIterator` doesn't support random access.
My idea was that the constructor of such indexable object could do the indexing you mention. I assume that the whole file would have to be scanned, but maybe it could be done in a cheap way, i.e. without having to read/interpret all the data stored in the file.
Yeah, we could probably add support for this. Maybe with a `lazy::Bool=true` keyword argument: `lazy=false` would eagerly iterate the messages and store their positions so they could be randomly accessed, while `lazy=true` gives the current behavior, where each iteration consumes only one message.
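The eager-indexing idea can be sketched independently of Arrow.jl's internals. The snippet below is a Python toy, where a made-up 4-byte little-endian length prefix stands in for the real Arrow IPC framing (continuation marker plus FlatBuffer metadata); it only illustrates how an `lazy=false`-style constructor could record each message's byte offset up front without parsing any payloads, then serve random access by seeking:

```python
import io
import struct

def build_message_index(buf):
    """Scan a stream of length-prefixed messages and record the byte
    offset of each message WITHOUT reading the payloads themselves.
    (Toy framing: 4-byte little-endian payload length, then payload.)"""
    offsets = []
    buf.seek(0)
    while True:
        pos = buf.tell()
        header = buf.read(4)
        if len(header) < 4:
            break
        (length,) = struct.unpack("<I", header)
        offsets.append(pos)
        buf.seek(length, io.SEEK_CUR)  # skip the payload, don't parse it
    return offsets

def read_message(buf, offsets, i):
    """Random access: jump straight to message i via the index."""
    buf.seek(offsets[i])
    (length,) = struct.unpack("<I", buf.read(4))
    return buf.read(length)

# Build a toy stream of three "record batches".
payloads = [b"batch-0", b"batch-11", b"batch-222"]
stream = io.BytesIO()
for p in payloads:
    stream.write(struct.pack("<I", len(p)))
    stream.write(p)

idx = build_message_index(stream)
assert read_message(stream, idx, 2) == b"batch-222"  # out-of-order access
assert read_message(stream, idx, 0) == b"batch-0"
```

The point of the sketch is that the one-time indexing pass only reads framing bytes and seeks past the data, so it is much cheaper than fully materializing every batch.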
I'm curious, though, because one workflow you can already use is:
```julia
for record_batch in Arrow.Stream(...)
    Distributed.@spawn begin
        # do stuff with record_batch
    end
end
```
what are the alternative workflows where that doesn't work for you?
What you propose works, but I thought that with this approach the parallelism would not be achieved, i.e. that `Arrow.Stream` would parse the values before moving forward to the next record batch. If it does just skip ahead, then the issue can be closed.
Ah, you're correct; we do all the message processing in the `Arrow.Stream` iterate method. Ok, yeah, we should provide an alternative here.
This would be a great improvement, as it would also allow predicate pushdown at the RecordBatch level based on Message-level metadata, thus opening up the ability to operate on a single RecordBatch without decompressing all of the RecordBatches in a file. This is an important feature for me, so I'll try to spend some time building it without breaking too much.
> My idea was that the constructor of such indexable object could do the indexing you mention. I assume that the whole file would have to be scanned, but maybe it could be done in a cheap way, i.e. without having to read/interpret all the data stored in the file.
If the data uses the "IPC File Format", then the footer (link) should contain all the information we need to construct this index. This would be more performant than scanning the whole file, but it is certainly an optimization, since scanning should also be supported.
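To illustrate why the footer helps: the Python toy below uses an invented fixed-width `(offset, length)` trailer as a stand-in for the real FlatBuffer footer of the Arrow IPC file format. A reader seeks to the end, decodes the trailer, and immediately has a full batch index, with no forward scan of the payloads at all:

```python
import io
import struct

def write_file(payloads):
    """Write payloads, then a trailer listing each batch's (offset, length),
    then the trailer's own byte length. (Toy stand-in for the IPC footer.)"""
    buf = io.BytesIO()
    blocks = []
    for p in payloads:
        blocks.append((buf.tell(), len(p)))
        buf.write(p)
    footer_start = buf.tell()
    for off, ln in blocks:
        buf.write(struct.pack("<II", off, ln))
    buf.write(struct.pack("<I", buf.tell() - footer_start))  # footer length
    return buf

def read_index(buf):
    """Recover the batch index from the end of the file -- no scanning."""
    buf.seek(-4, io.SEEK_END)
    (footer_len,) = struct.unpack("<I", buf.read(4))
    buf.seek(-(4 + footer_len), io.SEEK_END)
    raw = buf.read(footer_len)
    return [struct.unpack_from("<II", raw, i) for i in range(0, footer_len, 8)]

def read_batch(buf, index, i):
    off, ln = index[i]
    buf.seek(off)
    return buf.read(ln)

f = write_file([b"aa", b"bbbb", b"cccccc"])
index = read_index(f)
assert read_batch(f, index, 1) == b"bbbb"
```

This mirrors the real layout only in spirit: the actual footer is a FlatBuffer containing the schema plus record-batch block locations, but the access pattern (read trailer, then seek directly to any batch) is the same.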
> My idea was that the constructor of such indexable object could do the indexing you mention. I assume that the whole file would have to be scanned, but maybe it could be done in a cheap way, i.e. without having to read/interpret all the data stored in the file.
I implemented this minus the indexing. Thoughts?