[FLINK-37817] Adds bundled aggregates for group by (FLIP-491)
What is the purpose of the change
This PR implements one of the first portions of FLIP-491. It has a couple components:
- Adds a new operator
KeyedAsyncWaitOperatorwhich is similar toAsyncWaitOperator, but has keyed state.- Adds
KeyedAsyncFunctionwhich extendsAsyncFunction, but will has it's own context.- Future use will expose timer related methods (e.g. setting row time timers, proc timers)
- Note that it exposes
runOnMailboxThreadrather than just using thecomplete(SupplierWithException...)because anything requiring multiple async calls will need to processed async responses outside of a pure output result, so a more generic callback had to be introduced. Group by doesn't require this, but over operators will.
- Currently supports
ORDEREDoutput. Followup version will also supportROW_TIME, to support over operators.
- Adds
- The main group by implementation of a
KeyedAsyncFunction
Brief change log
- Adds
KeyedAsyncWaitOperatorandKeyedAsyncFunction. - Adds
BundledAggregateAsyncFunctionwhich handled group by
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change added tests and can be verified as follows:
(example:)
- Added unit testing for
KeyedAsyncWaitOperatorandKeyedStreamElementQueueImpl - Added unit testing for
KeyedAsyncFunctionCommon - Added ITCase
BundledAggregateITCasewhich tests the whole stack together.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with
@Public(Evolving): (yes / no) - The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Just for the new bundled type
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)
Documentation
- Does this pull request introduce a new feature? (yes / no)
- Just for the new configs.
- Will add more if necessary, though this is an internal-ish feature
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- d6830228273b051d48c39a9c867abed435fe4b35 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:@flinkbot run azurere-run the last Azure build
This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.
If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.