[FLINK-24943][Connectors / Kinesis] Explicitly create KryoSerializer for SequenceNumber class in Kinesis Consumer

Open Cyberness opened this issue 3 years ago • 1 comments

What is the purpose of the change

Create KryoSerializer for SequenceNumber class explicitly to allow disabling of Generic Types via disableGenericTypes() call.

Brief change log

  1. Create a KinesisStateUtil class with a static method createShardsStateSerializer that explicitly assigns a KryoSerializer to the SequenceNumber class, and use this method to initialize state
  2. Create a unit test that checks compatibility between the previous TypeInformation-based serializer and the explicitly created KryoSerializer
  3. Replace Mockito calls with the MockStreamingRuntimeContext utility to improve testability and unify how the streaming runtime context is initialized
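
To illustrate why item 1 allows disableGenericTypes(), here is a minimal toy model of the mechanism. It is a sketch under assumptions, not the code in this PR: ToySerializer, ToyGenericSerializer, ToyConfig and ToySerializers are hypothetical stand-ins and are not Flink classes. The idea is that the type-information path falls back to a generic serializer and therefore has to honour the "generic types disabled" flag, while a serializer constructed explicitly never goes through that check and keeps the same byte format.

```java
import java.nio.charset.StandardCharsets;

// Toy model only: every name below is hypothetical and is NOT a Flink class.
interface ToySerializer<T> {
    byte[] serialize(T value);
}

/** Stand-in for a general-purpose (Kryo-like) serializer. */
final class ToyGenericSerializer<T> implements ToySerializer<T> {
    @Override
    public byte[] serialize(T value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }
}

/** Stand-in for an execution config that carries the "generic types disabled" flag. */
final class ToyConfig {
    private boolean genericTypesDisabled;

    void disableGenericTypes() {
        genericTypesDisabled = true;
    }

    boolean hasGenericTypesDisabled() {
        return genericTypesDisabled;
    }
}

final class ToySerializers {
    private ToySerializers() {}

    /** Type-information path: falls back to the generic serializer, so it checks the flag. */
    static <T> ToySerializer<T> fromTypeInfo(Class<T> type, ToyConfig config) {
        if (config.hasGenericTypesDisabled()) {
            throw new UnsupportedOperationException(
                    "Generic types are disabled and " + type.getSimpleName()
                            + " would be treated as a generic type");
        }
        return new ToyGenericSerializer<>();
    }

    /** Explicit path: the caller chooses the serializer, so the fallback check never runs. */
    static <T> ToySerializer<T> explicit(Class<T> type, ToyConfig config) {
        return new ToyGenericSerializer<>();
    }
}
```

In this model, calling ToySerializers.fromTypeInfo after config.disableGenericTypes() throws, while ToySerializers.explicit still returns a serializer that writes the same bytes as before. That byte-level equality is what the compatibility test in item 2 is meant to guard.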

Verifying this change

Added unit test to verify changed initialization of state.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature?: no
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented): no

Cyberness avatar Jul 12 '22 18:07 Cyberness

CI report:

  • 2b3d74c5d86226df86fcece30a24caa7e58ee3d2 Azure: FAILURE

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Jul 12 '22 18:07 flinkbot

The code changes look good. I recognise that we specify a KryoSerializer instead of making SequenceNumber a POJO in order to maintain backwards compatibility. Given that we are changing the state serializer, could we do a sanity test to check the following?

  • we can call disableGenericTypes (as reported here)
  • a snapshot created by the old consumer is valid for the new consumer

hlteoh37 avatar Oct 11 '22 22:10 hlteoh37

Yes, I can work on it.

Cyberness avatar Oct 11 '22 23:10 Cyberness

@Cyberness please also rebase and squash the commits, and update the commit message as per the contribution guide.

dannycranmer avatar Oct 14 '22 16:10 dannycranmer

Does it still make sense to work on merging this change considering FLINK-30557?

Cyberness avatar Jan 11 '23 22:01 Cyberness

We have now moved the consumer to https://github.com/apache/flink-connector-aws as part of the connector externalisation effort.

hlteoh37 avatar Jan 12 '23 00:01 hlteoh37