arrow icon indicating copy to clipboard operation
arrow copied to clipboard

[Python][FlightRPC] IPC error using Python GeneratorStream for tables containing Categorical / DictionaryArray

Open phoebey01 opened this issue 2 years ago • 11 comments

pyarrow=13.0.0

Seeing the following error from the Flight client side when calling reader.read_all() ArrowInvalid: IPC stream did not have the expected number (1) of dictionaries at the start of the stream table.column_names

The issue is only observed when:

  1. the server returns a "GeneratorStream" instead of "RecordBatchStream"
  2. the table contains column of Categorical / DictionaryArray type

Server.py:

import pyarrow as pa
from pyarrow.flight import FlightServerBase, Location, ServerCallContext, Ticket, GeneratorStream
import pandas as pd

class FlightServer(FlightServerBase):
    def __init__(self, host, port):
        super().__init__(Location.for_grpc_tcp(host, port))

    def do_get(self, context: ServerCallContext, ticket: Ticket):
        df = pd.DataFrame.from_dict({
            'col_1': pd.Categorical(['a', 'b', 'c', 'a', 'b', 'c'])
        })
        table = pa.Table.from_pandas(df)
        return GeneratorStream(schema=table.schema, generator=table.to_batches())
        # return RecordBatchStream(table) -> this doesnt have issue

if __name__ == '__main__':
    server = FlightServer('0.0.0.0', 7688)
    print(f'Starting Flight server {server.port}')
    server.serve()

Client.py

from pyarrow.flight import Location, FlightClient
location = Location.for_grpc_tcp('0.0.0.0', 7688)
client = FlightClient(location)

from pyarrow.flight import Ticket
ticket = Ticket('')
reader = client.do_get(ticket)
reader.read_all()

Component(s)

FlightRPC, Python

phoebey01 avatar Oct 26 '23 20:10 phoebey01

We don't handle dictionaries here, that should probably be fixed: https://github.com/apache/arrow/blob/818f71d085b6f820903afc6b1f1e577d8e45ff47/python/pyarrow/_flight.pyx#L2002-L2089

I think nowadays, we could/should implement GeneratorStream in terms of RecordBatchReader.from_batches, which would save us from having to deal with these details

lidavidm avatar Oct 26 '23 20:10 lidavidm

That also means that if you need a workaround, you should try:

        reader = pyarrow.RecordBatchReader.from_batches(table.schema, table.to_batches())
        return pyarrow.flight.RecordBatchStream(reader)

lidavidm avatar Oct 26 '23 20:10 lidavidm

Thanks a lot David!

phoebey01 avatar Oct 26 '23 20:10 phoebey01

Hey @lidavidm was this ever fixed? Right now if I want to stream my data in python I use RecordBatchStream that wraps the RecordBatchReader as you mentioned, but the problem with this method is that it loads all the table to RAM before it starts to stream the batches and that is not a good memory performance for my needs. It also says so in this official source - https://arrow.apache.org/cookbook/py/flight.html "While GeneratorStream has the advantage that it can stream data, that means Flight must call back into Python for each record batch to send. In contrast, RecordBatchStream requires that all data is in-memory up front, but once created, all data transfer is handled purely in C++, without needing to call Python code."

Will there be a fix to GeneratorStream to support Dictionary types? or another reader object that can support reading the memory from the remote file system in batches and not loading it all into memeory?

Thanks

xshirax avatar Sep 05 '24 08:09 xshirax

Contributions are always welcome.

lidavidm avatar Sep 05 '24 08:09 lidavidm

That said, if you have a RecordBatchReader and it is backed by a file, RecordBatchStream should not be loading it into memory...that part of the docs is just talking about if you have a Table

lidavidm avatar Sep 05 '24 08:09 lidavidm

@lidavidm hmm then maybe I'm doing something wrong? what I did to test it was something along these lines:

def do_get(self, context, ticket):
    ....
    reader = get_reader(columns, path)
    return fl.RecordBatchStream(reader) 


def get_reader(columns, path)
    def generate_record_batches():
        for batch in gen:
            batch_size_mb = batch.nbytes / 1024 / 1014
            memory_used = pyarrow.total_allocated_bytes() / 1024 / 1024
            print(f"Batch size: {batch_size_mb} MB")
            print(f"Memory used right now: {memory_used}  MB")
            yield batch

    filesystem = fs.S3FileSystem()
    dataset = ds.dataset(path, filesystem=filesystem, format=ds.ParquetFileFormat(default_fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=True)))
    gen = dataset.to_batches(batch_size=batch_size, columns=columns)
    schema = dataset.schema
    reader = pyarrow.RecordBatchReader.from_batches(schema, generate_record_batches())
    return reader

while the file I tested is ~210MB, I saved it in s3 with row groups of around 18MB, and I set the batch size to also be around 18MB.

But in the test I've seen that on the first yield, the memory usage of arrow was already around 210MB while the size of each batch is indeed 18MB, and therefore I assumed that it reads the whole file into memory and only then send batches..

So what am I missing? I can provide more information if needed :)

xshirax avatar Sep 05 '24 09:09 xshirax

I believe the dataset reader may preload batches by default. (Also, row group size has little bearing on in-memory size.)

lidavidm avatar Sep 05 '24 09:09 lidavidm

Oh so how can I configure it to not do that by default?

xshirax avatar Sep 05 '24 09:09 xshirax

https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.scanner

lidavidm avatar Sep 06 '24 00:09 lidavidm

+1 on this issue for tracking.

rustyconover avatar Oct 12 '24 20:10 rustyconover