ARROW-17642: [C++] Add ordered aggregation
See https://issues.apache.org/jira/browse/ARROW-17642
This PR implements a generalization of ordered aggregation called segmented aggregation, in which the segment keys split the input rows into segments of consecutive rows with common values for these keys. In other words, the segment keys are not required to be ordered, only to change from one segment to the next. I opted to generalize this way because the extra implementation effort is quite small and the result is useful.
Note that the current implementation does not support streaming. In particular, the segment-aggregation operation produces the entire output after processing the entire input. However, the current implementation gets close enough to allow a future PR to add support for streaming without too much effort.
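For context, here is a minimal sketch of how a caller might request segmented aggregation through an exec plan. It is written against the Acero naming the aggregate node later settled on (`arrow::acero`, `AggregateNodeOptions` with a `segment_keys` argument), so the exact headers and signatures here are assumptions rather than this PR's diff:

```cpp
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/api.h>

namespace ac = arrow::acero;

// Sum "value" per ("ts", "key"), where "ts" is a segment key: its values only
// change between segments, so results can be emitted segment by segment.
arrow::Result<std::shared_ptr<arrow::Table>> SegmentedSum(
    std::shared_ptr<arrow::Table> table) {
  ac::Declaration plan = ac::Declaration::Sequence({
      {"table_source", ac::TableSourceNodeOptions(std::move(table))},
      {"aggregate",
       ac::AggregateNodeOptions(
           /*aggregates=*/{{"hash_sum", nullptr, "value", "sum(value)"}},
           /*keys=*/{"key"},
           /*segment_keys=*/{"ts"})},
  });
  // Segmented aggregation assumes rows arrive in segment order, so run the
  // plan single-threaded here.
  return ac::DeclarationToTable(std::move(plan), /*use_threads=*/false);
}
```

For an input whose `ts` column looks like `{t1, t1, t2, t2, t3}`, the node can finalize and emit the groups of a segment as soon as that segment ends, which is what makes the streaming extension mentioned above feasible.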
cc @icexelloss
Thanks @rtpsw! Will try to take a look
@lidavidm, are you a good person to review this? Or can you suggest someone? Also, note this PR includes a new SWMR (single-writer, multiple-reader) gadget which may be of wider interest and could be upgraded to a visible internal API, WDYT?
Weston would probably be best, but he's going to be busy with release things. Do you need a review soon?
The link to the gadget is dead, so I'm not sure what you're referencing
> Weston would probably be best, but he's going to be busy with release things. Do you need a review soon?
Personally, I can hold. @icexelloss should answer if he needs this sooner.
> The link to the gadget is dead, so I'm not sure what you're referencing

Sorry about that. Please see `GatedSharedMutex` and related code in `cpp/src/arrow/compute/exec/aggregate_node.cc`.
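For anyone reading along without the diff loaded: the gadget is a single-writer/multiple-reader lock. Below is only a sketch of the generic SWMR pattern using C++17's `std::shared_mutex`, to show the shape of the idea; the actual `GatedSharedMutex` in the PR differs in its details.

```cpp
#include <shared_mutex>

// Generic SWMR pattern: any number of concurrent readers, one exclusive
// writer. This is NOT the PR's GatedSharedMutex, just the general idea.
class SwmrValue {
 public:
  int Read() const {
    std::shared_lock<std::shared_mutex> lock(mutex_);  // shared with readers
    return value_;
  }
  void Write(int v) {
    std::unique_lock<std::shared_mutex> lock(mutex_);  // excludes everyone
    value_ = v;
  }

 private:
  mutable std::shared_mutex mutex_;
  int value_ = 0;
};
```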
Turns out the link is OK, but GitHub doesn't scroll it into view when the containing file has a large diff. When I click "Load diff" for this file, GitHub highlights the correct block of code for the link. I'm not sure whether this is a bug or a feature of GitHub.
Notes about the recent commit:

- The PyArrow `GroupBy` API currently does not expose the `segment_keys` parameter. I think this should be done in a separate task.
- The C++ `GroupBy` API currently does not support streaming. While the `Grouper` class it relies on has been refactored to make it easy to add streaming support, I think this is not a priority for the current PR because the important API is the aggregate node.
- The aggregate node API now supports streaming. However, the existing tests only cover the case of an empty `segment_keys` parameter. These tests do cover the aggregate node's code in the PR but not that of all `GroupingSegment` implementations (see the segmenter sketch after this list). I intend to add more tests here.
- The `SegmentKeyWithChunkedArray` test only covers chunked arrays and a single-key segmenter. I intend to add tests for other datum kinds and segmenter types.
It would be good to get feedback on the approach for this PR even before I add further tests.
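To make the segmenter discussion above concrete, here is a hypothetical sketch (not the PR's actual `GroupingSegment` interface) of what a single-key segmenter computes: the length of the leading run of equal segment-key values, i.e. how far the current segment extends within a batch.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical single-key segmenter: return the number of consecutive rows
// starting at `offset` that share the segment-key value at `offset`.
template <typename T>
int64_t NextSegmentLength(const std::vector<T>& segment_key, int64_t offset) {
  const auto n = static_cast<int64_t>(segment_key.size());
  int64_t end = offset;
  while (end < n && segment_key[end] == segment_key[offset]) ++end;
  return end - offset;  // rows [offset, end) form one segment
}
```

For a column like `{a, a, b, b, a}`, successive calls yield segments of lengths 2, 2, and 1; note that `a` recurs in a later segment, which is exactly the relaxation over fully ordered keys described in the PR summary.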
@lidavidm We do not need this soon. This can wait a little bit (2 weeks or so)
Pushed a change I had in flight. Holding for now.
> @lidavidm, are you a good person to review this? Or can you suggest someone? Also, note this PR includes a new SWMR (single-writer, multiple-reader) gadget which may be of wider interest and could be upgraded to a visible internal API, WDYT?
To be fair, there are plenty of possible mutex policies (example discussion). I think the first question is whether the proposed policy and its implementation are useful enough to Arrow to be worth a (separate) discussion.
@lidavidm, it may be a good time to get back to this. cc @icexelloss
Yes, sorry. I'll see if I can get to this soon. Or CC @westonpace.
There's some discussion here, I should have pinged you at the time: https://issues.apache.org/jira/browse/ARROW-17965
Thanks for all the comments. I'll try to post some quick responses, but I'll need to get back to this later, due to other priorities.
> There's some discussion here, I should have pinged you at the time: https://issues.apache.org/jira/browse/ARROW-17965
No worries! I've been pretty busy these last few weeks so I don't know that I would have caught it anyways.
One CI job failure is worth a look, but may be unrelated to the PR's changes.
@westonpace, @lidavidm - is this good to go?
I'm still pretty reluctant to add code to handle chunked arrays. I feel it adds complexity that we will end up maintaining, when chunked arrays don't really have a place in a streaming execution engine (since we usually process things one batch at a time).
This is understandable. I'll try to drop support for chunked arrays in this PR and report back on what seems to break; we may be able to find an alternative approach.
This CI job failure may be related to the recent commit, though I can't think of which change could have caused it. How do I set up an environment to try to reproduce it? Or perhaps there is a way to grab the core dump or raw log file?
> I'm still pretty reluctant to add code to handle chunked arrays. I feel it adds complexity that we will end up maintaining, when chunked arrays don't really have a place in a streaming execution engine (since we usually process things one batch at a time).
>
> This is understandable. I'll try to drop support for chunked arrays in this PR and report back on what seems to break; we may be able to find an alternative approach.
My investigation suggests that handling of chunked arrays was introduced in the first place because the testers use tables, and their implementing class `SimpleTable` has `ChunkedArray` columns (even after `CombineChunks`) that the aggregation code needs to handle. Therefore, if we remove support for chunked arrays from the aggregation code, it won't work nicely with table inputs. AFAIU, aggregating tables is a valid use case that should be supported. @westonpace, let me know your thoughts.
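To illustrate: a `Table` column is typed as `ChunkedArray` regardless of how many chunks it holds, so even a combined table hands aggregation code a chunked representation. A minimal demonstration using the standard Arrow C++ API:

```cpp
#include <arrow/api.h>

arrow::Status ShowColumnsAreChunked(std::shared_ptr<arrow::Table> table) {
  // CombineChunks merges each column's chunks into a single chunk...
  ARROW_ASSIGN_OR_RAISE(auto combined,
                        table->CombineChunks(arrow::default_memory_pool()));
  // ...but each column is still a ChunkedArray (now with num_chunks() == 1),
  // so any code consuming Table columns must handle the chunked type.
  std::shared_ptr<arrow::ChunkedArray> column = combined->column(0);
  if (column->num_chunks() != 1) {
    return arrow::Status::Invalid("expected a single chunk");
  }
  return arrow::Status::OK();
}
```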
Does the non-ordered / hash aggregation handle chunked arrays?
> Does the non-ordered / hash aggregation handle chunked arrays?
AFAICS, yes. There are plenty of pre-PR `GroupBy` tests in `hash_aggregate_test.cc`, such as `CountOnly` and `SumOnly`, that create a table as input for (pre-PR, unordered) aggregation using `GroupBy` (or `AlternatorGroupBy` in the recent commit of this PR).
> This CI job failure may be related to the recent commit, though I can't think of which change could have caused it. How do I set up an environment to try to reproduce it? Or perhaps there is a way to grab the core dump or raw log file?
I see the same failure for the latest commit, so same questions.
> This CI job failure may be related to the recent commit, though I can't think of which change could have caused it. How do I set up an environment to try to reproduce it? Or perhaps there is a way to grab the core dump or raw log file?
>
> I see the same failure for the latest commit, so same questions.
I followed up on the job's workflow and found the command `archery docker run java-jni-manylinux-2014`, but on my machine this runs into errors I don't know how to deal with:
```
$ archery docker run java-jni-manylinux-2014
Traceback (most recent call last):
File "/mnt/venv/bin/archery", line 33, in <module>
sys.exit(load_entry_point('archery', 'console_scripts', 'archery')())
File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
return self.main(*args, **kwargs)
File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 1055, in main
rv = self.invoke(ctx)
File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 1657, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 1654, in invoke
super().invoke(ctx)
File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 760, in invoke
return __callback(*args, **kwargs)
File "/mnt/venv/lib/python3.10/site-packages/click/decorators.py", line 26, in new_func
return f(get_current_context(), *args, **kwargs)
File "/mnt/github/rtpsw/arrow/dev/archery/archery/docker/cli.py", line 67, in docker
compose = DockerCompose(config_path, params=os.environ)
File "/mnt/github/rtpsw/arrow/dev/archery/archery/docker/core.py", line 169, in __init__
self.config = ComposeConfig(config_path, dotenv_path, compose_bin,
File "/mnt/github/rtpsw/arrow/dev/archery/archery/docker/core.py", line 68, in __init__
self._read_config(config_path, compose_bin)
File "/mnt/github/rtpsw/arrow/dev/archery/archery/docker/core.py", line 136, in _read_config
raise ValueError(
ValueError: Found errors with docker-compose:
- /snap/docker/2285/lib/python3.6/site-packages/paramiko/transport.py:33: CryptographyDeprecationWarning: Python 3.6 is no longer supported by the Python core team. Therefore, support for it is deprecated in cryptography and will be removed in a future release.
- from cryptography.hazmat.backends import default_backend
- Traceback (most recent call last):
- File "/snap/docker/2285/bin/docker-compose", line 33, in <module>
- sys.exit(load_entry_point('docker-compose==1.29.2', 'console_scripts', 'docker-compose')())
- File "/snap/docker/2285/lib/python3.6/site-packages/compose/cli/main.py", line 81, in main
- command_func()
- File "/snap/docker/2285/lib/python3.6/site-packages/compose/cli/main.py", line 197, in perform_command
- handler(command, command_options)
- File "/snap/docker/2285/lib/python3.6/site-packages/compose/metrics/decorator.py", line 18, in wrapper
- result = fn(*args, **kwargs)
- File "/snap/docker/2285/lib/python3.6/site-packages/compose/cli/main.py", line 404, in config
- compose_config = get_config_from_options('.', self.toplevel_options, additional_options)
- File "/snap/docker/2285/lib/python3.6/site-packages/compose/cli/command.py", line 104, in get_config_from_options
- environment = Environment.from_env_file(override_dir or base_dir, environment_file)
- File "/snap/docker/2285/lib/python3.6/site-packages/compose/config/environment.py", line 67, in from_env_file
- instance = _initialize()
- File "/snap/docker/2285/lib/python3.6/site-packages/compose/config/environment.py", line 62, in _initialize
- return cls(env_vars_from_file(env_file_path))
- File "/snap/docker/2285/lib/python3.6/site-packages/compose/config/environment.py", line 38, in env_vars_from_file
- env = dotenv.dotenv_values(dotenv_path=filename, encoding='utf-8-sig', interpolate=interpolate)
- File "/snap/docker/2285/lib/python3.6/site-packages/dotenv/main.py", line 372, in dotenv_values
- encoding=encoding,
- File "/snap/docker/2285/lib/python3.6/site-packages/dotenv/main.py", line 74, in dict
- self._dict = OrderedDict(resolve_variables(raw_values, override=self.override))
- File "/snap/docker/2285/lib/python3.6/site-packages/dotenv/main.py", line 231, in resolve_variables
- for (name, value) in values:
- File "/snap/docker/2285/lib/python3.6/site-packages/dotenv/main.py", line 81, in parse
- with self._get_stream() as stream:
- File "/snap/docker/2285/usr/lib/python3.6/contextlib.py", line 81, in __enter__
- return next(self.gen)
- File "/snap/docker/2285/lib/python3.6/site-packages/dotenv/main.py", line 54, in _get_stream
- with io.open(self.dotenv_path, encoding=self.encoding) as stream:
- PermissionError: [Errno 13] Permission denied: '/mnt/github/rtpsw/arrow/.env'
```
> AFAIU, aggregating tables is a valid use case that should be supported. @westonpace, let me know your thoughts.
Aggregation of tables should be done by using a table source node and an exec plan, not by using the `GroupBy` function in `aggregate.h`. That was the intent behind the comment:
```cpp
/// Internal use only: helpers for PyArrow and testing HashAggregateKernels.
/// For public use see arrow::compute::Grouper or create an execution plan
/// and use an aggregate node.
```
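For concreteness, that recommendation is the same shape as the sketch near the top of this thread, minus the segment keys (same hedges apply: later Acero naming, exact signatures assumed). The table source node slices the table into contiguous `ExecBatch`es, so downstream nodes never see `ChunkedArray` columns:

```cpp
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/api.h>

namespace ac = arrow::acero;

arrow::Result<std::shared_ptr<arrow::Table>> GroupByViaExecPlan(
    std::shared_ptr<arrow::Table> table) {
  // The table source feeds the table downstream as contiguous batches.
  ac::Declaration plan = ac::Declaration::Sequence({
      {"table_source", ac::TableSourceNodeOptions(std::move(table))},
      {"aggregate",
       ac::AggregateNodeOptions(
           /*aggregates=*/{{"hash_count", nullptr, "value", "count(value)"}},
           /*keys=*/{"key"})},
  });
  return ac::DeclarationToTable(std::move(plan));
}
```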
IIUC, support for chunked arrays is needed only for `GroupBy` as a testing facility (and that this is the case with hash aggregation too: https://github.com/apache/arrow/pull/14352/#issuecomment-1328279070), right? So, if we remove this support, some current testing using `GroupBy` would break and would need to be removed too. @westonpace, are you fine with this outcome?
I have created https://github.com/apache/arrow/pull/14867, which removes the internal `GroupBy` facility entirely (well, remaps it onto exec plans).