iceberg-python
Merge into / Upsert
Feature Request / Improvement
Have an API to efficiently perform an upsert
To work well with some of the larger data use cases where folks are using PySpark today, I think this would need to play well with pyarrow streaming read/write functionality, so that one could do an atomic upsert of batches without having to read all the data into memory at once.
I call this out because the current write functionality works with pyarrow Tables, which are fully materialized in memory. Working with larger data might include making the pyiceberg write APIs work with Iterator[RecordBatch] and friends (as returned by pyarrow Datasets/Scanner) in addition to pyarrow Tables.
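To make the batch-wise idea above concrete, here is a minimal sketch in plain Python. It is only a model of the semantics, not PyIceberg or pyarrow API: rows are dicts, a "batch" is a list of rows standing in for a RecordBatch, and the names iter_batches and upsert_batches are hypothetical. The input is consumed lazily one batch at a time, and changes are staged so the target only changes if every batch succeeds.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List

Row = Dict[str, object]


def iter_batches(rows: Iterable, batch_size: int) -> Iterator[List]:
    """Yield lists of at most batch_size items without materializing the input."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def upsert_batches(
    target: Dict[object, Row], batches: Iterable[List[Row]], key: str
) -> Dict[object, Row]:
    """Apply every batch to a staged copy and return it.

    The original target is never mutated, so a failure halfway through
    leaves it untouched (a toy stand-in for an atomic commit).
    """
    staged = dict(target)
    for batch in batches:
        for row in batch:
            staged[row[key]] = row  # update if the key exists, insert otherwise
    return staged


target = {1: {"id": 1, "v": "a"}, 2: {"id": 2, "v": "b"}}
# A generator: source rows are produced lazily, never held in memory all at once.
source = ({"id": i, "v": f"new{i}"} for i in range(2, 5))
result = upsert_batches(target, iter_batches(source, batch_size=2), key="id")
```

In a real implementation the staged copy would of course not be an in-memory dict; the point is only that the writer accepts an iterator of batches rather than a fully materialized table.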
Hi @corleyma - I opened up this PR to address your comment here by introducing a scan API that will return a RecordBatchReader. It's pending some resolutions with related issues, but it's almost complete. Would appreciate your feedback if you are interested in using this API 🙂
Hello, thanks for all the great work!
Now that version 0.7.0 is released, I was wondering where I can find some documentation covering how to write to a partitioned table.
Thanks a lot!
Hi @Milias you can create a table with a partition by following the documentation here on Creating a table.
I realize we could have had an explicit section on creating and writing to a partitioned table under the Write to a Table section. Currently, we support partitioned writes for IdentityTransform and TimeTransform (Year, Month, Day, Hour) partitions. Please let us know if that works for you!
Sung
Hey @sungwy, thanks for the quick answer!
I was already making use of the write support and had indeed seen that section of the documentation. I just prepared a very quick test of writing to a table partitioned with IdentityTransform. There, .append and .overwrite work as expected: they append new data to the appropriate partitions and replace the whole table, respectively.
After this I'm left wondering how individual partitions can be replaced. Maybe this functionality is not yet supported, with writes to partitioned tables being only the first step. To give an example of what I mean, taking the table from the example in the documentation:
import pyarrow as pa
from pyiceberg.io.pyarrow import schema_to_pyarrow

# `tbl` is the partitioned table created in the "With table" snippet further down.
df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029, "index": 1},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297, "index": 1},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989, "index": 2},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014, "index": 2},
    ],
    schema=schema_to_pyarrow(tbl.schema()),
)
tbl.overwrite(df)
With table:
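from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import DoubleType, IntegerType, NestedField, StringType

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
    NestedField(4, "index", IntegerType(), required=True),
)
# Partition on the "index" column (field id 4) using the identity transform.
partition_spec = PartitionSpec(
    PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="index_partition"),
)
# `catalog` is assumed to be an already-loaded PyIceberg catalog.
tbl = catalog.create_table("public.cities", schema=schema, partition_spec=partition_spec)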
Then, if we add a few more rows:
df2 = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029, "index": 1},
        {"city": "Null Island", "lat": 0.0, "long": 0.0, "index": 3},
    ],
    schema=schema_to_pyarrow(tbl.schema()),
)
Then, when doing tbl.overwrite(df2), I would like to have some way of indicating that the partition with index = 2 should be left as-is.
It is very possible that I misunderstood the precise scope of write support for partitioned tables, since issue #402 is still open. But in case it is already possible to overwrite specific partitions, that's the piece of information I was searching for.
Thanks a lot again 😃
I think you must be referring to the dynamic overwrite / replace partition API, which detects the partitions of the given input and replaces them. This feature is actually still in progress on this PR: https://github.com/apache/iceberg-python/pull/931
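For readers following along, the behaviour being asked for (detect which partitions appear in the input and replace only those) can be modelled in a few lines of plain Python. This is a sketch of the semantics only, using the cities data from above; it is not the implementation in the linked PR, and partition_by and dynamic_partition_overwrite are hypothetical names.

```python
from collections import defaultdict
from typing import Dict, List

Row = Dict[str, object]


def partition_by(rows: List[Row], key: str) -> Dict[object, List[Row]]:
    """Group rows by the value of the partition column."""
    parts: Dict[object, List[Row]] = defaultdict(list)
    for row in rows:
        parts[row[key]].append(row)
    return dict(parts)


def dynamic_partition_overwrite(
    existing: List[Row], incoming: List[Row], key: str
) -> Dict[object, List[Row]]:
    """Replace only the partitions present in `incoming`; keep all others."""
    result = partition_by(existing, key)
    result.update(partition_by(incoming, key))  # touched partitions are swapped whole
    return result


existing = [
    {"city": "Amsterdam", "index": 1},
    {"city": "San Francisco", "index": 1},
    {"city": "Drachten", "index": 2},
    {"city": "Paris", "index": 2},
]
incoming = [
    {"city": "Amsterdam", "index": 1},
    {"city": "Null Island", "index": 3},
]
result = dynamic_partition_overwrite(existing, incoming, key="index")
```

Here the partition with index = 1 ends up containing only Amsterdam, index = 2 is left as-is, and index = 3 is created.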
That PR looks exactly like what I am asking for, yes! Thank you very much for pointing it out. I will keep an eye on it.
Any update on whether there is an API for merge into / upsert?
Hi @ev2900 - would using the overwrite feature by specifying the boolean expression on which to upsert work for your use case?
https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L479-L483
I realize we don't have an example of invoking an overwrite without the overwrite_filter specified. I'll raise an issue to track adding this explicitly into our API documentation. https://github.com/apache/iceberg-python/issues/1008
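Until such an example exists in the docs, the idea of an overwrite driven by a filter can be sketched as "delete every row matching the filter, then append the new rows". The snippet below is a plain-Python model of that reading, not the PyIceberg API: in PyIceberg the filter would be a boolean expression passed as overwrite_filter, whereas here it is an ordinary Python predicate, and overwrite_with_filter is a hypothetical name.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def overwrite_with_filter(
    existing: List[Row], incoming: List[Row], matches: Callable[[Row], bool]
) -> List[Row]:
    """Drop existing rows for which `matches` is true, then append `incoming`."""
    kept = [row for row in existing if not matches(row)]
    return kept + list(incoming)


existing = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 3, "v": "c"}]
incoming = [{"id": 2, "v": "B"}, {"id": 4, "v": "d"}]

# Upsert-style use: the filter selects exactly the keys being written.
incoming_ids = {row["id"] for row in incoming}
result = overwrite_with_filter(existing, incoming, lambda row: row["id"] in incoming_ids)
```

Rows 1 and 3 survive untouched, row 2 is replaced, and row 4 is added. Note that every column of a matched row is replaced, which is why this is not a full MERGE INTO.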
Let me take a look at this. It would be very helpful if there was an example
@ev2900 agreed :) I've added that Issue above (#1008) to address that
Any updates on this one? I'm good with overwrite + overwrite filters for now, but for tables where columns are populated by different sources it would be awesome to have full MERGE INTO support and to be able to select which columns to update.
Hi @Minfante377 sorry for the delayed response, and thank you for the interest!
Unfortunately, this is still an open issue on PyIceberg with no assignee. MERGE INTO with column-matching semantics like:
MERGE INTO table t USING (SELECT ...) s ON t.id = s.id
is a bit complicated to support efficiently. I've been trying to make time to look at it in depth, but I haven't had the time for this specific issue yet.
Would you be interested in making a contribution?
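As a reference point for anyone picking this up, the semantics being requested (match on a key, update only selected columns on a match, insert otherwise) can be written down as a small plain-Python model. This is only a sketch of the intended behaviour with hypothetical names (merge_into, update_columns); it says nothing about how to do this efficiently against Iceberg data files, which is the hard part.

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def merge_into(
    target: List[Row], source: List[Row], key: str, update_columns: Sequence[str]
) -> List[Row]:
    """Model of MERGE INTO ... ON t.key = s.key.

    WHEN MATCHED: copy only `update_columns` from the source row.
    WHEN NOT MATCHED: insert the source row as-is.
    """
    by_key = {row[key]: dict(row) for row in target}  # copy rows; leave target intact
    for src in source:
        k = src[key]
        if k in by_key:
            for col in update_columns:
                by_key[k][col] = src[col]
        else:
            by_key[k] = dict(src)
    return list(by_key.values())


target = [
    {"id": 1, "name": "a", "score": 10},
    {"id": 2, "name": "b", "score": 20},
]
source = [
    {"id": 2, "name": "ignored", "score": 25},
    {"id": 3, "name": "c", "score": 30},
]
merged = merge_into(target, source, key="id", update_columns=["score"])
```

Row 2 keeps its name but gets the new score, which is the column-level control that a filter-based overwrite cannot express.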
@sungwy sorry I've been really busy lately. Of course. I'll start taking a look at the code base and see where I can start to try to accomplish this. Thank you!