
ARROW-17642: [C++] Add ordered aggregation

Open rtpsw opened this issue 3 years ago • 13 comments

See https://issues.apache.org/jira/browse/ARROW-17642

rtpsw avatar Oct 08 '22 21:10 rtpsw

https://issues.apache.org/jira/browse/ARROW-17642

github-actions[bot] avatar Oct 08 '22 21:10 github-actions[bot]

This PR implements a generalization of ordered aggregation called segmented aggregation, in which the segment-keys split the input rows into segments of consecutive rows with common values for these segment-keys. In other words, the extra keys are not required to be ordered but only to change from one segment to the next. I opted to generalize this way because the extra implementation effort is quite small and the result is useful.
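
To make the definition concrete, here is a minimal sketch in plain C++. It is not the Arrow API and not the PR's implementation: the names `Segment`, `SplitIntoSegments` and `SegmentedSum` are hypothetical, and a single integer segment-key column is assumed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical illustration type: a run of consecutive rows sharing one key value.
struct Segment {
  std::size_t offset;  // index of the first row in the segment
  std::size_t length;  // number of consecutive rows in the segment
};

// Splits rows into maximal runs of consecutive equal key values.
// Keys need not be sorted; a new segment starts whenever the key changes.
std::vector<Segment> SplitIntoSegments(const std::vector<std::int64_t>& keys) {
  std::vector<Segment> segments;
  for (std::size_t i = 0; i < keys.size(); ++i) {
    if (i == 0 || keys[i] != keys[i - 1]) {
      segments.push_back({i, 1});
    } else {
      ++segments.back().length;
    }
  }
  return segments;
}

// Sums `values` separately within each segment defined by `keys`.
std::vector<std::int64_t> SegmentedSum(const std::vector<std::int64_t>& keys,
                                       const std::vector<std::int64_t>& values) {
  assert(keys.size() == values.size());
  std::vector<std::int64_t> sums;
  for (const Segment& s : SplitIntoSegments(keys)) {
    std::int64_t sum = 0;
    for (std::size_t i = s.offset; i < s.offset + s.length; ++i) sum += values[i];
    sums.push_back(sum);
  }
  return sums;
}
```

With keys {2, 2, 1, 1, 2} this yields three segments, not two: the key value 2 appears in two separate segments because its rows are not consecutive. An ordered input is the special case in which each key value forms exactly one segment.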

rtpsw avatar Oct 08 '22 21:10 rtpsw

Note that the current implementation does not support streaming. In particular, the segment-aggregation operation produces the entire output after processing the entire input. However, the current implementation gets close enough to allow a future PR to add support for streaming without too much effort.

cc @icexelloss

rtpsw avatar Oct 08 '22 21:10 rtpsw

Note that this PR includes code from this and this PR, which are pending at this time.

rtpsw avatar Oct 08 '22 21:10 rtpsw

Thanks @rtpsw! Will try to take a look

icexelloss avatar Oct 10 '22 19:10 icexelloss

@lidavidm, are you a good person to review this, or can you suggest someone? Also, note this PR includes a new SWMR gadget which may be of wider interest and could be upgraded to a visible internal API, WDYT?

rtpsw avatar Oct 15 '22 09:10 rtpsw

Weston would probably be best, but he's going to be busy with release things. Do you need a review soon?

The link to the gadget is dead, so I'm not sure what you're referencing

lidavidm avatar Oct 15 '22 13:10 lidavidm

Weston would probably be best, but he's going to be busy with release things. Do you need a review soon?

Personally, I can hold. @icexelloss should answer if he needs this sooner.

The link to the gadget is dead, so I'm not sure what you're referencing

Sorry about that. Please see GatedSharedMutex and related code in cpp/src/arrow/compute/exec/aggregate_node.cc.

rtpsw avatar Oct 15 '22 13:10 rtpsw

The link to the gadget is dead, so I'm not sure what you're referencing

Sorry about that. Please see GatedSharedMutex and related code in cpp/src/arrow/compute/exec/aggregate_node.cc.

Turns out the link is OK but GitHub avoids scrolling it into view when the containing file has a large diff. When I click Load diff for this file I see GitHub highlights the correct block of code for the link. I'm not sure whether this is a bug or a feature of GitHub.

rtpsw avatar Oct 15 '22 19:10 rtpsw

Notes about the recent commit:

  • The PyArrow GroupBy API currently does not expose the segment_keys parameter. I think this should be done in a separate task.
  • The C++ GroupBy API currently does not support streaming. While the Grouper class it relies on has been refactored to make it easy to add streaming support, I think this is not a priority for the current PR because the important API is the aggregate node.
  • The aggregate node API now supports streaming. However, the existing tests only cover the case of an empty segment_keys parameter. These tests do cover the aggregate node's code in the PR but not that of all GroupingSegment implementations. I intend to add more tests here.
  • The SegmentKeyWithChunkedArray test only covers chunked arrays and a single-key segmenter. I intend to add tests for other datum kinds and segmenter types.
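
As a rough illustration of what streaming means for segmented aggregation, the sketch below emits a segment's result as soon as a row with a different key arrives, instead of waiting for the entire input. It is plain C++ under stated assumptions, not the aggregate node's code: `StreamingSegmentSum`, `Consume` and `Finish` are hypothetical names, and a single integer segment key with a sum aggregate is assumed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical illustration class: sums values per segment, one batch at a time.
class StreamingSegmentSum {
 public:
  using Result = std::pair<std::int64_t, std::int64_t>;  // (segment key, sum)

  // Consumes one batch and returns the segments known to be complete,
  // i.e. those followed by a row with a different key.
  std::vector<Result> Consume(const std::vector<std::int64_t>& keys,
                              const std::vector<std::int64_t>& values) {
    assert(keys.size() == values.size());
    std::vector<Result> completed;
    for (std::size_t i = 0; i < keys.size(); ++i) {
      if (open_ && open_->first != keys[i]) {
        completed.push_back(*open_);  // key changed: previous segment is final
        open_.reset();
      }
      if (!open_) open_ = Result{keys[i], 0};
      open_->second += values[i];
    }
    return completed;
  }

  // Flushes the last segment at end of input; it may span several batches.
  std::optional<Result> Finish() {
    std::optional<Result> last = open_;
    open_.reset();
    return last;
  }

 private:
  std::optional<Result> open_;  // segment still open at the batch boundary
};
```

The only state kept between batches is the currently open segment, which is why a segment that straddles a batch boundary is still aggregated as one unit.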

It would be good to get feedback on the approach for this PR even before I add further tests.

rtpsw avatar Oct 15 '22 19:10 rtpsw

@lidavidm We do not need this soon. This can wait a little bit (2 weeks or so)

icexelloss avatar Oct 17 '22 13:10 icexelloss

Pushed a change I had in flight. Holding for now.

rtpsw avatar Oct 17 '22 15:10 rtpsw

@lidavidm, are you a good person to review this, or can you suggest someone? Also, note this PR includes a new SWMR gadget which may be of wider interest and could be upgraded to a visible internal API, WDYT?

To be fair, there are plenty of possible mutex policies (example discussion). I think the first question is whether the proposed policy and its implementation are useful enough to Arrow to merit a (separate) discussion.

rtpsw avatar Oct 23 '22 09:10 rtpsw

@lidavidm, it may be a good time to get back to this. cc @icexelloss

rtpsw avatar Nov 06 '22 14:11 rtpsw

Yes, sorry. I'll see if I can get to this soon. Or CC @westonpace.

lidavidm avatar Nov 07 '22 12:11 lidavidm

There's some discussion here, I should have pinged you at the time: https://issues.apache.org/jira/browse/ARROW-17965

lidavidm avatar Nov 10 '22 20:11 lidavidm

Thanks for all the comments. I'll try to post some quick responses, but I'll need to get back to this later, due to other priorities.

rtpsw avatar Nov 10 '22 20:11 rtpsw

There's some discussion here, I should have pinged you at the time: https://issues.apache.org/jira/browse/ARROW-17965

No worries! I've been pretty busy these last few weeks so I don't know that I would have caught it anyways.

westonpace avatar Nov 10 '22 22:11 westonpace

One CI job failure is worth a look, but may be unrelated to the PR's changes.

rtpsw avatar Nov 14 '22 18:11 rtpsw

@westonpace, @lidavidm - is this good to go?

rtpsw avatar Nov 17 '22 08:11 rtpsw

I'm still pretty reluctant to add code to handle chunked arrays. I feel it adds complexity that we will end up maintaining when chunked arrays don't really have a place in a streaming execution engine (since we process things one batch at a time usually).

This is understandable. I'll try to drop support for chunked arrays in this PR and report back on what seems to break; we may be able to find an alternative approach.

rtpsw avatar Nov 19 '22 08:11 rtpsw

This CI job failure may be related to the recent commit, though I can't think of which change could have caused it. How can I set up an environment to try to reproduce it? Or perhaps there is a way to grab the core dump or raw log file?

rtpsw avatar Nov 20 '22 19:11 rtpsw

I'm still pretty reluctant to add code to handle chunked arrays. I feel it adds complexity that we will end up maintaining when chunked arrays don't really have a place in a streaming execution engine (since we process things one batch at a time usually).

This is understandable. I'll try to drop support for chunked arrays in this PR and report back on what seems to break; we may be able to find an alternative approach.

My investigation suggests that the reason for introducing handling of chunked arrays in the first place is that the testers use tables, and their implementing class SimpleTable has ChunkedArray columns (even after CombineChunks) that the aggregation code needs to handle. Therefore, if we remove support for chunked arrays in the aggregation code, then it won't work nicely with table inputs. AFAIU, aggregating tables is a valid use case that should be supported. @westonpace, let me know your thoughts.
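
To illustrate the extra handling that chunked columns require, here is a minimal sketch in plain C++ of segmenting a chunked key column without concatenating its chunks. It is an assumption-laden illustration, not the PR's code: `LogicalSegment` and `SplitChunkedIntoSegments` are hypothetical names, and nested `std::vector`s stand in for a chunked array.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical illustration type: a run of rows addressed by logical row index,
// i.e. the index a row would have if all chunks were concatenated.
struct LogicalSegment {
  std::size_t offset;
  std::size_t length;
};

// Splits a chunked key column into segments without concatenating the chunks.
// A segment may span a chunk boundary, so the last key seen is carried across chunks.
std::vector<LogicalSegment> SplitChunkedIntoSegments(
    const std::vector<std::vector<std::int64_t>>& chunks) {
  std::vector<LogicalSegment> segments;
  std::size_t row = 0;        // logical index of the current row
  std::int64_t last_key = 0;  // only meaningful once `segments` is non-empty
  for (const auto& chunk : chunks) {
    for (std::int64_t key : chunk) {
      if (segments.empty() || key != last_key) {
        segments.push_back({row, 1});
        last_key = key;
      } else {
        ++segments.back().length;
      }
      ++row;
    }
  }
  return segments;
}
```

Chunk boundaries and empty chunks must not start a new segment on their own, which is the kind of bookkeeping a single contiguous array avoids.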

rtpsw avatar Nov 27 '22 15:11 rtpsw

Does the non-ordered / hash aggregation handle chunked arrays?

icexelloss avatar Nov 27 '22 15:11 icexelloss

Does the non-ordered / hash aggregation handle chunked arrays?

AFAICS, yes. There are plenty of pre-PR GroupBy tests in hash_aggregate_test.cc, such as CountOnly and SumOnly, that create a table as input for (pre-PR unordered) aggregation using GroupBy (or AlternatorGroupBy in the recent commit of this PR).

rtpsw avatar Nov 27 '22 16:11 rtpsw

This CI job failure may be related to the recent commit, though I can't think of which change could have caused it. How can I set up an environment to try to reproduce it? Or perhaps there is a way to grab the core dump or raw log file?

I see the same failure for the latest commit, so same questions.

rtpsw avatar Nov 27 '22 16:11 rtpsw

This CI job failure may be related to the recent commit, though I can't think of which change could have caused it. How can I set up an environment to try to reproduce it? Or perhaps there is a way to grab the core dump or raw log file?

I see the same failure for the latest commit, so same questions.

I followed up on the job's workflow and found the command archery docker run java-jni-manylinux-2014 but on my machine this runs into errors I don't know how to deal with:

$ archery docker run java-jni-manylinux-2014
Traceback (most recent call last):
  File "/mnt/venv/bin/archery", line 33, in <module>
    sys.exit(load_entry_point('archery', 'console_scripts', 'archery')())
  File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 1654, in invoke
    super().invoke(ctx)
  File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/mnt/venv/lib/python3.10/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/mnt/venv/lib/python3.10/site-packages/click/decorators.py", line 26, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/mnt/github/rtpsw/arrow/dev/archery/archery/docker/cli.py", line 67, in docker
    compose = DockerCompose(config_path, params=os.environ)
  File "/mnt/github/rtpsw/arrow/dev/archery/archery/docker/core.py", line 169, in __init__
    self.config = ComposeConfig(config_path, dotenv_path, compose_bin,
  File "/mnt/github/rtpsw/arrow/dev/archery/archery/docker/core.py", line 68, in __init__
    self._read_config(config_path, compose_bin)
  File "/mnt/github/rtpsw/arrow/dev/archery/archery/docker/core.py", line 136, in _read_config
    raise ValueError(
ValueError: Found errors with docker-compose:
 - /snap/docker/2285/lib/python3.6/site-packages/paramiko/transport.py:33: CryptographyDeprecationWarning: Python 3.6 is no longer supported by the Python core team. Therefore, support for it is deprecated in cryptography and will be removed in a future release.
 -   from cryptography.hazmat.backends import default_backend
 - Traceback (most recent call last):
 -   File "/snap/docker/2285/bin/docker-compose", line 33, in <module>
 -     sys.exit(load_entry_point('docker-compose==1.29.2', 'console_scripts', 'docker-compose')())
 -   File "/snap/docker/2285/lib/python3.6/site-packages/compose/cli/main.py", line 81, in main
 -     command_func()
 -   File "/snap/docker/2285/lib/python3.6/site-packages/compose/cli/main.py", line 197, in perform_command
 -     handler(command, command_options)
 -   File "/snap/docker/2285/lib/python3.6/site-packages/compose/metrics/decorator.py", line 18, in wrapper
 -     result = fn(*args, **kwargs)
 -   File "/snap/docker/2285/lib/python3.6/site-packages/compose/cli/main.py", line 404, in config
 -     compose_config = get_config_from_options('.', self.toplevel_options, additional_options)
 -   File "/snap/docker/2285/lib/python3.6/site-packages/compose/cli/command.py", line 104, in get_config_from_options
 -     environment = Environment.from_env_file(override_dir or base_dir, environment_file)
 -   File "/snap/docker/2285/lib/python3.6/site-packages/compose/config/environment.py", line 67, in from_env_file
 -     instance = _initialize()
 -   File "/snap/docker/2285/lib/python3.6/site-packages/compose/config/environment.py", line 62, in _initialize
 -     return cls(env_vars_from_file(env_file_path))
 -   File "/snap/docker/2285/lib/python3.6/site-packages/compose/config/environment.py", line 38, in env_vars_from_file
 -     env = dotenv.dotenv_values(dotenv_path=filename, encoding='utf-8-sig', interpolate=interpolate)
 -   File "/snap/docker/2285/lib/python3.6/site-packages/dotenv/main.py", line 372, in dotenv_values
 -     encoding=encoding,
 -   File "/snap/docker/2285/lib/python3.6/site-packages/dotenv/main.py", line 74, in dict
 -     self._dict = OrderedDict(resolve_variables(raw_values, override=self.override))
 -   File "/snap/docker/2285/lib/python3.6/site-packages/dotenv/main.py", line 231, in resolve_variables
 -     for (name, value) in values:
 -   File "/snap/docker/2285/lib/python3.6/site-packages/dotenv/main.py", line 81, in parse
 -     with self._get_stream() as stream:
 -   File "/snap/docker/2285/usr/lib/python3.6/contextlib.py", line 81, in __enter__
 -     return next(self.gen)
 -   File "/snap/docker/2285/lib/python3.6/site-packages/dotenv/main.py", line 54, in _get_stream
 -     with io.open(self.dotenv_path, encoding=self.encoding) as stream:
 - PermissionError: [Errno 13] Permission denied: '/mnt/github/rtpsw/arrow/.env'

rtpsw avatar Nov 28 '22 11:11 rtpsw

AFAIU, aggregating tables is a valid use case that should be supported. @westonpace, let me know your thoughts.

Aggregation of tables should be done by using a table source node and an exec plan. Not by using the GroupBy function in aggregate.h. That was the intent behind the comment:

/// Internal use only: helpers for PyArrow and testing HashAggregateKernels.
/// For public use see arrow::compute::Grouper or create an execution plan
/// and use an aggregate node.

westonpace avatar Dec 06 '22 08:12 westonpace

AFAIU, aggregating tables is a valid use case that should be supported. @westonpace, let me know your thoughts.

Aggregation of tables should be done by using a table source node and an exec plan. Not by using the GroupBy function in aggregate.h. That was the intent behind the comment:

/// Internal use only: helpers for PyArrow and testing HashAggregateKernels.
/// For public use see arrow::compute::Grouper or create an execution plan
/// and use an aggregate node.

IIUC, support of chunked arrays is needed only for GroupBy as a testing facility (and that this is the case with hash aggregation too), right? So, if we remove this support, some current testing using GroupBy would break and would need to be removed too. @westonpace, are you fine with this outcome?

rtpsw avatar Dec 06 '22 10:12 rtpsw

IIUC, support of chunked arrays is needed only for GroupBy as a testing facility (and that this is the case with hash aggregation too, see https://github.com/apache/arrow/pull/14352/#issuecomment-1328279070), right? So, if we remove this support, some current testing using GroupBy would break and would need to be removed too. @westonpace, are you fine with this outcome?

I have created https://github.com/apache/arrow/pull/14867 which removes the internal GroupBy facility entirely (well, remaps it onto exec plans).

westonpace avatar Dec 07 '22 08:12 westonpace