flink-connector-aws
[FLINK-35299] Respect initial position for new streams
Purpose of the change
According to the Javadoc, the `STREAM_INITIAL_POSITION` property defines where to start reading Kinesis streams from. In the current implementation, however, this only holds if that KinesisConsumer has no restored state at all, for any stream. Otherwise, a new stream is handled the same way as a new shard of an existing stream: consumption starts from EARLIEST (equivalent to the TRIM_HORIZON initial position).
This MR changes that by making `FlinkKinesisConsumer` use the `STREAM_INITIAL_POSITION` config for new streams, which aligns with what is documented.
This behavior is disabled by default to avoid introducing a breaking change, but can be enabled by setting `flink.stream.initpos-for-new-streams` to true.
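The start-position choice described above can be sketched as a small decision function. This is only an illustration of the behavior as described in this PR, not the connector's actual code: the class, enum, and parameter names are hypothetical, and the "resume from restored sequence number" branch reflects the usual restore behavior rather than anything changed here.

```java
public class StartPositionSketch {

    /** Hypothetical labels for where a discovered shard starts reading. */
    public enum Start { RESTORED_SEQUENCE_NUMBER, EARLIEST, STREAM_INITIAL_POSITION }

    /**
     * @param hasRestoredState      the consumer restored state for at least one stream
     * @param shardInRestoredState  this exact shard is present in the restored state
     * @param streamInRestoredState some shard of this shard's stream is in the restored state
     * @param initposForNewStreams  value of flink.stream.initpos-for-new-streams
     */
    public static Start decide(
            boolean hasRestoredState,
            boolean shardInRestoredState,
            boolean streamInRestoredState,
            boolean initposForNewStreams) {
        if (!hasRestoredState) {
            // Fresh start: every stream honors STREAM_INITIAL_POSITION.
            return Start.STREAM_INITIAL_POSITION;
        }
        if (shardInRestoredState) {
            // Known shard: continue from the checkpointed position.
            return Start.RESTORED_SEQUENCE_NUMBER;
        }
        if (streamInRestoredState) {
            // New shard of an existing stream: read it from the beginning.
            return Start.EARLIEST;
        }
        // New stream: previously always EARLIEST; with the flag enabled,
        // the configured initial position is respected instead.
        return initposForNewStreams ? Start.STREAM_INITIAL_POSITION : Start.EARLIEST;
    }
}
```

With the flag left at its default, a new stream added to a restored job still starts from EARLIEST, so existing jobs see no change.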
Additionally, a second config, `flink.stream.initpos-streams`, was added to allow specific streams to be "reset" to whatever `STREAM_INITIAL_POSITION` is set to. This is an important addition in this MR because users who notice this bug and want to enable the correct behaviour will want to reset the offset already recorded for the new stream.
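As a rough illustration, the consumer properties for opting in might look like the sketch below. The two new keys are taken from this description and `flink.stream.initpos` is the existing `STREAM_INITIAL_POSITION` key. The stream name `orders-stream` is hypothetical, and the value format of `flink.stream.initpos-streams` (here a single stream name) is an assumption, since this description does not specify it. Region and credential settings are omitted.

```java
import java.util.Properties;

public class InitPosConfigSketch {

    /** Builds consumer properties that opt in to the new behavior. */
    public static Properties consumerConfig() {
        Properties config = new Properties();

        // Existing setting: where to start reading when no position is known.
        config.setProperty("flink.stream.initpos", "LATEST");

        // New in this PR: also apply the initial position to streams added
        // after the job already has restored state. Defaults to disabled.
        config.setProperty("flink.stream.initpos-for-new-streams", "true");

        // New in this PR: streams to "reset" to the initial position.
        // Assumption: the value is a stream name; the exact format is not
        // given in this description. "orders-stream" is a made-up name.
        config.setProperty("flink.stream.initpos-streams", "orders-stream");

        return config;
    }
}
```

These properties would then be passed to the `FlinkKinesisConsumer` constructor along with the usual AWS settings.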
Verifying this change
TODO
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
- Added integration tests for end-to-end deployment
- Added unit tests
- Manually verified by running the Kinesis connector on a local Flink cluster.
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
- [ ] Dependencies have been added or upgraded
- [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`)
- [ ] Serializers have been changed
- [x] New feature has been introduced
- If yes, how is this documented? (not applicable / docs / JavaDocs / not documented) docs and JavaDocs where appropriate