
[FLINK-37327] [Formats (JSON, Avro, Parquet, ORC, SequenceFile)] Debezium Avro Format: Add FormatOption to Optionally Skip emitting UPDATE_BEFORE Rows

Open klam-shop opened this issue 10 months ago • 4 comments

What is the purpose of the change

Add a format option to the Debezium Avro format that optionally skips emitting UPDATE_BEFORE rows when deserializing a Debezium message with op='u'.

This helps Flink SQL applications that operate in upsert mode (ChangelogMode=[I,UA,D]) avoid processing UPDATE_BEFORE rows, since their downstream upsert sinks do not need them.
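A minimal sketch of the deserialization rule described above, assuming a simplified model: `SketchRowKind`, `DebeziumEvent` and `DebeziumUpsertSketch` are hypothetical stand-ins (Flink has its own `org.apache.flink.types.RowKind` and works with `RowData`, not strings). It follows the usual Debezium mapping of `op` codes to row kinds and drops the `-U` row when upsert mode is on; it is not the code from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's RowKind, using the same short strings.
enum SketchRowKind {
    INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

    final String shortString;

    SketchRowKind(String shortString) {
        this.shortString = shortString;
    }
}

// Hypothetical simplified Debezium envelope: the op code plus before/after images as strings.
final class DebeziumEvent {
    final String op;
    final String before;
    final String after;

    DebeziumEvent(String op, String before, String after) {
        this.op = op;
        this.before = before;
        this.after = after;
    }
}

final class DebeziumUpsertSketch {
    // Returns emitted rows rendered as "<kind><payload>", e.g. "+U{id=1}".
    static List<String> deserialize(DebeziumEvent event, boolean upsertMode) {
        List<String> out = new ArrayList<>();
        switch (event.op) {
            case "c": // create
            case "r": // snapshot read
                out.add(SketchRowKind.INSERT.shortString + event.after);
                break;
            case "u":
                // Upsert mode drops the before image; an upsert sink only needs the new value per key.
                if (!upsertMode) {
                    out.add(SketchRowKind.UPDATE_BEFORE.shortString + event.before);
                }
                out.add(SketchRowKind.UPDATE_AFTER.shortString + event.after);
                break;
            case "d":
                out.add(SketchRowKind.DELETE.shortString + event.before);
                break;
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + event.op);
        }
        return out;
    }
}
```

With `upsertMode = true`, an op='u' event yields only the `+U` row; inserts, snapshot reads and deletes are unaffected.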

Note that no changes are required for the encoder, since Flink already encodes UPDATE_BEFORE and UPDATE_AFTER rows as Debezium DELETE and INSERT messages, respectively.
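To illustrate why the encoder is untouched, here is a minimal sketch of the row-kind-to-`op` mapping the note above describes. `SerRowKind` and `DebeziumOpMapping` are hypothetical stand-ins, not Flink classes. Since upsert mode simply never produces UPDATE_BEFORE rows, the existing mapping already covers the reduced changelog.

```java
// Hypothetical stand-in for Flink's RowKind.
enum SerRowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class DebeziumOpMapping {
    // Maps a changelog row kind to the Debezium op code written by the encoder.
    static String toDebeziumOp(SerRowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "c"; // written as a Debezium create (insert) message
            case UPDATE_BEFORE:
            case DELETE:
                return "d"; // written as a Debezium delete message
            default:
                throw new IllegalArgumentException("Unsupported row kind: " + kind);
        }
    }
}
```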

Brief change log

  • Add ENABLE_UPSERT_MODE ConfigOption to DebeziumAvroFormatFactory
  • Update DebeziumAvroDeserializationSchema to read the new option and skip emitting UPDATE_BEFORE rows when it is enabled
  • Add test coverage to DebeziumAvroSerDeSchemaTest
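A minimal sketch of how a factory could derive the format's changelog mode from the new flag, assuming the option defaults to `false`. The key string `upsert-mode` is hypothetical (the PR only names the constant `ENABLE_UPSERT_MODE`), and an `EnumSet` stands in for Flink's `ChangelogMode`, which offers `upsert()` and `all()` for these two sets.

```java
import java.util.EnumSet;
import java.util.Map;

// Hypothetical stand-in for Flink's RowKind.
enum ModeRowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class UpsertModeFactorySketch {
    // Hypothetical option key; the real key behind ENABLE_UPSERT_MODE is not stated in the PR.
    static final String ENABLE_UPSERT_MODE_KEY = "upsert-mode";

    // Reads the flag (assumed default: false) and returns the row kinds the decoder may emit.
    static EnumSet<ModeRowKind> changelogMode(Map<String, String> options) {
        boolean upsert = Boolean.parseBoolean(options.getOrDefault(ENABLE_UPSERT_MODE_KEY, "false"));
        if (upsert) {
            // [I, UA, D]: no UPDATE_BEFORE rows
            return EnumSet.of(ModeRowKind.INSERT, ModeRowKind.UPDATE_AFTER, ModeRowKind.DELETE);
        }
        // Full changelog: [I, UB, UA, D]
        return EnumSet.allOf(ModeRowKind.class);
    }
}
```

Declaring the narrower mode lets the planner know the source never produces UPDATE_BEFORE rows, which is what an upsert-only pipeline expects.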

Verifying this change

This change added tests and can be verified as follows:

  • Added testUpdateDataDeserializationWithUpsertMode, which deserializes an update and checks that no -U (UPDATE_BEFORE) row is emitted when upsert mode is enabled.
  • Added testSeDeSchemaWithUpsertMode to verify that the format factory applies the flag and sets the changelog mode correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs

klam-shop, Feb 14 '25 20:02

CI report:

  • 7538c1b60c950d00d272b4dbcf6ee923336dca7e UNKNOWN
  • ded9393fcba8c23b9f4bd3d95a9cb537e0da7c5f Azure: SUCCESS
Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot, Feb 14 '25 20:02

Thanks for your review, @davidradl! Updated s/Rows/rows/.

klam-shop, Feb 20 '25 14:02

@AHeise, any chance you have bandwidth to review this PR (since you reviewed the similar https://github.com/apache/flink-connector-kafka/pull/109), or could you help find a committer to review it?

klam-shop, Feb 20 '25 20:02

This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out to the community; contact details can be found here: https://flink.apache.org/what-is-flink/community/

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions[bot], May 22 '25 06:05

This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review.

github-actions[bot], Jun 22 '25 06:06