flink icon indicating copy to clipboard operation
flink copied to clipboard

[FLINK-37817] Adds bundled aggregates for group by (FLIP-491)

Open AlanConfluent opened this issue 7 months ago • 1 comments

What is the purpose of the change

This PR implements one of the first portions of FLIP-491. It has a couple components:

  • Adds a new operator KeyedAsyncWaitOperator which is similar to AsyncWaitOperator, but has keyed state.
    • Adds KeyedAsyncFunction which extends AsyncFunction, but will has it's own context.
      • Future use will expose timer related methods (e.g. setting row time timers, proc timers)
      • Note that it exposes runOnMailboxThread rather than just using the complete(SupplierWithException...) because anything requiring multiple async calls will need to processed async responses outside of a pure output result, so a more generic callback had to be introduced. Group by doesn't require this, but over operators will.
    • Currently supports ORDERED output. Followup version will also support ROW_TIME, to support over operators.
  • The main group by implementation of a KeyedAsyncFunction

Brief change log

  • Adds KeyedAsyncWaitOperator and KeyedAsyncFunction.
  • Adds BundledAggregateAsyncFunction which handled group by

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

This change added tests and can be verified as follows:

(example:)

  • Added unit testing for KeyedAsyncWaitOperator and KeyedStreamElementQueueImpl
  • Added unit testing for KeyedAsyncFunctionCommon
  • Added ITCase BundledAggregateITCase which tests the whole stack together.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
    • Just for the new bundled type
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
    • Just for the new configs.
    • Will add more if necessary, though this is an internal-ish feature
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

AlanConfluent avatar May 21 '25 15:05 AlanConfluent

CI report:

  • d6830228273b051d48c39a9c867abed435fe4b35 Azure: SUCCESS
Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot avatar May 21 '25 15:05 flinkbot

This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions[bot] avatar Aug 20 '25 06:08 github-actions[bot]

This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.

github-actions[bot] avatar Sep 19 '25 06:09 github-actions[bot]