
Merge into / Upsert

Open · Fokko opened this issue 1 year ago · 13 comments

Feature Request / Improvement

Have an API to efficiently perform an upsert

Fokko avatar Feb 09 '24 18:02 Fokko

To work well with some of the larger data use cases where folks are using PySpark today, I think this would need to play well with pyarrow streaming read/write functionality, so that one could do atomic upsert of batches without having to read all the data into memory at once.

I call this out because the current write functionality works with pyarrow Tables, which are fully materialized in memory. Working with larger data might include making the pyiceberg write APIs work with Iterator[RecordBatch] and friends (as returned by pyarrow Datasets/Scanner) in addition to pyarrow Tables.
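
For readers unfamiliar with the pyarrow streaming primitives mentioned here, a minimal sketch of batch-wise processing (the path, batch size, and process handler are illustrative, not pyiceberg API):

import pyarrow.dataset as ds

# Stream a dataset as an Iterator[RecordBatch] instead of materializing
# a full pyarrow Table in memory.
dataset = ds.dataset("warehouse/events/", format="parquet")
for batch in dataset.scanner(batch_size=64_000).to_batches():
    process(batch)  # hypothetical per-batch handler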

corleyma avatar Mar 15 '24 22:03 corleyma

Hi @corleyma - I opened up this PR to address your comment here by introducing a scan API that will return a RecordBatchReader. It's pending resolution of some related issues, but it's almost complete. Would appreciate your feedback if you are interested in using this API 🙂
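
As a rough sketch, consuming that API might look like the following (hedged, since the PR is still pending and the naming may change before merge):

# Assumes the scan API from the PR above lands as to_arrow_batch_reader().
reader = tbl.scan().to_arrow_batch_reader()  # returns a pa.RecordBatchReader
for batch in reader:
    ...  # handle one pa.RecordBatch at a time, without materializing the table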

sungwy avatar Jun 12 '24 23:06 sungwy

Hello, thanks for all the great work!

Now that version 0.7.0 is released, I was wondering where I can find documentation covering how to write to a partitioned table.

Thanks a lot!

Milias avatar Jul 31 '24 22:07 Milias

Hi @Milias you can create a partitioned table by following the documentation here on Creating a table.

I realize we could have had an explicit section on creating and writing to a partitioned table under the Write to a Table section. Currently, we support partitioned writes for IdentityTransform and TimeTransform (Year, Month, Day, Hour) partitions; a short sketch follows below. Please let us know if that works for you!
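
For illustration, a minimal sketch of creating a table partitioned by a time transform (the names, field IDs, catalog variable, and df are made up for the example):

from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import DayTransform
from pyiceberg.types import NestedField, StringType, TimestamptzType

schema = Schema(
    NestedField(1, "event", StringType(), required=False),
    NestedField(2, "ts", TimestamptzType(), required=False),
)
spec = PartitionSpec(
    PartitionField(source_id=2, field_id=1000, transform=DayTransform(), name="ts_day"),
)
tbl = catalog.create_table("demo.events", schema=schema, partition_spec=spec)
tbl.append(df)  # df: a pa.Table matching the schema; rows are routed to day partitions derived from "ts"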

Sung

sungwy avatar Jul 31 '24 22:07 sungwy

Hey @sungwy, thanks for the quick answer!

I was already making use of write support and had indeed seen that section of the documentation. I put together a quick test of writing to a table partitioned with IdentityTransform: .append and .overwrite work as expected, that is, they append new data to the appropriate partitions and replace the whole table, respectively.

After this I'm left wondering how individual partitions can be replaced. Maybe this functionality is not yet supported, with writes to partitioned tables being only the first step. To give an example of what I mean, taking the table from the example in the documentation:

import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029, "index": 1},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297, "index": 1},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989, "index": 2},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014, "index": 2},
    ],
    schema=schema_to_pyarrow(tbl.schema())
)

tbl.overwrite(df)

With the table created as:

from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import DoubleType, IntegerType, NestedField, StringType

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
    NestedField(4, "index", IntegerType(), required=True),
)

partition_spec = PartitionSpec(
    PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="index_partition"),
)

tbl = catalog.create_table("public.cities", schema=schema, partition_spec=partition_spec)

Then, if we add a few more rows:

df2 = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029, "index": 1},
        {"city": "Null Island", "lat": 0.0, "long": 0.0, "index": 3},
    ],
    schema=schema_to_pyarrow(tbl.schema())
)

When doing tbl.overwrite(df2), I would like to have some way of indicating that the partition with index = 2 should be left as-is.

It is very possible that I misunderstood the precise scope of write support for partitioned tables, since issue #402 is still open. But in case it is already possible to overwrite specific partitions, that's the piece of information I was searching for.

Thanks a lot again :smiley:

Milias avatar Aug 01 '24 09:08 Milias

I think you must be referring to the dynamic overwrite / replace partition API, which detects the partitions of the given input and replaces them. This feature is actually still in progress in this PR: https://github.com/apache/iceberg-python/pull/931
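
For reference, a hedged sketch of how that might be used once the PR lands (the method name comes from the PR and could change before merge):

# Hypothetical until https://github.com/apache/iceberg-python/pull/931 merges:
# replaces only the partitions present in df2 (index = 1 and index = 3),
# leaving the index = 2 partition untouched.
tbl.dynamic_partition_overwrite(df2)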

sungwy avatar Aug 01 '24 10:08 sungwy

That PR looks exactly like what I am asking for, yes! Thank you very much for pointing it out. I will keep an eye on it.

Milias avatar Aug 01 '24 21:08 Milias

Any update on whether there is an API for merge into / upsert?

ev2900 avatar Aug 06 '24 14:08 ev2900

Hi @ev2900 - would using the overwrite feature, specifying a boolean expression for the rows to upsert, work for your use case?

https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L479-L483

I realize we don't have an example of invoking an overwrite with the overwrite_filter specified; a sketch follows below. I'll raise an issue to track adding this explicitly to our API documentation: https://github.com/apache/iceberg-python/issues/1008
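
In the meantime, a minimal sketch of a filtered overwrite, reusing the cities table and df2 from the earlier comments (the filter values are illustrative):

from pyiceberg.expressions import In

# Delete the rows in the partitions df2 touches (index = 1 and 3),
# then append df2; the index = 2 partition is left untouched.
tbl.overwrite(df2, overwrite_filter=In("index", [1, 3]))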

sungwy avatar Aug 06 '24 14:08 sungwy

Let me take a look at this. It would be very helpful if there were an example.

ev2900 avatar Aug 06 '24 15:08 ev2900

@ev2900 agreed :) I've added the issue above (#1008) to track that

sungwy avatar Aug 06 '24 15:08 sungwy

Any updates on this one? I'm good with overwrite + overwrite filters for now, but for tables where columns are populated by different sources, it would be awesome to have full MERGE INTO support and to be able to select which columns to update.

Minfante377 avatar Aug 31 '24 16:08 Minfante377

Hi @Minfante377 sorry for the delayed response, and thank you for the interest!

Unfortunately, this is still an open issue on PyIceberg with no assignee. MERGE INTO with column-matching semantics like:

MERGE INTO table t USING (SELECT ...) s ON t.id = s.id

is quite complicated to support efficiently, so I've been trying to make time to look at it in depth, but I haven't yet been able to get to this specific issue.
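
Until native support lands, one possible client-side emulation of full-row upsert (a sketch only: it replaces whole rows rather than selected columns, and assumes the source key set is small enough to inline in a filter):

from pyiceberg.expressions import In

# source: a pa.Table with the same schema as the target, keyed by "id".
# Delete every target row whose id appears in source, then append source:
# matched rows are replaced, unmatched source rows are inserted.
keys = source["id"].to_pylist()
tbl.overwrite(source, overwrite_filter=In("id", keys))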

Would you be interested in making a contribution?

sungwy avatar Sep 24 '24 17:09 sungwy

@sungwy sorry, I've been really busy lately. Of course. I'll start taking a look at the code base and see where I can begin tackling this. Thank you!

Minfante377 avatar Oct 06 '24 17:10 Minfante377