
Transactions do not support Upsert

Open · suarez-agu opened this issue 9 months ago

Feature Request / Improvement

This follows from issue #1759.

I tried writing smaller batches of rows instead, but I also wanted a guarantee that all rows would be upserted.

I tried wrapping upsert within a Transaction, but noticed that Transaction does not support upserts.

Using the 0.9.0 release.
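
Roughly what I was attempting (a sketch with placeholder catalog/table names; Table.upsert works on its own in 0.9.0, but the same call is not available on a Transaction):

    import pyarrow as pa
    from pyiceberg.catalog import load_catalog

    catalog = load_catalog("default")        # placeholder catalog name
    table = catalog.load_table("db.events")  # placeholder table identifier

    batch = pa.table({"id": [1, 2], "value": ["a", "b"]})

    # table.upsert(batch, join_cols=["id"]) works as a standalone call,
    # but there is no equivalent method on the transaction:
    with table.transaction() as txn:
        txn.upsert(batch, join_cols=["id"])  # AttributeError in 0.9.0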

suarez-agu avatar Mar 07 '25 17:03 suarez-agu

Thanks @suarez-agu for raising this. The transaction should also support upsert. Are you interested in contributing this?

Fokko avatar Mar 10 '25 20:03 Fokko

I would like to take a look at this one.

koenvo avatar Mar 11 '25 15:03 koenvo

@koenvo how's it looking? I'm also interested in this feature.

potatochipcoconut avatar May 13 '25 04:05 potatochipcoconut

Thanks for the reminder. Let me finish this

koenvo avatar May 13 '25 04:05 koenvo

thanks @koenvo looking forward to it!

potatochipcoconut avatar May 13 '25 15:05 potatochipcoconut

@Fokko just curious, any eta on when this would land with next release?

potatochipcoconut avatar May 14 '25 21:05 potatochipcoconut

@koenvo I just tried this out, but I'm wondering if it'd be possible to have upsert available while updating a snapshot?

I'm hoping to use upsert() in a workflow that uses the distributed write pattern as described here.

The data is written into the table like this:

    # Workers produce DataFile objects; a central worker commits them to the table.
    with table.transaction() as trx:
        with trx.update_snapshot().fast_append() as update_snapshot:
            update_snapshot.append_data_file(data_file)

Thanks!

potatochipcoconut avatar May 14 '25 22:05 potatochipcoconut

The pattern you're describing only does appends, right? In that case it's easier to have distributed workers and a central worker doing the commits.

With an upsert you probably also have to delete data. When a delete is involved it's harder, as upserts become dependent on each other.
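
For the append-only case, a sketch of the central committer side (assuming `data_files` is the list of DataFile objects collected from the workers, as in the snippet above):

    # Central worker: commit the DataFile objects produced by the distributed
    # workers in a single fast-append snapshot.
    with table.transaction() as trx:
        with trx.update_snapshot().fast_append() as update_snapshot:
            for data_file in data_files:
                update_snapshot.append_data_file(data_file)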

@Fokko what do you think?

koenvo avatar May 15 '25 04:05 koenvo

I see, hmm, dang. Yeah, I have a concern because I'm using a FIFO queue to process the data, but those have a limit of 10 items per batch, so I want to try a regular queue with higher throughput. That, however, introduces the possibility of duplicate events being processed (exactly-once vs. at-least-once delivery). My thought was that using upsert would make this safe, so I wouldn't end up with duplicate data.
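
For reference, a sketch of the idempotent path I have in mind, assuming each event carries a stable `id` (catalog/table names are placeholders):

    import pyarrow as pa
    from pyiceberg.catalog import load_catalog

    table = load_catalog("default").load_table("db.events")  # placeholder names

    # Re-delivered events share the same id, so upserting on it keeps the table
    # free of duplicates even with at-least-once delivery.
    events = pa.table({"id": [101, 102], "payload": ["x", "y"]})
    table.upsert(events, join_cols=["id"])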

potatochipcoconut avatar May 15 '25 15:05 potatochipcoconut

FYI, I tried this out in a distributed write scenario where data files are serialized, then deserialized as described in the link above. This is the result when trying to do txn.upsert():

[ERROR] AttributeError: 'DataFile' object has no attribute 'select'
Traceback (most recent call last):
  File "/var/task/write/lambda_function.py", line 26, in handler
    write_to_iceberg(data_files)
  File "/var/task/ocr_experiments/utils.py", line 317, in write_to_iceberg
    write_data(table, data)
  File "/var/task/ocr_experiments/utils.py", line 329, in write_data
    txn.upsert(data_file, join_cols=['id'])
  File "/var/lang/lib/python3.12/site-packages/pyiceberg/table/__init__.py", line 763, in upsert
    if upsert_util.has_duplicate_rows(df, join_cols):
  File "/var/lang/lib/python3.12/site-packages/pyiceberg/table/upsert_util.py", line 53, in has_duplicate_rows
    return len(df.select(join_cols).group_by(join_cols).aggregate([([], "count_all")]).filter(pc.field("count_all") > 1)) > 0
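
Looking at the traceback, it seems upsert expects the df argument to be an Arrow table of rows (that's what .select() is called on), not a DataFile. A sketch of what I think it wants instead (just my reading, not a confirmed fix; names are placeholders):

    import pyarrow as pa

    # Pass the rows themselves rather than already-written DataFile metadata.
    rows = pa.table({"id": [1], "value": ["a"]})  # placeholder rows
    with table.transaction() as txn:
        txn.upsert(rows, join_cols=["id"])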

potatochipcoconut avatar May 20 '25 18:05 potatochipcoconut