optimize `_combine_positional_deletes`
Apache Iceberg version
None
Please describe the bug 🐞
As part of the effort to remove numpy as a dependency in #1259, we changed the `_combine_positional_deletes` function to use `range` instead of `np.arange`. This causes a performance regression. We chose to move forward for now since it is not very common for a file to be affected by multiple positional deletes.
apache/arrow#44583 was opened to add equivalent functionality to pyarrow, which we can then port into pyiceberg.
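For context, the regression comes down to how the dense range of row positions is materialized. A minimal sketch of the two variants (variable names are illustrative, not the exact pyiceberg code):

```python
import numpy as np
import pyarrow as pa

start_index, end_index = 0, 10_000_000

# Before #1259: np.arange fills the buffer in vectorized C code
full_range = pa.array(np.arange(start_index, end_index))

# After #1259: pa.array walks a Python range object element by element,
# which is the source of the slowdown
full_range = pa.array(range(start_index, end_index))
```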
Hi @kevinjqliu
Can we rewrite the `_combine_positional_deletes` function using a set-based approach instead of the previous NumPy method? The set-based method significantly improves performance, particularly when handling large arrays of deleted positions.
https://github.com/apache/iceberg-python/issues/1259#issuecomment-2448393527 is a possible solution using the pyarrow Cython/C++ API.
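For reference, a set-based variant might look roughly like the sketch below; this is an assumed reading of the suggestion, not code from the thread:

```python
import itertools
from typing import List

import pyarrow as pa


def _combine_positional_deletes_set(positional_deletes: List[pa.ChunkedArray], start_index: int, end_index: int) -> pa.Array:
    # Pull every deleted position into a Python set for O(1) membership checks
    deleted = set(itertools.chain.from_iterable(arr.to_pylist() for arr in positional_deletes))

    # Keep the positions in [start_index, end_index) that survive the deletes,
    # rebased to be relative to start_index
    remaining = [pos - start_index for pos in range(start_index, end_index) if pos not in deleted]
    return pa.array(remaining, type=pa.int64())
```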
@omkenge I've tried a set-based approach but didn't see any performance improvement. I used https://github.com/apache/iceberg-python/issues/1259#issuecomment-2446285669 to test.
@kevinjqliu I did find a pure-python approach that is faster (~2.4x on my machine) than pyarrow.array(range(...)):
```python
import pyarrow as pa
import ctypes


def create_arrow_range(start: int, end: int) -> pa.Array:
    if start >= end:
        raise ValueError("start must be less than end")
    length = end - start
    # Allocate the Arrow buffer up front and fill it in place through ctypes,
    # avoiding the per-element Python object conversion in pa.array(range(...))
    buf = pa.allocate_buffer(length * 8, resizable=False)
    ptr: ctypes.Array = (ctypes.c_int64 * length).from_buffer(buf)
    for i in range(length):
        ptr[i] = start + i
    # Wrap the filled buffer as an int64 array; no validity bitmap is needed
    array = pa.Array.from_buffers(pa.int64(), length, [None, buf])
    return array
```
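A rough way to reproduce the comparison, assuming `create_arrow_range` from the snippet above is in scope (this harness is an approximation, not the benchmark actually used):

```python
import timeit

import pyarrow as pa

N = 10_000_000

baseline = timeit.timeit(lambda: pa.array(range(N)), number=5) / 5
buffered = timeit.timeit(lambda: create_arrow_range(0, N), number=5) / 5

print(f"pa.array(range(...)): {baseline:.4f}s")
print(f"create_arrow_range:   {buffered:.4f}s")
print(f"speedup:              {baseline / buffered:.2f}x")
```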
@corleyma that's awesome, thanks! Would you like to open a PR and contribute the change?
Gentle ping @corleyma :)
They added support for the arange function in pyarrow in https://github.com/apache/arrow/pull/46778. I pulled the latest Arrow branch, built it locally, and integrated it into PyIceberg. The updated implementation would look like this:
```python
import itertools
from typing import List

import pyarrow as pa
import pyarrow.compute as pc


def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start_index: int, end_index: int) -> pa.Array:
    if len(positional_deletes) == 1:
        all_chunks = positional_deletes[0]
    else:
        all_chunks = pa.chunked_array(itertools.chain(*[arr.chunks for arr in positional_deletes]))
    # Build the dense range natively instead of via pa.array(range(...))
    full_range = pa.arange(start_index, end_index)
    result = pc.filter(full_range, pc.invert(pc.is_in(full_range, value_set=all_chunks)))
    return pc.subtract(result, pa.scalar(start_index))
```
Using the benchmark from @corleyma, I get roughly:
```
Testing range from 0 to 10000000 (10000000 elements)
Python average: 0.6944s
Pyarrow arange average: 0.0096s
Speedup: 71.96x
```
Maybe in the next Arrow release we can add this, if an upgrade isn't too burdensome.
Cool! Looks like this will be part of pyarrow 21
I'd love to get some thoughts on how we can better support multiple pyarrow versions in pyiceberg: https://github.com/apache/iceberg-python/issues/2209
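One hedged sketch of what that gating could look like, assuming the new kernel is exposed as `pa.arange` (per the snippet above) and is absent on older releases:

```python
import pyarrow as pa


def _build_position_range(start: int, end: int) -> pa.Array:
    # _build_position_range is a hypothetical helper name; use the native
    # arange kernel when the installed pyarrow exposes it, otherwise fall
    # back to the existing range-based construction.
    if hasattr(pa, "arange"):
        return pa.arange(start, end)
    return pa.array(range(start, end), type=pa.int64())
```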