[C++] Take kernel can't handle ChunkedArrays that don't fit in an Array
Take() currently concatenates ChunkedArrays first. However, this breaks down when calling Take() on a ChunkedArray or Table where concatenating the chunks would produce an array that is too large (for example, the 32-bit offsets of a string column would overflow). While inconvenient to implement, it would be useful if this case were handled.
This could be done as a higher-level wrapper around Take(), perhaps.
Example in Python:
>>> import pyarrow as pa
>>> pa.__version__
'1.0.0'
>>> rb1 = pa.RecordBatch.from_arrays([["a" * 2**30]], names=["a"])
>>> rb2 = pa.RecordBatch.from_arrays([["b" * 2**30]], names=["a"])
>>> table = pa.Table.from_batches([rb1, rb2], schema=rb1.schema)
>>> table.take([1, 0])
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pyarrow/table.pxi", line 1145, in pyarrow.lib.Table.take
File "/home/lidavidm/Code/twosigma/arrow/venv/lib/python3.8/site-packages/pyarrow/compute.py", line 268, in take
return call_function('take', [data, indices], options)
File "pyarrow/_compute.pyx", line 298, in pyarrow._compute.call_function
File "pyarrow/_compute.pyx", line 192, in pyarrow._compute.Function.call
File "pyarrow/error.pxi", line 122, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays
In this example, it would be useful if Take() or a higher-level wrapper could generate multiple record batches as output.
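For example, a minimal workaround sketch along those lines, using a hypothetical helper (take_table is not an Arrow API) and ignoring null or out-of-range indices: gather rows with zero-copy slices and stitch them back together, so the large string data is never concatenated into a single Array and the result keeps multiple chunks.

```python
import pyarrow as pa

def take_table(table: pa.Table, indices) -> pa.Table:
    # Hypothetical helper: gather each requested row as a zero-copy slice,
    # then combine the slices; the result's columns stay chunked (one tiny
    # chunk per row), so no >2 GiB Array ever needs to be materialized.
    return pa.concat_tables([table.slice(int(i), 1) for i in indices])
```

This is very inefficient for long index lists, but it succeeds on the example above where table.take([1, 0]) fails.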
Reporter: Will Jones / @wjones127 Assignee: Will Jones / @wjones127
Related issues:
- https://github.com/apache/arrow/issues/28385 (is a child of)
- https://github.com/apache/arrow/issues/26738 (duplicate)
- https://github.com/apache/arrow/issues/31249 (duplicate)
- https://github.com/apache/arrow/issues/34583 (duplicate)
- https://github.com/apache/arrow/issues/37766 (relates to)
- https://github.com/apache/arrow/issues/23539 (relates to)
- https://github.com/apache/arrow/issues/33049 (relates to)
- https://github.com/apache/arrow/issues/40207 (relates to)
PRs and other links:
- https://github.com/apache/arrow/pull/13857
Note: This issue was originally created as ARROW-9773. Please see the migration documentation for further details.
Antoine Pitrou / @pitrou: Note that this can happen with regular arrays too:
>>> import pyarrow as pa
>>> arr = pa.array(["x" * (1<<20)])
>>> t = arr.take(pa.array([0]*((1<<12) + 1), type=pa.int8()))
>>> t.validate(full=True)
Traceback (most recent call last):
[...]
ArrowInvalid: Offset invariant failure: non-monotonic offset at slot 2048: -2147483648 < 2146435072
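One mitigation in this specific case, assuming a change of output type to large_string is acceptable, is to cast to a 64-bit-offset type before the take so the output offsets cannot overflow int32; a minimal sketch:

```python
import pyarrow as pa

arr = pa.array(["x" * (1 << 20)])
# Cast to large_string (64-bit offsets) before taking; the ~4 GiB of
# repeated string data then fits without overflowing the offsets.
large = arr.cast(pa.large_string())
t = large.take(pa.array([0] * ((1 << 12) + 1), type=pa.int8()))
t.validate(full=True)  # passes, at the cost of changing the output type
```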
Leonard Lausen / @leezu:
There is a similar issue with large tables (many rows) of medium-size lists (~512 elements per list). When using the pa.list_ type, take fails due to offset overflow while concatenating arrays. Using pa.large_list works. (In practice that doesn't help much, though, as .take runs about three orders of magnitude slower (~1 s vs. ~1 ms) than the equivalent indexing operations in pandas.)
Chris Fregly: Seeing this error through Ray 1.13 when I run the following code:
import ray
ray.init(address="auto")
df = ray.data.read_parquet("s3://amazon-reviews-pds/parquet/")
print(df.groupby("product_category").count())
Here's the error:
(_partition_and_combine_block pid=1933) 2022-05-06 20:51:29,275 INFO worker.py:431 – Task failed with retryable exception: TaskID(7f0166b85ffd7f1fffffffffffffffffffffffff01000000).
(_partition_and_combine_block pid=1933) Traceback (most recent call last):
(_partition_and_combine_block pid=1933) File "python/ray/_raylet.pyx", line 625, in ray._raylet.execute_task
(_partition_and_combine_block pid=1933) File "python/ray/_raylet.pyx", line 629, in ray._raylet.execute_task
(_partition_and_combine_block pid=1933) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 436, in _partition_and_combine_block
(_partition_and_combine_block pid=1933) descending=False)
(_partition_and_combine_block pid=1933) File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/impl/arrow_block.py", line 308, in sort_and_partition
(_partition_and_combine_block pid=1933) table = self._table.take(indices)
(_partition_and_combine_block pid=1933) File "pyarrow/table.pxi", line 1382, in pyarrow.lib.Table.take
(_partition_and_combine_block pid=1933) File "/home/ray/anaconda3/lib/python3.7/site-packages/pyarrow/compute.py", line 625, in take
(_partition_and_combine_block pid=1933) return call_function('take', [data, indices], options, memory_pool)
(_partition_and_combine_block pid=1933) File "pyarrow/_compute.pyx", line 528, in pyarrow._compute.call_function
(_partition_and_combine_block pid=1933) File "pyarrow/_compute.pyx", line 327, in pyarrow._compute.Function.call
(_partition_and_combine_block pid=1933) File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
(_partition_and_combine_block pid=1933) File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
(_partition_and_combine_block pid=1933) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays
GroupBy Map: 100%|████████████████████████████████| 200/200 [01:31<00:00, 2.18it/s]
GroupBy Reduce: 100%|██████████████████████████| 200/200 [00:00<00:00, 19776.52it/s]
Traceback (most recent call last):
File "/home/ray/parquet-raydata.py", line 10, in
Mayur Srivastava / @mayuropensource: Hi @lidavidm, is there any progress on this Jira? (This issue is blocking a few use cases we have.)
Thanks,
Mayur Srivastava
David Li / @lidavidm: It needs someone motivated to sit down and work through the implementation. I can review/offer suggestions but probably don't have the time to implement this right now.
Note that I think the cases described in the comments above are fundamentally different from the original issue: they also require upgrading the output from Array to ChunkedArray (or from String/List to LargeString/LargeList) and so can't be done automatically.
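A caller-side sketch of that kind of upgrade, assuming the caller can accept Large output types (the helper below is illustrative, not an Arrow API): cast the whole column to its 64-bit-offset variant up front, so a later take or concatenation cannot overflow int32 offsets.

```python
import pyarrow as pa

def promote_to_large(chunked: pa.ChunkedArray) -> pa.ChunkedArray:
    # Illustrative helper: upgrade string/binary/list columns to their
    # Large variants; the cast is applied to the whole ChunkedArray so all
    # chunks end up with one consistent type.
    t = chunked.type
    if pa.types.is_string(t):
        return chunked.cast(pa.large_string())
    if pa.types.is_binary(t):
        return chunked.cast(pa.large_binary())
    if pa.types.is_list(t):
        return chunked.cast(pa.large_list(t.value_type))
    return chunked
```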
Will Jones / @wjones127: I'm interested in working on this soon. I'll look through the issue a little deeper and ping you @lidavidm to get some ideas on the design.
David Li / @lidavidm: @wjones127 great! Looking forward to it.
Will Jones / @wjones127: I've looked through the code and I think there are three related issues. I'll try to describe them here. If you think I am missing some case, let me know. Otherwise, I'll open three sub-tasks and start work on those.
Problem 1: We concatenate when we shouldn't need to
This fails:
arr = pa.chunked_array([["a" * 2**30]] * 2)
arr.take([0,1])
# Traceback (most recent call last):
# File "<stdin>", line 1, in <module>
# File "pyarrow/table.pxi", line 998, in pyarrow.lib.ChunkedArray.take
# File "/Users/willjones/Documents/test-env/venv/lib/python3.9/site-packages/pyarrow/compute.py", line 457, in take
# return call_function('take', [data, indices], options, memory_pool)
# File "pyarrow/_compute.pyx", line 542, in pyarrow._compute.call_function
# File "pyarrow/_compute.pyx", line 341, in pyarrow._compute.Function.call
# File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
# File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
# pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays
because we concatenate the input values here. If that were corrected, it would then fail on the concatenation here if the indices were a chunked array.
The first concatenation could be avoided fairly easily in special cases (e.g. the indices are sorted or all fall within the same chunk), which was partially implemented in R. For the general case, we'd need to address this within the kernel rather than in pre-processing (see Problem 3).
The second concatenation shouldn't always be eliminated, but we might want to add a check that validates there is enough room in the offset buffers of the arrays being concatenated. TBD whether there is an efficient way to test that.
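A rough sketch of such a pre-check, assuming nbytes is acceptable as a conservative over-estimate (this is not what the kernel does today):

```python
import pyarrow as pa

_INT32_MAX = 2**31 - 1

def concat_may_overflow(chunked: pa.ChunkedArray) -> bool:
    # Conservative pre-check: for 32-bit-offset binary-like types, refuse to
    # concatenate if the summed chunk sizes could exceed what int32 offsets
    # can address. nbytes over-counts (it includes validity and offset
    # buffers), so this never reports "safe" when it isn't.
    if not (pa.types.is_string(chunked.type) or pa.types.is_binary(chunked.type)):
        return False
    return sum(chunk.nbytes for chunk in chunked.chunks) > _INT32_MAX
```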
Problem 2: take_array kernel doesn't handle case of offset overflow
This is what Antoine was pointing out:
import pyarrow as pa
arr = pa.array(["x" * (1<<20)])
t = arr.take(pa.array([0]*((1<<12) + 1), type=pa.int8()))
t.validate(full=True)
# Traceback (most recent call last):
# [...]
# ArrowInvalid: Offset invariant failure: non-monotonic offset at slot 2048: -2147483648 < 2146435072
To solve this, I think we'd either have to:
- (optionally?) promote arrays to the Large variant of their type. The problem is we'd need to do this cast consistently across chunks.
- Switch to returning chunked arrays, and create new chunks as needed. (TBD: could we do that for some types (String, Binary, List) and not others? See the sketch after this list.)
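To make the second option concrete, a rough Python-level sketch for string arrays (the helper name and the byte budgeting are assumptions, not the actual kernel design): start a new output chunk whenever the accumulated string data would no longer fit in int32 offsets.

```python
import pyarrow as pa
import pyarrow.compute as pc

_INT32_MAX = 2**31 - 1

def take_as_chunked(arr: pa.Array, indices: pa.Array) -> pa.ChunkedArray:
    # Per-element byte sizes of the values being gathered.
    sizes = pc.binary_length(arr).take(indices).to_pylist()
    idx = indices.to_pylist()
    chunks, start, running = [], 0, 0
    for i, n in enumerate(sizes):
        n = n or 0  # null slots contribute no data bytes
        if running + n > _INT32_MAX and i > start:
            # Cut a new output chunk before the int32 offset limit is hit.
            chunks.append(arr.take(pa.array(idx[start:i], type=pa.int64())))
            start, running = i, 0
        running += n
    chunks.append(arr.take(pa.array(idx[start:], type=pa.int64())))
    return pa.chunked_array(chunks, type=arr.type)
```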
Problem 3: there isn't a take_array kernel that handles ChunkedArrays
Finally, for sorting chunked arrays of string/binary/list type (that is, the take case where the indices are out of order), I think we need to implement kernels specialized for chunked arrays. IIUC, everything except string/binary/list types could simply keep doing the concatenation we do now; it's just those three types that need special logic to chunk as necessary to avoid offset overflows.
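A very rough Python sketch of what such a chunk-aware take could do (all names here are illustrative; the real implementation would live in the C++ kernels and would also need to chunk its output as in Problem 2): resolve each logical index to a chunk and an in-chunk offset, and take from individual chunks so the input is never concatenated.

```python
import numpy as np
import pyarrow as pa

def take_chunked(values: pa.ChunkedArray, indices) -> pa.ChunkedArray:
    idx = np.asarray(indices, dtype=np.int64)
    # Logical start offset of each chunk, used to resolve indices to chunks.
    boundaries = np.cumsum([0] + [len(c) for c in values.chunks])
    chunk_ids = np.searchsorted(boundaries, idx, side="right") - 1
    local = idx - boundaries[chunk_ids]
    out, start = [], 0
    # Emit one output chunk per run of indices that hit the same input chunk.
    for i in range(1, len(idx) + 1):
        if i == len(idx) or chunk_ids[i] != chunk_ids[start]:
            chunk = values.chunk(int(chunk_ids[start]))
            out.append(chunk.take(pa.array(local[start:i])))
            start = i
    return pa.chunked_array(out, type=values.type)
```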
Antoine Pitrou / @pitrou: Ok, it's a bit unfortunate that several distinct issues have been amalgamated here :-)
I'd argue that this issue is primarily about fixing problem 3 (which would also fix problem 1). Besides correctness, concatenating is also a performance problem because we might be allocating a lot of temporary memory.
David Li / @lidavidm: Agreed, we should focus on 1/3. Problem 2 is also interesting, but I'm not sure how best to handle it: right now the kernels infrastructure assumes a fixed output type and shape up front, and dynamically switching to ChunkedArray or promoting type would be a surprise.
I would think we could avoid concatenation for all types, even if it isn't strictly required, to avoid excessive allocation as Antoine mentioned.
Hi! Issue 3 is bothering us a lot when sorting huge tables. Do we have any updates on this?
> Hi! Issue 3 is bothering us a lot when sorting huge tables. Do we have any updates on this?
I'm working on this.
@felipecrv @jorisvandenbossche do we have a clear line of sight on when this issue will be addressed?
Can you help me understand whether there were/are any practical limitations around why chunking wasn't a consideration in the first place (for the Table/ChunkedArray APIs)?
This is a pretty foundational issue: the take API not doing chunking for arrays growing above 2 GB renders it essentially impossible to use for data processing.
For example, in Ray Data:
- We can't force users to go for int64-based types
- We can't blindly upcast all types to int64-based ones either
- We have to be able to handle columns growing above 2 GB
@alexeykudinkin take/filter on chunked arrays requires resolution of chunks, which is more expensive than simple array offsetting [1].
Solutions that don't concatenate tend to be slower and are considered unacceptable. If it were up to me, I would leave the decision of concatenation to callers and never concatenate. I'm going to compromise and add conditional checks on sizes to decide if we concatenate or not. What I don't like about this solution is that it kinda doubles the amount of tests we need to run to cover both algorithms.
[1] https://github.com/apache/arrow/pull/41700
> Solutions that don't concatenate tend to be slower and are considered unacceptable.
@felipecrv I totally get that. My point is twofold, however:
- Doing a slower take is still better than doing an operation (the concatenation) that is bound to fail for arrays of cumulative size > 2 GB.
- We don't need to fall for either extreme of concatenating all arrays or concatenating none -- we can concatenate up to the int32 boundary, drastically reducing the number of arrays when we've been passed a lot of small ones while still being able to handle the take without blowing up (see the sketch after this list).
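A rough sketch of that middle ground, assuming nbytes as the size measure and greedy left-to-right grouping (the helper is illustrative, not an Arrow API):

```python
import pyarrow as pa

_INT32_MAX = 2**31 - 1

def coalesce_chunks(chunked: pa.ChunkedArray, limit=_INT32_MAX) -> pa.ChunkedArray:
    # Greedily concatenate adjacent chunks while the combined size stays
    # under the int32 offset limit, so many small chunks collapse into a
    # few large ones that are still safe to take from.
    groups, current, size = [], [], 0
    for chunk in chunked.chunks:
        if current and size + chunk.nbytes > limit:
            groups.append(pa.concat_arrays(current))
            current, size = [], 0
        current.append(chunk)
        size += chunk.nbytes
    if current:
        groups.append(pa.concat_arrays(current))
    return pa.chunked_array(groups, type=chunked.type)
```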
> Doing a slower take is still better than doing an operation (the concatenation) that is bound to fail for arrays of cumulative size > 2 GB.
I agree. But all these conditionals increase the testing burden and that made me run out of energy to finish #41700.