ray_beam_runner

Create an Arrow Coder for Beam that allows us to create Ray Datasets

Open pabloem opened this issue 2 years ago • 3 comments

This coder is the first step toward letting us create Ray Datasets from Beam PCollections.

fyi @TheNeuralBit

pabloem avatar Jun 08 '22 03:06 pabloem

Thanks, I think it's worth tracking this in a Beam issue as well. Could you provide some references for Ray Datasets that would inform how an Arrow-encoded PCollection can integrate with it?

TheNeuralBit avatar Jun 09 '22 15:06 TheNeuralBit

there's some very silly, superficial stuff I wrote here: https://docs.google.com/document/d/1DcuKhCPnZezIvu9vFMsM4BRdBv0kgAWewOJqRbS42GI/edit#

Specifically, I would say read_datasource may be a good place to look: it spins up several Ray Tasks that each read individual blocks, and each block is usually stored as a batch of Arrow records.
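
For a concrete (if hedged) picture of that block model, something like this works against the Ray Datasets API of this era; get_internal_block_refs may be named differently in other Ray versions:

import pyarrow as pa
import ray

ds = ray.data.from_arrow(pa.table({"x": list(range(100))}))
# Each block lives in the Ray object store; the refs point at Arrow data.
block_refs = ds.get_internal_block_refs()
first_block = ray.get(block_refs[0])  # a pyarrow.Table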

I suppose an integration we could have is something like:

class RayPCollection(beam.PCollection):

  def to_dataset(self):
    # Run the pipeline, then look up the materialized result for this
    # PCollection (get_pcoll and block_list are hypothetical APIs here).
    pipeline_result = self.pipeline.run()
    pcoll = pipeline_result.get_pcoll(self)
    return ray.data.Dataset(BlockList(pcoll.block_list))

or something like that

pabloem avatar Jun 10 '22 22:06 pabloem

Beam has a few utilities to convert between Beam and Arrow schemas (see here).

A first step would be to write an ArrowRecordBatchCoder, which can be constructed with either a Beam Schema or an Arrow Schema, and where each individual element is an Arrow RecordBatch.
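
As a rough sketch (not a real implementation; the class name and the use of Arrow's IPC stream format are assumptions on my part), that coder could look like:

import pyarrow as pa
from apache_beam.coders import Coder


class ArrowRecordBatchCoder(Coder):
  """Encodes each element (a pyarrow.RecordBatch) via the Arrow IPC stream format."""

  def __init__(self, schema):
    # schema: a pyarrow.Schema (a Beam Schema could be converted first).
    self._schema = schema

  def encode(self, value):
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, self._schema) as writer:
      writer.write_batch(value)
    return sink.getvalue().to_pybytes()

  def decode(self, encoded):
    reader = pa.ipc.open_stream(encoded)
    return reader.read_next_batch()

  def is_deterministic(self):
    # Arrow IPC bytes are not guaranteed byte-stable across versions.
    return False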

And then we can write a simple PTransform that converts a Beam PCollection of rows with a schema into a Beam PCollection where each element is an Arrow RecordBatch (and vice versa).
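
A hedged sketch of that transform, assuming pyarrow >= 7 for RecordBatch.from_pylist and beam.Row-style elements with as_dict() (NamedTuple rows would use _asdict() instead):

import apache_beam as beam
import pyarrow as pa
from apache_beam.transforms.util import BatchElements


class RowsToRecordBatches(beam.PTransform):
  """Batches schema'd rows into pyarrow.RecordBatch elements."""

  def __init__(self, arrow_schema):
    self._arrow_schema = arrow_schema

  def expand(self, pcoll):
    schema = self._arrow_schema  # bind locally so the lambda doesn't capture self
    return (
        pcoll
        | BatchElements()
        | beam.Map(lambda rows: pa.RecordBatch.from_pylist(
            [row.as_dict() for row in rows], schema=schema)))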

Then it becomes easier to pass this to Ray's Datasets (and also to bring data into Beam from Datasets).
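
On the Ray side the handoff itself is simple; from_arrow and Table.from_batches are real APIs, and everything upstream that produces the batches is the part we'd build:

import pyarrow as pa
import ray


def batches_to_dataset(batches):
  # Combine the RecordBatches into one Table and hand it to Ray.
  table = pa.Table.from_batches(batches)
  return ray.data.from_arrow(table)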


A second step could be to encode Beam Rows as batches of Arrow Records, but we can think about that once we do the first step.

pabloem avatar Jan 24 '23 19:01 pabloem