iceberg-python

Move implementation of upsert from Table to Transaction

Open koenvo opened this issue 9 months ago • 6 comments

Rationale for this change

Previously, the upsert functionality was implemented at the table level, which meant it always initiated a new Transaction. This change moves the upsert implementation to the Transaction level while keeping table.upsert(...) as an entry point.

With this refactor, end users now have the flexibility to call upsert in two ways:

  • table.upsert(...) – which still starts a new transaction.
  • transaction.upsert(...) – allowing upserts within an existing transaction.
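The delegation described above can be sketched with a toy in-memory model. This is only an illustration of the pattern, not pyiceberg's implementation: the `Table` and `Transaction` classes below are hypothetical stand-ins that store rows in a dict keyed by the join column.

```python
from contextlib import contextmanager


class Transaction:
    """Toy stand-in for a transaction: stages rows, applies them on commit."""

    def __init__(self, table):
        self._table = table
        self._staged = dict(table.rows)  # working copy keyed by join column

    def upsert(self, rows, join_col="id"):
        # Overwrite rows whose key already exists, add the rest.
        for row in rows:
            self._staged[row[join_col]] = row
        return self

    def commit(self):
        self._table.rows = self._staged


class Table:
    """Toy stand-in for a table (assumption: not pyiceberg's Table)."""

    def __init__(self):
        self.rows = {}

    @contextmanager
    def transaction(self):
        txn = Transaction(self)
        yield txn
        txn.commit()  # only reached if the with-body raised nothing

    def upsert(self, rows, join_col="id"):
        # Entry point kept on the table: open a new transaction and delegate.
        with self.transaction() as txn:
            txn.upsert(rows, join_col=join_col)


table = Table()
table.upsert([{"id": 1, "name": "Alice"}])  # starts its own transaction

with table.transaction() as txn:  # several upserts share one transaction
    txn.upsert([{"id": 1, "name": "Alicia"}])
    txn.upsert([{"id": 2, "name": "Bob"}])
```

In this sketch both upserts inside the `with` block become visible on the table only when the block exits without an error.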

Are these changes tested?

Using existing tests.

Are there any user-facing changes?

Yes. This change enables users to perform upserts within an existing transaction using transaction.upsert(...), in addition to the existing table.upsert(...) method.

koenvo • Mar 19 '25 12:03

Since the transaction wrapper has been moved out, I think a unit test should be added that performs a partial upsert, then throws an error, and verifies that the rollback occurs, so that we are not left in a state where a partial upsert succeeded.

Example:

  • start an upsert
  • let the update succeed
  • force an error on the insert component
  • rollback the transaction and make sure the update did not persist
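The steps above could be sketched roughly as follows. This uses a toy in-memory model rather than pyiceberg itself: `ToyTable`, `ToyTransaction`, and the `fail_on_insert` flag are hypothetical names invented for the illustration. In a real test the failure would have to be injected some other way, for example by patching the insert step.

```python
class ToyTransaction:
    """Stages changes on a copy and applies them only on a clean exit."""

    def __init__(self, table):
        self._table = table
        self._staged = dict(table.rows)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self._table.rows = self._staged  # commit
        return False  # on error: drop the staged copy and re-raise

    def upsert(self, rows, join_col="id", fail_on_insert=False):
        updates = [r for r in rows if r[join_col] in self._staged]
        inserts = [r for r in rows if r[join_col] not in self._staged]
        for row in updates:  # step 1: the update (overwrite) succeeds
            self._staged[row[join_col]] = row
        if fail_on_insert:  # step 2: hypothetical hook that forces an error
            raise RuntimeError("forced failure in insert step")
        for row in inserts:
            self._staged[row[join_col]] = row


class ToyTable:
    def __init__(self, rows):
        self.rows = {r["id"]: r for r in rows}

    def transaction(self):
        return ToyTransaction(self)


def rows_after_failed_upsert():
    table = ToyTable([{"id": 1, "name": "Alice"}])
    new_rows = [{"id": 1, "name": "Alicia"}, {"id": 2, "name": "Bob"}]
    try:
        with table.transaction() as txn:
            txn.upsert(new_rows, fail_on_insert=True)
    except RuntimeError:
        pass  # expected: the insert step was forced to fail
    return table.rows


rows_after_failure = rows_after_failed_upsert()
```

The check a real test would make is the same as here: after the failed transaction, the row with `id` 1 must still hold its original value and no new row may have appeared.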

Just my thoughts 😃. Thanks, Matt

mattmartin14 • Mar 22 '25 14:03

Agree! I will work on the test.

With "update" you mean "delete", right?

koenvo • Mar 22 '25 14:03

> Agree! I will work on the test.
>
> With "update" you mean "delete", right?

Hey, sorry, just saw this. By "update" I mean that it invokes an "overwrite" operation, which I believe is what deletes also trigger under the covers. 😀

mattmartin14 • Mar 26 '25 16:03

There is a nice edge case here:

import pyarrow as pa

# `catalog`, `identifier`, and `schema` are assumed to be defined earlier
tbl = catalog.create_table(identifier, schema=schema)

# Define exact schema: required int32 and required string
arrow_schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),
    pa.field("name", pa.string(), nullable=False),
])

tbl.append(pa.Table.from_pylist([{"id": 1, "name": "Alice"}], schema=arrow_schema))

df = pa.Table.from_pylist([{"id": 2, "name": "Bob"}, {"id": 1, "name": "Alicia"}], schema=arrow_schema)

with tbl.transaction() as txn:
    txn.upsert(df, join_cols=["id"])

    # This will re-insert Bob instead of reading the uncommitted changes and ignoring Bob
    txn.upsert(df, join_cols=["id"])

@Fokko should it be possible to read uncommitted changes?

koenvo • Mar 27 '25 20:03

> @Fokko should it be possible to read uncommitted changes?

Yes, it should. If you do a subsequent upsert with the same data, it should be a no-op. This should already be the case today; if it is not, https://github.com/apache/iceberg-python/pull/1903 will fix it.
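As a rough illustration of the expected behaviour (a toy model, not pyiceberg's code): if each upsert compares the incoming rows against the transaction's staged view instead of only the committed state, repeating the same upsert finds nothing to change. `ToyTransaction` and the local `UpsertResult` dataclass are hypothetical names used only for this sketch.

```python
from dataclasses import dataclass


@dataclass
class UpsertResult:
    rows_updated: int = 0
    rows_inserted: int = 0


class ToyTransaction:
    def __init__(self, committed_rows):
        # Start from the committed state; later upserts read this staged view.
        self._staged = dict(committed_rows)

    def upsert(self, rows, join_col="id"):
        result = UpsertResult()
        for row in rows:
            key = row[join_col]
            existing = self._staged.get(key)  # sees uncommitted changes too
            if existing is None:
                self._staged[key] = row
                result.rows_inserted += 1
            elif existing != row:
                self._staged[key] = row
                result.rows_updated += 1
            # identical row already staged: nothing to do
        return result


committed = {1: {"id": 1, "name": "Alice"}}
df = [{"id": 2, "name": "Bob"}, {"id": 1, "name": "Alicia"}]

txn = ToyTransaction(committed)
first = txn.upsert(df)   # inserts Bob, updates Alice to Alicia
second = txn.upsert(df)  # same data again: nothing left to insert or update
```

If the second call instead compared against `committed`, it would treat Bob as missing and insert him again, which is the edge case reported above.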

Fokko • Apr 17 '25 19:04

Now that #1903 is merged, could you rebase this PR?

kevinjqliu • Apr 19 '25 18:04