flink-connector-aws

[FLINK-32097][Connectors/Kinesis] Implement support for Kinesis deaggregation

Open Lzgpom opened this issue 10 months ago • 8 comments

Purpose of the change

This PR implements Kinesis record deaggregation. The implementation is heavily based on the old Kinesis connector.

Verifying this change

This change added tests and can be verified as follows:

  • Added integration tests with record deaggregation
  • Added unit tests
  • Manually verified by running the Kinesis connector on a local Flink cluster.

Further ToDos and Follow-ups

Checkpoints do not take into consideration the sequence number of the deaggregated records.
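
To illustrate the gap noted above, here is a minimal sketch, assuming a hypothetical `RecordPosition` type that is not part of this PR or of the connector. Sub-records produced by deaggregation share the sequence number of their aggregated parent, so a checkpoint that stores only the sequence number cannot tell which sub-records were already emitted. Tracking a sub-sequence index alongside it would be one way to close that gap:

```java
import java.math.BigInteger;
import java.util.Comparator;

// Hypothetical type for illustration only; it is not part of this PR or of the connector.
final class RecordPosition implements Comparable<RecordPosition> {
    // Order by parent sequence number first, then by index inside the aggregated record.
    private static final Comparator<RecordPosition> ORDER =
            Comparator.comparing((RecordPosition p) -> p.sequenceNumber)
                    .thenComparingLong(p -> p.subSequenceNumber);

    // Kinesis sequence numbers are long decimal strings, so compare them numerically.
    final BigInteger sequenceNumber;
    // Index of the sub-record inside its aggregated parent (0 for a non-aggregated record).
    final long subSequenceNumber;

    RecordPosition(String sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = new BigInteger(sequenceNumber);
        this.subSequenceNumber = subSequenceNumber;
    }

    @Override
    public int compareTo(RecordPosition other) {
        return ORDER.compare(this, other);
    }

    // True if this record sits at or before the checkpointed position,
    // i.e. it was already emitted and can be skipped on restore.
    boolean isCoveredBy(RecordPosition checkpoint) {
        return compareTo(checkpoint) <= 0;
    }
}
```

With a position like this, a reader restoring from a checkpoint taken in the middle of an aggregated record could re-fetch the parent record and skip only the sub-records for which `isCoveredBy` returns true.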

Brief change log

  • 767f576d4295aa6c0ad04ac34ebb183cc57bd0cd Implement record deaggregation using AggregatorUtil from the KCL 3.x
  • ddcb1a9cf291cd9eab73829925a08da76b2dc107 Implemented tests for RecordBatch regarding deaggregation. The tests need aggregated records, so https://github.com/awslabs/kinesis-aggregation was used to produce them. Unfortunately, it does not have a version in the Maven repository compatible with KCL 3.x, so a workaround was used.
  • fce6d035270509f31fd4cf3d9a13b0737cb985a1 Added integration tests.
  • ddcb1a9cf291cd9eab73829925a08da76b2dc107 fce6d035270509f31fd4cf3d9a13b0737cb985a1 Added unit tests.
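
For context on what deaggregation has to detect, here is a minimal, self-contained sketch of the KPL aggregated-record framing as described in the KPL aggregation format documentation: a 4-byte magic prefix, a protobuf-encoded body, and a trailing 16-byte MD5 digest of that body. The class and method names below are hypothetical. The PR itself delegates this work to AggregatorUtil from the KCL, and this sketch only checks the framing without parsing the protobuf body:

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical helper for illustration only; the PR uses AggregatorUtil from the KCL instead.
final class AggregatedFraming {
    // Magic prefix that marks a KPL aggregated record.
    private static final byte[] MAGIC = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2};
    // An MD5 digest is always 16 bytes long.
    private static final int DIGEST_LENGTH = 16;

    // Returns true if the payload has the magic prefix and a matching MD5 trailer.
    static boolean looksAggregated(byte[] data) {
        if (data.length < MAGIC.length + DIGEST_LENGTH) {
            return false;
        }
        if (!Arrays.equals(Arrays.copyOfRange(data, 0, MAGIC.length), MAGIC)) {
            return false;
        }
        int bodyEnd = data.length - DIGEST_LENGTH;
        byte[] body = Arrays.copyOfRange(data, MAGIC.length, bodyEnd);
        byte[] trailer = Arrays.copyOfRange(data, bodyEnd, data.length);
        return Arrays.equals(md5(body), trailer);
    }

    // Wraps an already-encoded body in the magic prefix and MD5 trailer (useful for tests).
    static byte[] frame(byte[] body) {
        return ByteBuffer.allocate(MAGIC.length + body.length + DIGEST_LENGTH)
                .put(MAGIC)
                .put(body)
                .put(md5(body))
                .array();
    }

    private static byte[] md5(byte[] bytes) {
        try {
            return MessageDigest.getInstance("MD5").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the digests every Java platform is required to provide.
            throw new IllegalStateException(e);
        }
    }
}
```

A payload that fails this check is treated as a plain, non-aggregated record and passed through unchanged, which is why a deaggregating reader can consume streams with mixed record types.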

Significant changes

  • [x] Dependencies have been added or upgraded
  • [ ] Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • [ ] Serializers have been changed
  • [ ] New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

Lzgpom, Feb 01 '25 15:02

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

boring-cyborg[bot], Feb 01 '25 15:02

@hlteoh37 could you take a look?

Lzgpom, Feb 05 '25 13:02

@Lzgpom Building this locally and I've got a spotless-check format violation. mvn spotless:apply fixes it

nicusX, Feb 07 '25 15:02

I tested it locally, with a stream produced by KPL with aggregation enabled, and it worked fine.

nicusX, Feb 07 '25 17:02

> @Lzgpom Building this locally and I've got a spotless-check format violation. mvn spotless:apply fixes it

I tried to run mvn spotless:apply but it didn't make any changes. It also reported that Spotless apply was skipped.

Lzgpom, Feb 07 '25 17:02

Hmm, spotless:apply did make some changes for me, and after that mvn package succeeded.

nicusX, Feb 07 '25 17:02

If I run a simple mvn clean package on your branch, I get spotless-check violations in RecordBatchTest.java + 10 other files, and mvn spotless:apply changes 12 files. All style definitions are part of the repo, so I'm not sure why we get different results.

nicusX, Feb 07 '25 17:02

@nicusX I was using Java 17 and Spotless did not work. I switched to Java 11 and it worked. Thanks for letting me know about this.

Lzgpom, Feb 07 '25 19:02