feat: file outputs
Ibis currently lacks the ability to write results to output files (csv, parquet, ...). One way this could be handled is by streaming the results back (#4443) and doing all IO in Python using the relevant pyarrow/fsspec APIs.
However, some backends (duckdb, dask, pyspark, ...) may have a more efficient way of writing certain formats natively. It'd be good to support using the backend-native mechanisms when possible.
One way to architect this would be to define a generic "Sink" abstraction, and make use of it to build different IO routines:
```python
from typing import Any

import pyarrow as pa


class Sink:
    def __init__(self, **kwargs: Any) -> None:
        """Sink options (output directory, ...) would be passed in to the
        constructor."""

    def setup(self) -> None:
        """Called at the start of processing to set up the sink."""

    def handle_batch(self, batch: pa.RecordBatch) -> None:
        """Called during results handling on every RecordBatch in the table."""

    def finish(self, succeeded: bool) -> Any:
        """Called at the end of processing to clean up the sink.

        If an error occurred at any point during processing, ``succeeded``
        will be False. The result of ``finish`` is returned to the caller.
        """
```
For example, a simple parquet sink might look like:
```python
import os

import pyarrow as pa
import pyarrow.parquet as pq


class ParquetSink(Sink):
    def __init__(self, directory: str) -> None:
        self.directory = directory
        self.count = 0

    def setup(self) -> None:
        os.makedirs(self.directory, exist_ok=True)

    def handle_batch(self, batch: pa.RecordBatch) -> None:
        table = pa.Table.from_batches([batch])
        path = os.path.join(self.directory, f"part-{self.count}.parquet")
        pq.write_table(table, path)
        self.count += 1
```
The ParquetSink class itself wouldn't be primarily user-facing, but would rather be wrapped in a nicer user-facing method:
```python
class Table:
    ...

    def to_sinks(self, *sinks: Sink) -> Any:
        """Write to multiple sinks."""
        # todo

    def to_parquet(self, directory: str) -> None:
        self.to_sinks(ParquetSink(directory))
```
The `to_sinks` method would handle the execution and dispatch to the various sinks. In its simplest form it's a loop around `to_pyarrow_batches()` (from #4443), calling `handle_batch` on each sink.
However, we also want to be able to optionally dispatch the sink operation to be handled directly in a backend. One way to do that would be to expose a new multipledispatch function:
```python
@dispatch(Backend, Sink)
def execute_sink(backend, sink) -> Any:
    raise NotImplementedError
```
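To make that contract concrete, here is a minimal hand-rolled stand-in for the dispatch table (not the actual multipledispatch library; the backend and sink classes here are illustrative): a native handler registers itself per (backend type, sink type) pair, and an unregistered pair raises `NotImplementedError` to signal that the caller should fall back to streaming batches:

```python
from typing import Any, Callable

# Stand-in dispatch table keyed by (backend type, sink type).
_SINK_HANDLERS: dict[tuple[type, type], Callable] = {}


def register_sink(backend_type: type, sink_type: type):
    def decorator(func: Callable) -> Callable:
        _SINK_HANDLERS[(backend_type, sink_type)] = func
        return func
    return decorator


def execute_sink(backend: Any, sink: Any) -> Any:
    for (b_type, s_type), func in _SINK_HANDLERS.items():
        if isinstance(backend, b_type) and isinstance(sink, s_type):
            return func(backend, sink)
    # No native implementation: caller falls back to loop-over-batches.
    raise NotImplementedError


# Illustrative stand-ins for a real backend and sinks.
class DuckDBBackend: ...
class ParquetSink: ...
class CsvSink: ...


@register_sink(DuckDBBackend, ParquetSink)
def _duckdb_parquet(backend: DuckDBBackend, sink: ParquetSink) -> str:
    # A real implementation might issue something like COPY ... TO natively.
    return "native"
```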
`to_sinks` would then first loop over all sinks, attempting to execute them natively with `execute_sink` and catching `NotImplementedError`. If any sinks aren't handled natively (either no implementation is registered in the dispatch, or the options specified in the sink can't be handled natively), then the fallback loop-over-batches implementation is used. Something like:
```python
def to_sinks(self, *sinks):
    needs_fallback = []
    results = {}
    backend = self.get_backend()

    # First try to execute each sink natively on the backend.
    for sink in sinks:
        try:
            results[sink] = execute_sink(backend, sink)
        except NotImplementedError:
            needs_fallback.append(sink)

    # Fall back to streaming batches through any remaining sinks.
    if needs_fallback:
        for sink in needs_fallback:
            sink.setup()
        try:
            for batch in self.to_pyarrow_batches():
                for sink in needs_fallback:
                    sink.handle_batch(batch)
        except Exception:
            for sink in needs_fallback:
                sink.finish(False)
            raise
        for sink in needs_fallback:
            results[sink] = sink.finish(True)

    return [results[s] for s in sinks]
```
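As a sanity check on the fallback semantics, each sink that isn't handled natively should see every batch exactly once and have `finish` called exactly once. The stubs below are hypothetical (plain lists stand in for pyarrow record batches) and mirror only the fallback branch of `to_sinks`:

```python
class CountingSink:
    """Stub sink that counts the rows it receives."""

    def __init__(self) -> None:
        self.rows = 0

    def setup(self) -> None:
        pass

    def handle_batch(self, batch) -> None:
        self.rows += len(batch)

    def finish(self, succeeded: bool):
        return self.rows if succeeded else None


def run_fallback(batches, sinks):
    # Mirrors the fallback branch of to_sinks above.
    for sink in sinks:
        sink.setup()
    try:
        for batch in batches:
            for sink in sinks:
                sink.handle_batch(batch)
    except Exception:
        for sink in sinks:
            sink.finish(False)
        raise
    return [sink.finish(True) for sink in sinks]
```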
The `Sink` idea is good. A few thoughts:

1. Why does `to_sinks()` handle multiple sinks? In case you want to write to both parquet and csv, and they could be done at the same time? If so, then the public API of `to_parquet()` wouldn't work, because users could only call those sequentially.
2. I foresee a lot of backends having this boilerplate:
```python
def execute_sink(self, sink) -> Any:
    if isinstance(sink, ParquetSink):
        ...
    elif isinstance(sink, CsvSink):
        ...
    else:
        raise NotImplementedError
```
Why not have separate hooks like `execute_parquet_sink()`, `execute_csv_sink()`, etc.? There are only going to be fewer than 10 of these.
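That per-format alternative could be sketched roughly as follows (all names here are hypothetical, not existing ibis API): the base backend raises `NotImplementedError` per hook, and each backend overrides only the formats it can write natively:

```python
from typing import Any


class BaseBackend:
    """Hypothetical base backend with one hook per output format."""

    def execute_parquet_sink(self, sink: Any) -> Any:
        raise NotImplementedError

    def execute_csv_sink(self, sink: Any) -> Any:
        raise NotImplementedError


class FakeDuckDBBackend(BaseBackend):
    # Overrides only the format it supports natively.
    def execute_parquet_sink(self, sink: Any) -> str:
        # A real backend might build a COPY ... TO statement here.
        return f"COPY query TO '{sink.directory}' (FORMAT PARQUET)"
```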
3. We will have to be careful about the public API of `to_parquet()` to unify the different backends. For instance, with duckdb you can specify the compression, but with other backends this might not be possible.
4. `Sink.handle_batch()` might want to accept additional parameters such as the chunk number, starting row index, ending row index, etc.
5. API inspiration from vaex: go here and ctrl-f for "export_"
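For point 4, the driver loop could compute that positional metadata as it streams. A sketch (the parameter names are illustrative, and plain lists stand in for record batches):

```python
def stream_to_sink(batches, sink):
    # Pass the chunk number and row range along with each batch.
    start = 0
    for index, batch in enumerate(batches):
        end = start + len(batch)  # with pyarrow this would be batch.num_rows
        sink.handle_batch(batch, index=index, start_row=start, end_row=end)
        start = end
```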
This feature would be very convenient for ETL workloads now that Ibis exposes ibis.read_parquet and ibis.read_csv.
Completed via #5457