[Python][FlightRPC] IPC error using Python GeneratorStream for tables containing Categorical / DictionaryArray
pyarrow=13.0.0
Seeing the following error on the Flight client side when calling reader.read_all():
ArrowInvalid: IPC stream did not have the expected number (1) of dictionaries at the start of the stream
The issue is only observed when:
- the server returns a "GeneratorStream" instead of "RecordBatchStream"
- the table contains a column of Categorical / DictionaryArray type
Server.py:
import pyarrow as pa
from pyarrow.flight import FlightServerBase, Location, ServerCallContext, Ticket, GeneratorStream
import pandas as pd


class FlightServer(FlightServerBase):
    def __init__(self, host, port):
        super().__init__(Location.for_grpc_tcp(host, port))

    def do_get(self, context: ServerCallContext, ticket: Ticket):
        df = pd.DataFrame.from_dict({
            'col_1': pd.Categorical(['a', 'b', 'c', 'a', 'b', 'c'])
        })
        table = pa.Table.from_pandas(df)
        return GeneratorStream(schema=table.schema, generator=table.to_batches())
        # return RecordBatchStream(table) -> this doesn't have the issue


if __name__ == '__main__':
    server = FlightServer('0.0.0.0', 7688)
    print(f'Starting Flight server {server.port}')
    server.serve()
Client.py:
from pyarrow.flight import FlightClient, Location, Ticket

location = Location.for_grpc_tcp('0.0.0.0', 7688)
client = FlightClient(location)
ticket = Ticket('')
reader = client.do_get(ticket)
reader.read_all()
Component(s)
FlightRPC, Python
We don't handle dictionaries here; that should probably be fixed: https://github.com/apache/arrow/blob/818f71d085b6f820903afc6b1f1e577d8e45ff47/python/pyarrow/_flight.pyx#L2002-L2089
I think nowadays we could/should implement GeneratorStream in terms of RecordBatchReader.from_batches, which would save us from having to deal with these details.
That also means that if you need a workaround, you should try:
reader = pyarrow.RecordBatchReader.from_batches(table.schema, table.to_batches())
return pyarrow.flight.RecordBatchStream(reader)
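For reference, a minimal sketch of that workaround applied to the do_get from Server.py above (same data as the reproduction; it only wraps the batches in a reader rather than fixing GeneratorStream itself):

import pandas as pd
import pyarrow as pa
import pyarrow.flight as fl

# Drop-in replacement for FlightServer.do_get in Server.py above.
def do_get(self, context, ticket):
    df = pd.DataFrame.from_dict({
        'col_1': pd.Categorical(['a', 'b', 'c', 'a', 'b', 'c'])
    })
    table = pa.Table.from_pandas(df)
    # RecordBatchStream also accepts a RecordBatchReader; the IPC machinery
    # behind it writes the dictionary batches that the GeneratorStream path
    # currently does not handle.
    reader = pa.RecordBatchReader.from_batches(table.schema, table.to_batches())
    return fl.RecordBatchStream(reader)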
Thanks a lot David!
Hey @lidavidm, was this ever fixed? Right now, if I want to stream my data in Python I use RecordBatchStream wrapping a RecordBatchReader as you mentioned, but the problem with this approach is that it loads the whole table into RAM before it starts streaming batches, which is not acceptable memory behavior for my needs. The official cookbook also says so (https://arrow.apache.org/cookbook/py/flight.html): "While GeneratorStream has the advantage that it can stream data, that means Flight must call back into Python for each record batch to send. In contrast, RecordBatchStream requires that all data is in-memory up front, but once created, all data transfer is handled purely in C++, without needing to call Python code."
Will there be a fix to GeneratorStream to support Dictionary types? Or is there another reader object that can read from a remote file system in batches without loading everything into memory?
Thanks
Contributions are always welcome.
That said, if you have a RecordBatchReader and it is backed by a file, RecordBatchStream should not be loading it into memory... that part of the docs is just talking about the case where you have a Table.
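For example, a rough sketch of that pattern, with a placeholder path (the scan yields batches on demand and RecordBatchStream just forwards them):

import pyarrow.dataset as ds
import pyarrow.flight as fl

def do_get(self, context, ticket):
    # The scan produces record batches lazily; nothing here forces the whole
    # file into a Table first.
    dataset = ds.dataset("/path/to/data", format="parquet")  # placeholder path
    reader = dataset.scanner().to_reader()
    return fl.RecordBatchStream(reader)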
@lidavidm Hmm, then maybe I'm doing something wrong? What I did to test it was something along these lines:
def do_get(self, context, ticket):
    ....
    reader = get_reader(columns, path)
    return fl.RecordBatchStream(reader)


def get_reader(columns, path):
    def generate_record_batches():
        # Log each batch's size and Arrow's total allocation as batches stream out
        for batch in gen:
            batch_size_mb = batch.nbytes / 1024 / 1024
            memory_used = pyarrow.total_allocated_bytes() / 1024 / 1024
            print(f"Batch size: {batch_size_mb} MB")
            print(f"Memory used right now: {memory_used} MB")
            yield batch

    filesystem = fs.S3FileSystem()
    dataset = ds.dataset(
        path,
        filesystem=filesystem,
        format=ds.ParquetFileFormat(
            default_fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=True)
        ),
    )
    gen = dataset.to_batches(batch_size=batch_size, columns=columns)
    schema = dataset.schema
    reader = pyarrow.RecordBatchReader.from_batches(schema, generate_record_batches())
    return reader
The file I tested is ~210MB; I saved it in S3 with row groups of around 18MB and set the batch size to also be around 18MB.
But in the test I saw that on the first yield, Arrow's memory usage was already around 210MB, while each batch is indeed 18MB, so I assumed it reads the whole file into memory and only then sends batches.
So what am I missing? I can provide more information if needed :)
I believe the dataset reader may preload batches by default. (Also, row group size has little bearing on in-memory size.)
Oh so how can I configure it to not do that by default?
https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.scanner
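For example, something along these lines should work (a sketch only; the readahead values are illustrative and worth tuning for your workload):

import pyarrow.dataset as ds
import pyarrow.fs as fs

def get_reader(columns, path, batch_size):
    filesystem = fs.S3FileSystem()
    dataset = ds.dataset(path, filesystem=filesystem, format="parquet")
    # Lower the scanner's readahead so fewer batches/fragments are buffered
    # ahead of the Flight stream.
    scanner = dataset.scanner(
        columns=columns,
        batch_size=batch_size,
        batch_readahead=1,
        fragment_readahead=1,
    )
    return scanner.to_reader()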
+1 on this issue for tracking.