Upserting large table extremely slow
Feature Request / Improvement
Upserting large dataframes (tens of millions of rows) is unusably slow due to the creation of a massive BooleanExpression in upsert_util.create_match_filter. This happens before any IO is even started. It would be nice if upserts could support large tables.
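For context, a rough sketch of the shape of the filter that gets built (illustrative only, not the exact upsert_util code): one equality predicate per key column per row, OR'd together, so the expression tree grows linearly with the dataframe.

from pyiceberg.expressions import AlwaysFalse, And, EqualTo, Or

def naive_match_filter(rows, join_cols):
    # one AND-of-equalities per incoming row, OR'd into one giant expression
    expr = AlwaysFalse()
    for row in rows:
        per_row = None
        for col in join_cols:
            pred = EqualTo(col, row[col])
            per_row = pred if per_row is None else And(per_row, pred)
        expr = Or(expr, per_row)
    # tens of millions of rows -> tens of millions of Python expression objects,
    # all allocated before any scan planning or IO starts
    return expr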
I'd be happy to work on this issue.
Haha, I was just looking at this last week; it is especially slow for time series data with tens of millions of rows (that is a huuuuuge filter) -- I wonder if it would make sense for a user to be able to supply their own filter to the util? That was what enabled me to squeeze more performance out after hard-coding it in.
Being able to provide a "hint" seems like a decent workaround, but then we have to rely on the user providing the correct filter, otherwise the upsert won't work properly.
Ultimately I think upsert should be able to use the table's PartitionSpec to determine which files to read, given the data being upserted. (This would only work when the row identifiers are also used for partitioning, but I believe that is a pretty natural usage pattern.)
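To make that concrete, here is a rough sketch of the kind of coarse pre-filter I mean, assuming a table partitioned by day and an incoming PyArrow table with an event_day column (names are illustrative; how this would be wired into upsert is exactly the open question):

import pyarrow.compute as pc
from pyiceberg.expressions import And, GreaterThanOrEqual, LessThanOrEqual

def coarse_day_filter(df):
    # one small expression covering the whole batch, instead of millions of
    # per-row predicates; it only prunes files, so the per-row matching
    # still has to happen afterwards
    lo = pc.min(df["event_day"]).as_py()
    hi = pc.max(df["event_day"]).as_py()
    return And(
        GreaterThanOrEqual("event_day", str(lo)),
        LessThanOrEqual("event_day", str(hi)),
    )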
This aligns well with the discussion here: https://github.com/apache/iceberg-python/issues/2138#issuecomment-2997190853
While there have been improvements to upsert - like reducing memory pressure and avoiding recursion in create_match_filter - performance is still suboptimal. This is largely because upsert is currently built on top of operations like delete and overwrite.
For example, overwrite internally performs a delete followed by an append. The delete step may even rewrite entire data files by applying a filter and preserving rows that don’t match, then rewriting the result using the current schema, partition spec, and sort order.
Interestingly, this "replace" behavior in delete is already very close to what an upsert needs to do - especially when new rows are appended immediately after. Here's a simplified view of how files are rewritten:
df = ArrowScan(...).to_table(...)                      # read the affected data file
filtered_df = df.filter(preserve_row_filter)           # keep rows that do NOT match the delete
if len(filtered_df) == 0:
    replaced_files.append((original_file.file, []))    # every row deleted: drop the file
elif len(df) != len(filtered_df):
    replaced_files.append((original_file.file, [...]))  # some rows deleted: rewrite the file
Because upsert reuses these higher-level constructs, we lose the opportunity to optimize the operation at a lower level. Treating upsert as a first-class primitive, like delete or overwrite, would allow us to optimize each step more precisely and avoid unnecessary rewrites, which is especially important for large tables.
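As a strawman for what a lower-level primitive could look like: read each affected file once, split it against the incoming batch by key, and write one replacement file, instead of a separate delete pass followed by an append. A minimal sketch with PyArrow, assuming identical schemas; the file scanning and writing around it is omitted.

import pyarrow as pa

def upsert_one_file(existing: pa.Table, incoming: pa.Table, keys: list[str]) -> pa.Table:
    # rows in the file whose key is not present in the incoming batch are preserved
    preserved = existing.join(incoming.select(keys), keys=keys, join_type="left anti")
    # replacement file contents: preserved rows plus the incoming rows, in one pass
    cols = existing.column_names
    return pa.concat_tables([preserved.select(cols), incoming.select(cols)])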
cc @Fokko
Hey @koenvo thanks for raising this discussion. Nothing is set in stone, so there are always possibilities to optimize, and I agree, we started with rough building blocks.
The nice thing about the current approach is that, when there is nothing to delete, it will only create an append operation. Also, creating two snapshots (DELETE+APPEND) instead of just an OVERWRITE makes it more transparent to other clients what happened to the table. Although this should not come at any expense. Currently, we actually produce:
- DELETE, APPEND for the delete operation
- APPEND for the insert operation
This also potentially generates quite a bit of data and metadata, so there is definitely room for improvement.
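For anyone who wants to see this on their own table, the snapshot history makes the extra snapshots visible. A small sketch, assuming a recent PyIceberg with the inspect API and an already-loaded table tbl:

# each upsert currently shows up as a delete snapshot plus one or two append snapshots
snapshots = tbl.inspect.snapshots()
print(snapshots.select(["committed_at", "operation"]))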
Here's a simplified view of how files are rewritten:
I don't think that's correct, because there we would pull all the data into memory; we only filter on the relevant files based on the Iceberg metadata.
I think @Anton-Tarazi's original point is that creating a bunch of (Python object) filter expressions for every row in a large dataframe is going to be slow, and we do that before any real I/O happens -- this is done so that, as Fokko says,
we filter on only relevant files based on the Iceberg metadata.
My guess is that for tables of a certain size/layout you'd do better to just skip this metadata-based file pruning, given the expensive Python object creation.
Honestly, I think it would be a better use of community resources to invest more in the iceberg-rust/datafusion path so that the bulk of this logic can be moved out of pure python, than to spend a ton of effort trying to optimize the Python implementation.
Honestly, I think it would be a better use of community resources to invest more in the iceberg-rust/datafusion path so that the bulk of this logic can be moved out of pure python, than to spend a ton of effort trying to optimize the Python implementation.
Agreed. I think if we spent more effort rolling wheels from the Rust version and targeting a tighter relationship between PyIceberg and the Rust implementation, everyone wins.
Totally agree. Let's start exploring the iceberg-rust codebase.
@Fokko @kevinjqliu do you think it's worth setting up a roadmap for what the candidates should be for rolling wheels from Rust? It would really help focus efforts on the lacking parts of Rust if we want parity!
I've created https://github.com/apache/iceberg-python/issues/2396 as I think that would already speed up things quite a bit.
Posted this on Slack as well, but here are the results from my initial test with one month of data.
I read Parquet source files in batches of 128 MB (based on file size) and used .upsert() to write to Iceberg. I tried with the table identifier as a single column (xxhash 128) and as multiple columns (the actual business columns).
- initial upsert (on empty table): 962,000 rows in 75 seconds
- batch 2 upsert: 967,000 rows in 256 seconds
- batch 3 upsert: 728,000 rows in 328 seconds
Then I tried to process one day of data and I ran into an OOM error (with delta-rs this same test took 3 seconds to complete successfully).
Each batch took longer than the previous one, and the whole run took far too long. I am going to test whether I can use partial overwrites to build a more specific predicate/expression.
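A sketch of that partial-overwrite idea for one daily batch, assuming the table has an event_day column that also drives partitioning (column name and value are illustrative): the overwrite filter narrows the delete side to the batch's day instead of matching row by row.

from pyiceberg.expressions import EqualTo

# delete everything for that day, then append the batch -- no per-row match filter
tbl.overwrite(
    batch_df,  # PyArrow table holding exactly one day of data
    overwrite_filter=EqualTo("event_day", "2024-01-15"),
)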