iceberg-python
Expose PyIceberg table as PyArrow Dataset
Feature Request / Improvement
Migrated from https://github.com/apache/iceberg/issues/7598:
Hi, I've been looking at what we can do to make PyArrow Datasets extensible to various table formats and consumable by various compute engines (including DuckDB, Polars, DataFusion, Dask). I've written up my observations here: https://docs.google.com/document/d/1r56nt5Un2E7yPrZO9YPknBN4EDtptpx-tqOZReHvq1U/edit?usp=sharing
What this means for PyIceberg's API
Currently, integration with engines like DuckDB means filters and projections have to be specified up front, rather than pushed down from the query:
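```python
from pyiceberg.expressions import GreaterThanOrEqual

# `table` is a PyIceberg Table, e.g. from catalog.load_table(...)
con = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")
```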
Ideally, we could export the table as a dataset, register it in DuckDB (or some other engine), and let the engine push down filters and projections as it sees fit. The following would then be equivalent to the query above, but more user-friendly:
```python
import duckdb

con = duckdb.connect()
# Proposed API (the subject of this feature request)
dataset = table.to_pyarrow_dataset()
con.register("distant_taxi_trips", dataset)
con.sql("""SELECT VendorID, tpep_pickup_datetime, tpep_dropoff_datetime
FROM distant_taxi_trips
WHERE trip_distance >= 10.0""")
```
Query engine: Other
Hi @Fokko, I can take a look at the issue!
Hi, is there any update on this topic? Thanks.
Yo, just chiming in that we would love this for dbt-duckdb use cases, thanks!
(If this is a thing I can add, please lmk; I can be surprisingly useful)
Hey @jwills I think many folks are looking forward to this, so it would be great if you would be willing to spend time on getting this in 🙌
sg @Fokko, will dive in here
Awesome, let me know if there are any questions. Happy to provide context
@Fokko okay I'm read in here; is the best approach atm something like this comment from the original issue?
I looked at the code in PyIceberg again and remembered an idea I had never tested. Right now, the implementation eagerly loads a table for every file-level projection and concatenates them. Would it be possible instead to create a PyArrow dataset for every file and return a union dataset that combines them? I've never touched these lower-level features of PyArrow datasets before, so this idea is based on a hazy recollection of reading the source code long ago.
If this is something PyArrow supports today (unioning datasets with different projection plans that produce the same final schema, without materializing a table), then it could be the easiest way to achieve the "pyiceberg returns a dataset that is compatible with iceberg schema evolution", at least for copy-on-write workloads.
Are there any dragons here or downsides to attempting this that would make it not worthwhile to attempt?
Maybe this helps.
A PyArrow Dataset can be initiated from a list of file paths:
Create a FileSystemDataset from explicitly given files. The files must be located on the same filesystem given by the filesystem parameter. Note that in contrary of construction from a single file, passing URIs as paths is not allowed.
More info here.
To make it work, the corresponding cloud filesystem, e.g. pyarrow.fs.S3FileSystem has to be specified, see here.
Hey @jwills Having a union dataset feels like a step in the right direction to me, however I don't think it will really help when it comes to performance.
Loading the files through PyArrow is very slow at the moment. The biggest issue is that we aren't able to do the schema evolution in pure Arrow. That's why we materialize each file to a table, apply all the changes needed to the schema, and then concatenate all the tables at the end. This is very costly to do in Python. The main issue is that Arrow does not support resolving schemas or filtering by field ID, which is what Iceberg is built on.
A cleaner option would be to have the Arrow dataset expose a protocol that we can implement. This was suggested a while ago, but they were very reluctant and wanted to do everything through Substrait.
@Fokko agreed, the union approach seems like a perf killer. Will noodle on this a bit more-- thanks for the context here!
Just for context, in case it helps: I was recently experimenting with pushing the union of the tables into Arrow, including all the schema evolution, so that PyIceberg would not have to do it itself, which is slow. The idea was to create an empty table with the requested schema and then union all the Parquet files into it, using the new option in `concat_tables` that automatically handles schema evolution. The missing part is that Arrow cannot reorder struct fields :(
That is helpful, thank you.
One other option I was considering on my side, given that I have access to https://github.com/duckdb/duckdb_iceberg: use PyIceberg to fetch the metadata for an Iceberg table (like the path and which snapshot to read), but then delegate the actual reading to the Iceberg scan operation built into DuckDB (which, as far as I can tell, bypasses the Arrow issues entirely).
Do you have thoughts on that approach, or is it outside of your wheelhouse?
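A sketch of that split, where PyIceberg only resolves the table's current metadata file and the `iceberg_scan()` table function from the duckdb_iceberg extension does the reading. The helper names and the catalog/table identifiers are hypothetical, and object-store credentials for DuckDB are assumed to be configured separately:

```python
from typing import Optional, Sequence


def iceberg_scan_sql(metadata_location: str, columns: Sequence[str], where: Optional[str] = None) -> str:
    """Hypothetical helper: build a DuckDB query over iceberg_scan()."""
    path = metadata_location.replace("'", "''")  # escape quotes for a SQL string literal
    sql = f"SELECT {', '.join(columns)} FROM iceberg_scan('{path}')"
    if where:
        sql += f" WHERE {where}"
    return sql


def query_with_duckdb(catalog_name: str, identifier: str, columns: Sequence[str], where: Optional[str] = None):
    # Imported lazily so iceberg_scan_sql() works without these packages installed
    import duckdb
    from pyiceberg.catalog import load_catalog

    # PyIceberg only resolves which metadata file describes the table...
    table = load_catalog(catalog_name).load_table(identifier)
    con = duckdb.connect()
    con.install_extension("iceberg")
    con.load_extension("iceberg")
    # ...and DuckDB plans and executes the actual scan
    return con.sql(iceberg_scan_sql(table.metadata_location, columns, where))


# Usage (hypothetical catalog and table names):
# query_with_duckdb("default", "nyc.taxis", ["VendorID"], "trip_distance >= 10.0").show()
```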
I'm always in for creative solutions, and I think that would work well. My colleague also did something similar: https://gist.github.com/kainoa21/f3d01c607fce2741cef22683048a22a3 which is a really nifty trick!
Hi team, do we have an update on this? We are really excited about this feature.
Just to note, we would also love this feature. It would allow us to support Iceberg read/write in Ibis.
A PyArrow Dataset can be initiated from a list of file paths:
Create a FileSystemDataset from explicitly given files. The files must be located on the same filesystem given by the filesystem parameter. Note that in contrary of construction from a single file, passing URIs as paths is not allowed.
I have the opposite idea in mind: a PyArrow representation / dataset protocol should be something like a MemoryBuffer that offers a set of APIs available to every table implementation.
Other query engines could then load the dataset via this "InMemoryDataset" (that's kind of my mental model for Arrow) as an intermediary (not sure we want to expose it directly like this; it could be something hidden and low-level).
Would love this feature, I am coming from https://github.com/ibis-project/ibis/issues/7712 as well.
For reference, here is delta-rs's `to_pyarrow_dataset` implementation:
https://github.com/delta-io/delta-rs/blob/3f355d87119661fc7cf28877b620b589277ba1d1/python/deltalake/table.py#L1108-L1209
@kevinjqliu alas it's not as simple for iceberg because of the need to do field id-based projection to handle schema evolution.
Somewhat relatedly: from what I remember, and assuming nothing has changed, PyArrow Datasets can't have fragments with different input schemas. The PyArrow Dataset implementation assumes a dataset has a single schema that matches the incoming schema of the fragments, so, e.g., you can't specify per-fragment projections.
I know @kevinjqliu understands this already, but for others who may want to pick up work on this in the future, here's what this means in practice: Say you're reading an iceberg table snapshot, and the schema of that snapshot should be the schema of the PyArrow dataset (the "output schema" of the dataset scan). Since schema changes in Iceberg are metadata-only operations, it's possible that your table has e.g. historical partitions written with an older schema that have not been re-written. An Iceberg-compliant reader is supposed to project these old files to the new schema at read time. That means, in PyArrow Dataset terminology, you need fragments with potentially different schemas to be projected to the final schema of the dataset. And the set of things this has to account for includes e.g. column renames (would be handled by field id-based projection if pyarrow supported it), but also e.g. type widening (so e.g., old fragments have column foo stored as an int, new fragments store foo as long after a schema update; pyarrow dataset needs to cast foo to long when reading the old fragments).
There was a discussion long ago about creating a PyArrow Dataset Python protocol. The most recent discussion that I am aware of proposes that third parties could implement scanners with their own logic around an interface that accepts projections and filters expressed as substrait predicates, and returns one or more streams of arrow data. Libraries that consume PyArrow datasets would be able to consume these other implementations using the PyCapsule protocol without needing any special integration logic.
The PyArrow dataset protocol hasn't been adopted, but it would indeed allow pyiceberg to define a PyArrow Dataset-compatible scanner that does whatever it wants. However, unless pyiceberg lifted much of the underlying implementation into iceberg-rust, a pure-python dataset implementation might not be able to achieve the performance folks want. In a world where pyiceberg continues to rely mostly on pyarrow for native C++ implementations of performance-critical logic, pyiceberg would probably benefit most from extensions to the dataset implementation in PyArrow allowing folks to specify per-fragment projections to the final dataset output schema. That way all the actual concurrent scanning logic could be farmed out to pyarrow's native implementations.
So... there are a few threads folks could pull on to try to make this happen. If the goal is compatibility with the wide variety of execution engines (duckdb, polars, etc.) that support PyArrow datasets today, this may still be a path worth pursuing. The alternative is continuing to develop engine-specific integrations, but I have not seen much active development on, e.g., the duckdb_iceberg extension, which is still very incomplete, and in general that approach might still result in a lot of duplicated work.
If you don't care which query engine you use, but just want at least one in-process Python query engine that can read Iceberg with great performance, I think the quickest path forward might actually be to follow up on the effort to add FFI for TableProviders in DataFusion and datafusion-python, and then help out in iceberg-rust to expose its IcebergTableProvider in Python (making sure it has the features it needs to be performant).