
[C++] Take kernel can't handle ChunkedArrays that don't fit in an Array

Open asfimport opened this issue 5 years ago • 12 comments

Take() currently concatenates ChunkedArrays first. However, this breaks down when calling Take() from a ChunkedArray or Table where concatenating the arrays would result in an array that's too large. While inconvenient to implement, it would be useful if this case were handled.

This could be done as a higher-level wrapper around Take(), perhaps.

Example in Python:


>>> import pyarrow as pa
>>> pa.__version__
'1.0.0'
>>> rb1 = pa.RecordBatch.from_arrays([["a" * 2**30]], names=["a"])
>>> rb2 = pa.RecordBatch.from_arrays([["b" * 2**30]], names=["a"])
>>> table = pa.Table.from_batches([rb1, rb2], schema=rb1.schema)
>>> table.take([1, 0])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/table.pxi", line 1145, in pyarrow.lib.Table.take
  File "/home/lidavidm/Code/twosigma/arrow/venv/lib/python3.8/site-packages/pyarrow/compute.py", line 268, in take
    return call_function('take', [data, indices], options)
  File "pyarrow/_compute.pyx", line 298, in pyarrow._compute.call_function
  File "pyarrow/_compute.pyx", line 192, in pyarrow._compute.Function.call
  File "pyarrow/error.pxi", line 122, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 84, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

In this example, it would be useful if Take() or a higher-level wrapper could generate multiple record batches as output.

Reporter: Will Jones / @wjones127 Assignee: Will Jones / @wjones127

Related issues:

  • https://github.com/apache/arrow/issues/28385 (is a child of)
  • https://github.com/apache/arrow/issues/26738 (duplicate)
  • https://github.com/apache/arrow/issues/31249 (duplicate)
  • https://github.com/apache/arrow/issues/34583 (duplicate)
  • https://github.com/apache/arrow/issues/37766 (relates to)
  • https://github.com/apache/arrow/issues/23539 (relates to)
  • https://github.com/apache/arrow/issues/33049 (relates to)
  • https://github.com/apache/arrow/issues/40207 (relates to)

PRs and other links:

  • https://github.com/apache/arrow/pull/13857

Note: This issue was originally created as ARROW-9773. Please see the migration documentation for further details.

asfimport avatar Aug 17 '20 19:08 asfimport

Antoine Pitrou / @pitrou: Note that this can happen with regular arrays too:


>>> import pyarrow as pa
>>> arr = pa.array(["x" * (1<<20)])
>>> t = arr.take(pa.array([0]*((1<<12) + 1), type=pa.int8()))
>>> t.validate(full=True)
Traceback (most recent call last):
  [...]
ArrowInvalid: Offset invariant failure: non-monotonic offset at slot 2048: -2147483648 < 2146435072
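The numbers in that error message can be reproduced with plain integer arithmetic, assuming the offsets are stored as signed 32-bit integers (as they are for the string type). This is only an illustration of the overflow, not pyarrow code; `wrap_int32` is a helper defined here:

```python
INT32_MAX = 2**31 - 1
value_len = 1 << 20        # bytes in the single source string
n_taken = (1 << 12) + 1    # number of times index 0 is taken


def wrap_int32(n):
    """Interpret n modulo 2**32 as a signed 32-bit integer."""
    return (n + 2**31) % 2**32 - 2**31


# Offset k of the output is k * value_len; find the first one that no longer fits.
first_bad = next(k for k in range(n_taken + 1) if k * value_len > INT32_MAX)
last_good_offset = (first_bad - 1) * value_len
wrapped_offset = wrap_int32(first_bad * value_len)
total_bytes = n_taken * value_len
```

Here `first_bad` is the slot reported in the error, `last_good_offset` is the larger value on the right of the comparison, and `wrapped_offset` is the negative value on the left.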

asfimport avatar Dec 09 '20 16:12 asfimport

Leonard Lausen / @leezu: There is a similar issue with large tables (many rows) of medium-sized lists (~512 elements per list). With the pa.list_ type, take fails with "offset overflow while concatenating arrays". Using pa.large_list works, but in practice it doesn't help, because .take is three orders of magnitude slower (~1 s vs ~1 ms) than indexing operations on pandas.

asfimport avatar Jan 24 '21 23:01 asfimport

Chris Fregly: Seeing this error through Ray 1.13 when I run the following code:

import ray

ray.init(address="auto")

df = ray.data.read_parquet("s3://amazon-reviews-pds/parquet/")

print(df.groupby("product_category").count())  

Here's the error:

(_partition_and_combine_block pid=1933) 2022-05-06 20:51:29,275 INFO worker.py:431 – Task failed with retryable exception: TaskID(7f0166b85ffd7f1fffffffffffffffffffffffff01000000).
(_partition_and_combine_block pid=1933) Traceback (most recent call last):
(_partition_and_combine_block pid=1933)   File "python/ray/_raylet.pyx", line 625, in ray._raylet.execute_task
(_partition_and_combine_block pid=1933)   File "python/ray/_raylet.pyx", line 629, in ray._raylet.execute_task
(_partition_and_combine_block pid=1933)   File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 436, in _partition_and_combine_block
(_partition_and_combine_block pid=1933)     descending=False)
(_partition_and_combine_block pid=1933)   File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/impl/arrow_block.py", line 308, in sort_and_partition
(_partition_and_combine_block pid=1933)     table = self._table.take(indices)
(_partition_and_combine_block pid=1933)   File "pyarrow/table.pxi", line 1382, in pyarrow.lib.Table.take
(_partition_and_combine_block pid=1933)   File "/home/ray/anaconda3/lib/python3.7/site-packages/pyarrow/compute.py", line 625, in take
(_partition_and_combine_block pid=1933)     return call_function('take', [data, indices], options, memory_pool)
(_partition_and_combine_block pid=1933)   File "pyarrow/_compute.pyx", line 528, in pyarrow._compute.call_function
(_partition_and_combine_block pid=1933)   File "pyarrow/_compute.pyx", line 327, in pyarrow._compute.Function.call
(_partition_and_combine_block pid=1933)   File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
(_partition_and_combine_block pid=1933)   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
(_partition_and_combine_block pid=1933) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays
GroupBy Map: 100%|████████████████████████████████| 200/200 [01:31<00:00, 2.18it/s]
GroupBy Reduce: 100%|██████████████████████████| 200/200 [00:00<00:00, 19776.52it/s]
Traceback (most recent call last):
  File "/home/ray/parquet-raydata.py", line 10, in
    print(df.groupby("product_category").count().sort())
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 147, in count
    return self.aggregate(Count())
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 114, in aggregate
    metadata = ray.get(metadata)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/worker.py", line 1713, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ArrowInvalid): ray::_aggregate_combined_blocks() (pid=27147, ip=172.31.14.160)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::_partition_and_combine_block() (pid=1930, ip=172.31.14.160)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/grouped_dataset.py", line 436, in _partition_and_combine_block
    descending=False)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/data/impl/arrow_block.py", line 308, in sort_and_partition
    table = self._table.take(indices)
  File "pyarrow/table.pxi", line 1382, in pyarrow.lib.Table.take
  File "/home/ray/anaconda3/lib/python3.7/site-packages/pyarrow/compute.py", line 625, in take
    return call_function('take', [data, indices], options, memory_pool)
  File "pyarrow/_compute.pyx", line 528, in pyarrow._compute.call_function
  File "pyarrow/_compute.pyx", line 327, in pyarrow._compute.Function.call
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

asfimport avatar May 12 '22 02:05 asfimport

Mayur Srivastava / @mayuropensource: Hi @lidavidm, is there any progress on this Jira? (This issue is blocking a few use cases we have.)

Thanks,

Mayur Srivastava

asfimport avatar May 31 '22 19:05 asfimport

David Li / @lidavidm: It needs someone motivated to sit down and work through the implementation. I can review/offer suggestions but probably don't have the time to implement this right now.

Note that I think the cases described in the comments above are fundamentally different from the original issue: they also require upgrading the output from Array to ChunkedArray (or from String/List to LargeString/LargeList) and so can't be done automatically.

asfimport avatar May 31 '22 19:05 asfimport

Will Jones / @wjones127: I'm interested in working on this soon. I'll look through the issue a little deeper and ping you @lidavidm to get some ideas on the design.

asfimport avatar Jun 06 '22 22:06 asfimport

David Li / @lidavidm: @wjones127 great! Looking forward to it.

asfimport avatar Jun 06 '22 22:06 asfimport

Will Jones / @wjones127: I've looked through the code and I think there are three related issues. I'll try to describe them here. If you think I am missing some case, let me know. Otherwise, I'll open three sub-tasks and start work on those.

Problem 1: We concatenate when we shouldn't need to

This fails:


arr = pa.chunked_array([["a" * 2**30]] * 2)
arr.take([0,1])
# Traceback (most recent call last):
#   File "<stdin>", line 1, in <module>
#   File "pyarrow/table.pxi", line 998, in pyarrow.lib.ChunkedArray.take
#   File "/Users/willjones/Documents/test-env/venv/lib/python3.9/site-packages/pyarrow/compute.py", line 457, in take
#     return call_function('take', [data, indices], options, memory_pool)
#   File "pyarrow/_compute.pyx", line 542, in pyarrow._compute.call_function
#   File "pyarrow/_compute.pyx", line 341, in pyarrow._compute.Function.call
#   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
#   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
# pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

because we concatenate input values here. If that were corrected, it would then fail on the concatenation here if the indices were a chunked array.

The first concatenation could be avoided fairly easily in special cases (indices that are sorted or that all fall in the same chunk), which was partially implemented in R. For the general case, we'd need to address this within the kernel rather than in pre-processing (see Problem 3).

The second concatenation shouldn't always be eliminated, but we might want to add a check to validate that there is enough room in offset buffers of arrays to concatenate. TBD if there is an efficient way to test that.

Problem 2: take_array kernel doesn't handle case of offset overflow

This is what Antoine was pointing out:


import pyarrow as pa
arr = pa.array(["x" * (1<<20)])
t = arr.take(pa.array([0]*((1<<12) + 1), type=pa.int8()))
t.validate(full=True)
# Traceback (most recent call last):
#   [...]
# ArrowInvalid: Offset invariant failure: non-monotonic offset at slot 2048: -2147483648 < 2146435072

To solve this, I think we'd either have to:

  1. (optionally?) promote arrays to Large variants of type. Problem is we'd need to do this cast consistently across chunks.

  2. Switch to returning chunked arrays, and create new chunks as needed. (TBD: Could we do that in some cases (String, Binary, List types) and not others?)

Problem 3: there isn't a take_array kernel that handles ChunkedArrays

Finally, for sorting chunked arrays of type string/binary/list (that is, the case for take where the indices are out-of-order), I think we need to implement kernels specialized for chunked arrays. IIUC, everything but string/binary/list types could simply do the concatenation we do now; it's just those three types that need special logic to chunk as necessary to avoid offset overflows.

asfimport avatar Jun 07 '22 19:06 asfimport

Antoine Pitrou / @pitrou: Ok, it's a bit unfortunate that several distinct issues have been amalgamated here :-)

I'd argue that this issue is primarily about fixing problem 3 (which would also fix problem 1). Besides correctness, concatenating is also a performance problem because we might be allocating a lot of temporary memory.

asfimport avatar Jun 08 '22 10:06 asfimport

David Li / @lidavidm: Agreed, we should focus on 1/3. Problem 2 is also interesting, but I'm not sure how best to handle it: right now the kernels infrastructure assumes a fixed output type and shape up front, and dynamically switching to ChunkedArray or promoting type would be a surprise.

I would think we could avoid concatenation for all types, even if it isn't strictly required, to avoid excessive allocation as Antoine mentioned.

asfimport avatar Jun 08 '22 11:06 asfimport

Hi! Issue 3 is bothering us a lot when sorting huge tables. Do we have any updates on this?

liujiajun avatar Dec 28 '23 04:12 liujiajun

> Hi! Issue 3 is bothering us a lot when sorting huge tables. Do we have any updates on this?

I'm working on this.

felipecrv avatar Apr 18 '24 17:04 felipecrv

@felipecrv @jorisvandenbossche do we have a clear line of sight on when this issue will be addressed?

Can you help me understand if there were/are any practical limitations around why chunking wasn't a consideration in the first place (for Table/ChunkedArray APIs)?

This is a pretty foundational issue: because the take API doesn't chunk arrays that grow above 2 GB, it is essentially impossible to use for data processing.

For example, in Ray Data:

  1. We can't force users to go for int64-based types
  2. We can't blindly upcast all types to int64-based ones either
  3. We have to be able to handle columns growing above 2 GB

alexeykudinkin avatar Dec 09 '24 20:12 alexeykudinkin

@alexeykudinkin take/filter on chunked arrays requires resolution of chunks which is more expensive than simple array offsetting [1].
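To illustrate the extra cost, here is a pure-Python sketch of chunk resolution, under the assumption that it is done with a binary search over cumulative chunk lengths (`make_resolver` is a name invented here; the C++ implementation may differ). With a single contiguous array a logical index is used directly; with chunks every index first has to be mapped to a (chunk, position) pair:

```python
import bisect
from itertools import accumulate


def make_resolver(chunk_lengths):
    """Return a function mapping a logical index to (chunk number, index in chunk)."""
    # offsets[k] is the logical index of the first element of chunk k;
    # the final entry is the total length.
    offsets = [0] + list(accumulate(chunk_lengths))

    def resolve(i):
        if not 0 <= i < offsets[-1]:
            raise IndexError(i)
        # O(log n_chunks) per lookup; empty chunks are skipped naturally.
        k = bisect.bisect_right(offsets, i) - 1
        return k, i - offsets[k]

    return resolve


resolve = make_resolver([2, 0, 3])
locations = [resolve(i) for i in (0, 2, 4)]
```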

Solutions that don't concatenate tend to be slower and are considered unacceptable. If it were up to me, I would leave the decision of concatenation to callers and never concatenate. I'm going to compromise and add conditional checks on sizes to decide if we concatenate or not. What I don't like about this solution is that it kinda doubles the amount of tests we need to run to cover both algorithms.

[1] https://github.com/apache/arrow/pull/41700

felipecrv avatar Dec 10 '24 13:12 felipecrv

> Solutions that don't concatenate tend to be slower and are considered unacceptable.

@felipecrv I totally get that. My point is twofold, however:

  1. Doing take, albeit slower, is still better than doing an operation (concat) that's bound to fail (for arrays of cumulative size > 2 GB).
  2. We don't need to fall for either extreme of concatenating all arrays or concatenating none: we can concatenate up to the int32 boundary, drastically reducing the number of arrays if we've been passed a lot of small ones, while still being able to handle the take without blowing up.
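A sketch of the second point, assuming the per-chunk value byte sizes are already known. `plan_groups` is a hypothetical helper that only decides which adjacent chunks to merge; each resulting group could then be passed to something like `pa.concat_arrays`:

```python
def plan_groups(chunk_bytes, limit=2**31 - 1):
    """Greedily group adjacent chunks so each group stays within `limit` bytes."""
    groups, current, used = [], [], 0
    for k, size in enumerate(chunk_bytes):
        # Start a new group when adding this chunk would cross the boundary.
        if current and used + size > limit:
            groups.append(current)
            current, used = [], 0
        current.append(k)
        used += size
    if current:
        groups.append(current)
    return groups


# Five small chunks with a toy limit of 6 bytes collapse into three groups.
plan = plan_groups([3, 3, 5, 1, 1], limit=6)
```

With many small input chunks this keeps the number of arrays close to the minimum while never asking for a concatenation that cannot fit in 32-bit offsets.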

alexeykudinkin avatar May 02 '25 00:05 alexeykudinkin

> Doing take albeit slower still better than doing operation (concat) that's bound to fail (for arrays of cumulative size > 2 Gb)

I agree. But all these conditionals increase the testing burden and that made me run out of energy to finish #41700.

felipecrv avatar May 08 '25 20:05 felipecrv