
Upserting large table extremely slow

Open • Anton-Tarazi opened this issue 5 months ago • 10 comments

Feature Request / Improvement

Upserting large dataframes (tens of millions of rows) is unusably slow due to creating a massive BooleanExpression in upsert_util.create_match_filter. This happens before any IO is even started. It would be nice if upserts could support large tables.
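
Roughly speaking (this is just an illustration, not the actual upsert_util code, which may special-case single-column keys), the filter ends up shaped like one term per row:

from pyiceberg.expressions import And, EqualTo, Or

def match_filter_sketch(rows: list[dict], key_columns: list[str]):
    # One equality predicate per key column, combined per row ...
    def row_term(row):
        preds = [EqualTo(col, row[col]) for col in key_columns]
        return preds[0] if len(preds) == 1 else And(*preds)

    terms = [row_term(row) for row in rows]
    # ... and all row terms joined with Or. For tens of millions of rows this
    # means tens of millions of Python expression objects, created before any
    # file IO starts.
    return terms[0] if len(terms) == 1 else Or(*terms)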

I'd be happy to work on this issue.

Anton-Tarazi avatar Jun 28 '25 23:06 Anton-Tarazi

Haha I was just looking at this last week, it is especially slow for time-series data with tens of millions of rows (that is a huuuuuge filter) -- I wonder if it would make sense for a user to be able to supply their own filter into the util? That was what enabled me to squeeze more performance out after hard-coding it in.

jayceslesar avatar Jun 28 '25 23:06 jayceslesar

Being able to provide a "hint" seems like a decent workaround, but then we have to rely on the user providing the correct filter, otherwise the upsert won't work properly.

Ultimately I think upsert should be able to make use of the table's PartitionSpec to determine what files to read, given the data being upserted. (This would only work when the row identifiers are also used for partitioning, but I believe that is a pretty natural usage pattern.)
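
As a rough sketch of that idea (the pyarrow input and column names are hypothetical, not an existing API):

import pyarrow as pa
from pyiceberg.expressions import In

def partition_prune_filter(batch: pa.Table, partition_column: str):
    # When the join key is also a partition column, the distinct key values in
    # the incoming data bound which files the upsert needs to touch: a single
    # In(...) over a handful of values instead of one equality term per row.
    values = set(batch[partition_column].to_pylist())
    return In(partition_column, values)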

Anton-Tarazi avatar Jun 28 '25 23:06 Anton-Tarazi

This aligns well with the discussion here: https://github.com/apache/iceberg-python/issues/2138#issuecomment-2997190853

While there have been improvements to upsert - like reducing memory pressure and avoiding recursion in create_match_filter - performance is still suboptimal. This is largely because upsert is currently built on top of operations like delete and overwrite.

For example, overwrite internally performs a delete followed by an append. The delete step may even rewrite entire data files by applying a filter and preserving rows that don’t match, then rewriting the result using the current schema, partition spec, and sort order.

Interestingly, this "replace" behavior in delete is already very close to what an upsert needs to do - especially when new rows are appended immediately after. Here's a simplified view of how files are rewritten:

# Read the data file into memory, then keep only the rows that should be
# preserved, i.e. rows that do NOT match the delete predicate.
df = ArrowScan(...).to_table(...)
filtered_df = df.filter(preserve_row_filter)

if len(filtered_df) == 0:
    # Nothing survives: the original file is dropped entirely.
    replaced_files.append((original_file.file, []))
elif len(df) != len(filtered_df):
    # Some rows were removed: rewrite the file with only the surviving rows.
    replaced_files.append((original_file.file, [...]))

Because upsert reuses these higher-level constructs, we lose the opportunity to optimize the operation at a lower level. Treating upsert as a first-class primitive, like delete or overwrite, would allow us to optimize each step more precisely and avoid unnecessary rewrites, which is especially important for large tables.

cc @Fokko

koenvo avatar Jul 02 '25 08:07 koenvo

Hey @koenvo thanks for raising this discussion. Nothing is set in stone, so there are always possibilities to optimize, and I agree, we started with rough building blocks.

The nice thing about the current approach is that, when there is nothing to delete, it will only create an append operation. Also, creating two snapshots (DELETE+APPEND) instead of just an OVERWRITE makes it more transparent to other clients what happened to the table, although this should not come at all costs. Currently, we actually produce:

  • DELETE, APPEND for the delete operation
  • APPEND for the insert operation

This also potentially generates quite a bit of data and metadata, so there is definitely room for improvement.

Here's a simplified view of how files are rewritten:

I don't think that's correct, because there we pull all the data into memory, but we only filter on the relevant files based on the Iceberg metadata.

Fokko avatar Jul 03 '25 07:07 Fokko

I think @Anton-Tarazi's original point stands -- creating a bunch of (Python object) filter expressions for every row in a large dataframe is going to be slow, and we do that before any real I/O happens. This is done so that, as Fokko says,

we filter on only relevant files based on the Iceberg metadata.

My guess is that for tables of a certain size/layout you'd do better to just skip this metadata-based file pruning, given how expensive the Python object creation is.

Honestly, I think it would be a better use of community resources to invest more in the iceberg-rust/datafusion path so that the bulk of this logic can be moved out of pure python, than to spend a ton of effort trying to optimize the Python implementation.

corleyma avatar Jul 03 '25 17:07 corleyma

Honestly, I think it would be a better use of community resources to invest more in the iceberg-rust/datafusion path so that the bulk of this logic can be moved out of pure python, than to spend a ton of effort trying to optimize the Python implementation.

Agreed, I think if we spent more effort rolling wheels from the Rust version and targeting a tighter relationship between pyiceberg and the Rust implementation, everyone wins.

jayceslesar avatar Jul 03 '25 18:07 jayceslesar

Totally agree. Let's start exploring the iceberg-rust codebase.

koenvo avatar Jul 03 '25 19:07 koenvo

@Fokko @kevinjqliu do you think it's worth setting up a roadmap for what should be candidates for rolling wheels from Rust? It would really help focus efforts on the lacking parts of Rust if we want parity!

jayceslesar avatar Jul 03 '25 19:07 jayceslesar

I've created https://github.com/apache/iceberg-python/issues/2396 as I think that would already speed up things quite a bit.

Fokko avatar Aug 28 '25 08:08 Fokko

Posted this on Slack as well, but here is what I saw during my initial test with one month of data.

I read Parquet source files in batches of 128 MB (based on file size) and used .upsert() to write to Iceberg. I tried with the table identifier as a single column (an xxhash128 hash) and as multiple columns (the actual business columns).
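
Roughly, the loop looked like this (table name, source path, and join column are placeholders, and the 128 MB batching is approximated here by in-memory size):

import pyarrow as pa
import pyarrow.dataset as ds
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("db.events")

dataset = ds.dataset("s3://bucket/source/", format="parquet")
group, group_bytes = [], 0
for fragment in dataset.get_fragments():
    part = fragment.to_table()
    group.append(part)
    group_bytes += part.nbytes
    if group_bytes >= 128 * 1024 * 1024:  # roughly 128 MB per upsert call
        tbl.upsert(pa.concat_tables(group), join_cols=["row_id"])
        group, group_bytes = [], 0
if group:
    tbl.upsert(pa.concat_tables(group), join_cols=["row_id"])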

  • initial upsert (on empty table): 962,000 rows in 75 seconds
  • batch 2 upsert: 967,000 rows in 256 seconds
  • batch 3 upsert: 728,000 rows in 328 seconds

Then I tried to process 1 day of data and I ran into an OOM error (with delta-rs this same test completed successfully in 3 seconds).

Each batch took longer than the previous batch and everything took way too long. I am going to test if I can use partial overwrites to build a more specific predicate/expression.
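
The partial-overwrite idea would look roughly like this with the existing overwrite API (the event_ts column is a placeholder, and this only behaves like an upsert when each batch contains every row for its time range, since overwrite replaces all rows matching the filter):

import pyarrow.compute as pc

def overwrite_batch(tbl, batch):
    # A coarse range predicate instead of one equality term per row, so only
    # data files overlapping the batch's time range get rewritten.
    lower = pc.min(batch["event_ts"]).as_py()
    upper = pc.max(batch["event_ts"]).as_py()
    row_filter = (
        f"event_ts >= '{lower.isoformat()}' AND event_ts <= '{upper.isoformat()}'"
    )
    tbl.overwrite(batch, overwrite_filter=row_filter)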

ldacey avatar Oct 01 '25 15:10 ldacey