Transactions do not support Upsert
Feature Request / Improvement
Given issue #1759, I tried writing smaller batches of rows instead, but I also wanted a guarantee that all rows would be upserted.
I tried wrapping the upsert within a Transaction, but noticed that Transaction does not support upserts.
Using the 0.9.0 release.
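A minimal sketch of what I attempted (table is an existing Iceberg table and df a pyarrow Table; both names are placeholders):

with table.transaction() as txn:
    txn.upsert(df, join_cols=["id"])  # fails on 0.9.0: Transaction has no upsert method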
Thanks @suarez-agu for raising this. The transaction should also support upsert. Are you interested in contributing this?
I would like to take a look at this one.
@koenvo how's it looking? I'm also interested in this feature.
Thanks for the reminder. Let me finish this
thanks @koenvo looking forward to it!
@Fokko just curious, any ETA on when this would land with the next release?
@koenvo I just tried this out, but I'm wondering if it would be possible to have upsert available while updating a snapshot?
I'm hoping to implement upsert() for a workflow that uses the distributed write pattern as described here
Where the data is written into the table like so:
with table.transaction() as trx:
    with trx.update_snapshot().fast_append() as update_snapshot:
        update_snapshot.append_data_file(data_file)
Thanks!
The pattern you're describing only does appends, right? In that case it's easier to have distributed workers and a central worker doing the commits.
With an upsert you probably also have to delete data. When a delete is involved it’s harder as upserts get dependent on each other.
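Roughly, the append-only version could look like this (a hypothetical sketch; pickle is just one way to ship the DataFile objects to the committer):

import pickle

# Hypothetical sketch: workers produce and serialize DataFile objects;
# a single central worker deserializes them and commits in one transaction.
def central_commit(table, serialized_data_files):
    with table.transaction() as trx:
        with trx.update_snapshot().fast_append() as update_snapshot:
            for blob in serialized_data_files:
                data_file = pickle.loads(blob)  # assumes DataFile round-trips via pickle
                update_snapshot.append_data_file(data_file)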
@Fokko what do you think?
I see, hmm, dang. Yeah, I have a concern: I'm using a FIFO queue to process the data, but those have a limit of 10 items per batch, so I want to try a regular queue with higher throughput. That, however, introduces the possibility of duplicate events being processed (exactly-once vs at-least-once delivery). My thought was that using upsert would make it safe, and we wouldn't end up with duplicate data.
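Roughly what I had in mind (a sketch; it assumes an existing table object and a stable id column to join on):

import pyarrow as pa

# Sketch of the idea: re-delivered events are matched on a stable key, so
# processing the same batch twice should not create duplicate rows.
batch = pa.Table.from_pylist([
    {"id": 1, "payload": "a"},
    {"id": 2, "payload": "b"},
])
table.upsert(batch, join_cols=["id"])
table.upsert(batch, join_cols=["id"])  # duplicate delivery: rows are matched, not appended again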
FYI, I tried this out in the distributed write scenario where data files are serialized, then deserialized as described in the link above. This is the result when trying to do txn.upsert():
[ERROR] AttributeError: 'DataFile' object has no attribute 'select'
Traceback (most recent call last):
  File "/var/task/write/lambda_function.py", line 26, in handler
    write_to_iceberg(data_files)
  File "/var/task/ocr_experiments/utils.py", line 317, in write_to_iceberg
    write_data(table, data)
  File "/var/task/ocr_experiments/utils.py", line 329, in write_data
    txn.upsert(data_file, join_cols=['id'])
  File "/var/lang/lib/python3.12/site-packages/pyiceberg/table/__init__.py", line 763, in upsert
    if upsert_util.has_duplicate_rows(df, join_cols):
  File "/var/lang/lib/python3.12/site-packages/pyiceberg/table/upsert_util.py", line 53, in has_duplicate_rows
    return len(df.select(join_cols).group_by(join_cols).aggregate([([], "count_all")]).filter(pc.field("count_all") > 1)) > 0
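If I'm reading upsert_util correctly, upsert() expects a pyarrow Table (it calls df.select(...)), not a DataFile, so presumably the call needs to look more like this (column names are just illustrative):

import pyarrow as pa

# Hedged sketch: pass a pyarrow Table (not a serialized DataFile) to upsert.
df = pa.Table.from_pylist([{"id": 1, "value": "x"}])
with table.transaction() as txn:
    txn.upsert(df, join_cols=["id"])

In the distributed scenario above, that would mean reconstructing an Arrow table on the committer side rather than passing the serialized DataFile straight through.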