ray_beam_runner
Create an Arrow Coder for Beam that allows us to create Ray Datasets
This coder is the first step toward creating Ray Datasets from Beam PCollections.
fyi @TheNeuralBit
Thanks, I think it's worth tracking this in a Beam issue as well. Could you provide some references for Ray Datasets that would inform how an Arrow-encoded PCollection can integrate with them?
There's some very superficial stuff I wrote here: https://docs.google.com/document/d/1DcuKhCPnZezIvu9vFMsM4BRdBv0kgAWewOJqRbS42GI/edit#
Specifically, I would point to read_datasource as a good place to look: it spins up several Ray tasks that read individual blocks, and each block is usually stored as a block of Arrow records.
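To make that concrete, here is a minimal sketch of how Arrow tables become Dataset blocks (assuming a Ray version where ray.data.from_arrow accepts a list of pyarrow Tables; the data and the printed counts are purely illustrative):

```python
import pyarrow as pa
import ray

ray.init()

# Each pyarrow Table passed to from_arrow becomes one block of the Dataset,
# which is roughly the shape an Arrow-encoded PCollection would need to land in.
tables = [
    pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]}),
    pa.table({"id": [4, 5, 6], "name": ["d", "e", "f"]}),
]

ds = ray.data.from_arrow(tables)
print(ds.num_blocks())  # expected: 2 (one block per input table)
print(ds.take(2))
```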
I suppose an integration we could have is something like:
```python
class RayPCollection(beam.PCollection):
    def to_dataset(self):
        # Run the pipeline, then fetch the materialized blocks for this
        # PCollection and wrap them in a Ray Dataset.
        pipeline_result = self.pipeline.run()
        pcoll = pipeline_result.get_pcoll(self)
        return ray.data.Dataset(BlockList(pcoll.block_list))
```
or something like that
Beam has a few utilities to convert between Beam and Arrow schemas (see here).
A first step would be to write an ArrowRecordBatchCoder, which can be constructed with a Beam Schema or an Arrow Schema, and where each individual element is an Arrow RecordBatch (a rough sketch follows the references below):
- Coder interfaces for Beam - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/coders.py
- An example of a RowCoder (complex) and a VarIntCoder (simple)
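Here is a rough sketch of what such a coder could look like, assuming pyarrow's IPC stream format for serialization and construction from an Arrow schema only (the real coder may also accept a Beam Schema, and the details will likely differ):

```python
import io

import pyarrow as pa
from apache_beam.coders import Coder


class ArrowRecordBatchCoder(Coder):
    """Encodes/decodes one pyarrow.RecordBatch per element via the Arrow IPC stream format."""

    def __init__(self, schema: pa.Schema):
        self._schema = schema

    def encode(self, batch: pa.RecordBatch) -> bytes:
        sink = io.BytesIO()
        with pa.ipc.new_stream(sink, self._schema) as writer:
            writer.write_batch(batch)
        return sink.getvalue()

    def decode(self, encoded: bytes) -> pa.RecordBatch:
        reader = pa.ipc.open_stream(encoded)
        # This sketch assumes exactly one batch per encoded element.
        return reader.read_next_batch()

    def is_deterministic(self) -> bool:
        # Arrow IPC framing is not guaranteed to be byte-stable across versions.
        return False
```

Using the IPC stream format keeps the schema alongside each encoded element, which should make handing the decoded batches to Ray's Arrow-backed blocks straightforward.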
And then we can write a simple PTransform that converts a Beam PCollection of rows with a schema into a Beam PCollection where each element is an Arrow RecordBatch, and vice versa; a rough sketch is below.
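As a hedged sketch of that transform pair, assuming schema'd rows behave like NamedTuples (so _asdict() works), a pyarrow version that has RecordBatch.from_pylist/to_pylist, and placeholder transform names and batch size:

```python
import apache_beam as beam
import pyarrow as pa


class RowsToRecordBatches(beam.PTransform):
    """Batch schema'd rows into pyarrow.RecordBatch elements."""

    def __init__(self, arrow_schema: pa.Schema, max_batch_size: int = 10000):
        self._arrow_schema = arrow_schema
        self._max_batch_size = max_batch_size

    def expand(self, pcoll):
        return (
            pcoll
            # Collect elements into lists; each list becomes one RecordBatch.
            | "Batch" >> beam.BatchElements(max_batch_size=self._max_batch_size)
            | "ToRecordBatch" >> beam.Map(
                lambda rows: pa.RecordBatch.from_pylist(
                    [row._asdict() for row in rows], schema=self._arrow_schema))
        )


class RecordBatchesToRows(beam.PTransform):
    """The reverse direction: explode each RecordBatch back into dict rows."""

    def expand(self, pcoll):
        return pcoll | "ToRows" >> beam.FlatMap(lambda batch: batch.to_pylist())
```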
Then it becomes easier to pass this into Ray Datasets (and also back into Beam from Datasets).
A second step could be to encode Beam Rows as batches of Arrow Records, but we can think about that once we do the first step.