Introduces the concept of a source topic/partition in the TopicPartitionWriter

Open fmeyer opened this issue 3 years ago • 3 comments

Problem

Whenever a transformer modifies the destination topic, we lose the ability to pause the original source topic. This change therefore also checks isTransformedTopic when pausing/resuming topics during writes/flushes, and uses the record's original topic/partition for those calls.

Solution

This should fix the NPE that occurs when a transform that modifies the topic name is present.

Introduces the concept of a source topic in the TopicPartitionWriter, built from the information contained in the InternalSinkRecord.
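
A minimal, dependency-free sketch of the idea, not the code in this PR: the writer keeps the destination topic/partition for naming output and a separate source topic/partition for pause/resume. `TopicPartitionKey`, `FakeSinkContext`, and `SourceAwareWriter` are hypothetical stand-ins introduced only for illustration, and the output path layout is assumed. The fake context rejects partitions it was not assigned, simply to make the failure visible.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a Kafka topic/partition pair (no Kafka dependency).
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical stand-in for the sink context: only assigned partitions can be paused.
final class FakeSinkContext {
    private final Set<TopicPartitionKey> assigned;
    private final Set<TopicPartitionKey> paused = new HashSet<>();

    FakeSinkContext(Set<TopicPartitionKey> assigned) {
        this.assigned = new HashSet<>(assigned);
    }

    void pause(TopicPartitionKey tp) {
        if (!assigned.contains(tp)) {
            throw new IllegalStateException("Not assigned: " + tp);
        }
        paused.add(tp);
    }

    void resume(TopicPartitionKey tp) {
        paused.remove(tp);
    }

    boolean isPaused(TopicPartitionKey tp) {
        return paused.contains(tp);
    }
}

// Hypothetical writer that tracks both the destination and the source topic/partition.
final class SourceAwareWriter {
    private final TopicPartitionKey destination; // possibly renamed by a transform
    private final TopicPartitionKey source;      // what the consumer actually reads
    private final FakeSinkContext context;

    SourceAwareWriter(TopicPartitionKey destination, TopicPartitionKey source,
                      FakeSinkContext context) {
        this.destination = destination;
        this.source = source;
        this.context = context;
    }

    boolean isTransformedTopic() {
        return !destination.topic.equals(source.topic);
    }

    // Pause/resume always target the source, which is the only partition the context knows.
    void pause() {
        context.pause(source);
    }

    void resume() {
        context.resume(source);
    }

    // Output naming uses the destination; this path layout is an assumption.
    String outputDirectory() {
        return "topics/" + destination.topic + "/partition=" + destination.partition;
    }
}
```

In this sketch, calling `context.pause(destination)` for a renamed topic would throw, because the context was only ever assigned the source partition. Routing pause/resume through `source` avoids that while output naming still follows the transformed topic.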

Does this solution apply anywhere else?
  • [ ] yes
  • [x] no
If yes, where?

Test Strategy

Testing done:
  • [x] Unit tests
  • [x] Integration tests
  • [ ] System tests
  • [x] Manual tests

Release Plan

fmeyer avatar Jan 13 '22 14:01 fmeyer

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

CLAassistant avatar Apr 09 '22 14:04 CLAassistant

The inability to use transformations that change the underlying topic name is among the known limitations of the S3 connectors. The limitation exists because, in addition to the pause/resume functions needing to work properly, the worker relies on the offsets sent back by the connector to ensure delivery guarantees. If the topic name is changed, the offsets returned may not carry the correct topic name, and so may not be the right offsets to commit.
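
To illustrate the offset concern, here is a simplified model, not actual worker or connector code: offsets are keyed by "topic-partition" strings, and only offsets whose key matches an assigned partition are treated as committable. `OffsetCommitSketch` and its methods are hypothetical names, and the filtering rule is an assumption made for the sake of the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of offset reporting; keys stand in for topic/partition pairs.
final class OffsetCommitSketch {

    static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Keep only offsets reported for partitions that are actually assigned.
    static Map<String, Long> committable(Map<String, Long> reported, Set<String> assigned) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : reported.entrySet()) {
            if (assigned.contains(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```

Under this model, an offset reported under a transformed name such as `orders-archive-0` matches no assigned partition and is dropped, so progress on the real source partition `orders-0` would not be recorded. Reporting the offset under the original topic name keeps it committable.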

snehashisp avatar Apr 19 '22 13:04 snehashisp

@snehashisp Thanks for the explanation. What's the likelihood of this PR being accepted?

btiernay avatar Jun 20 '22 13:06 btiernay