kcctl icon indicating copy to clipboard operation
kcctl copied to clipboard

Add functionality for resetting connector offsets

Open gunnarmorling opened this issue 3 years ago • 20 comments

See https://github.com/helpermethod/connect-offset-reset/ by @helpermethod.

This will require to expand the config context, allowing to specify Kafka cluster bootstrap URL and name of the offset topic.

gunnarmorling avatar Jul 13 '21 06:07 gunnarmorling

@helpermethod, in case you're interested in collaborating, let me know and I can make you a committer to this repo.

gunnarmorling avatar Jul 13 '21 07:07 gunnarmorling

Would love to!

helpermethod avatar Jul 13 '21 07:07 helpermethod

Thinking more about this, it'd probably make most sense to expose this as a sub-command of the delete command: kcctl delete offsets <task-name>. WDYT?

gunnarmorling avatar Jul 14 '21 07:07 gunnarmorling

Sounds like a good idea. Maybe I should quickly describe how the command is supposed to work:

  • You pass in the list of broker URLs, the connector-name and the name of the connect offsets topic
  • The command will then create a consumer, reads all partitions from the topic, and scans the keys for the connector name
  • The keys is an array consisting of the connector name and some additional metadata, like ["connector-name", { ... }]
  • If a key is found, a tombstone is sent to the correct partition (extracted from the record which contains the correct key)

It's basically a codified manifestation of @rmoff's excellent blog post: https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/ 😄

helpermethod avatar Jul 14 '21 07:07 helpermethod


You pass in the list of broker URLs, the connector-name and the name of the connect offsets topic

Most of this should happen by setting the context of the tool once:

kcctl config set-context --cluster=my-connect:8083 --bootstrap-servers=my-kafka:9092 --offset-topic=my-offsets

(We probably should also add --name parameter, allowing to switch between multiple named configurations).

And then:

kcctl delete offsets my-connector

Re the scope of that command, I'm not 100% sure from the top of my head. For a multi-task (source) connector, there'd be an offset for each source partition, right. It might make sense to be able to delete all or only one of them? Similar for sink connectors, I suppose, though I don't quite remember right now what their offset format is.

gunnarmorling avatar Jul 14 '21 07:07 gunnarmorling

Hi @gunnarmorling!

Will probably take a week or two before I can start working on this.

The heavy rain here in NRW has flooded our basement and it will take a few days to recover from that 😕.

helpermethod avatar Jul 18 '21 13:07 helpermethod

Hey @helpermethod, doh, I'm really sorry to hear about this. I hope you and your family are safe? Needless to say that recovering this situation should have priority, and I hope you'll be able to bring everything into a proper state soon. Appreciating it very much that you took the time to send this note. Take care!

gunnarmorling avatar Jul 18 '21 18:07 gunnarmorling

For a multi-task (source) connector, there'd be an offset for each source partition, right. It might make sense to be able to delete all or only one of them?

Interesting! I've only used Kafka Connect with multiple workers, but never set max.tasks with a value greater than 1. I'll need to test this out with multiple tasks.

A first step would be to delete all task offsets. This is probably the most common use case (you've messed something up and need to re-consume stuff).

helpermethod avatar Jul 21 '21 17:07 helpermethod

Hi @gunnarmorling,

regarding the Debezium documentation for removing committed offsets: https://debezium.io/documentation/faq/#how_to_remove_committed_offsets_for_a_connector

It only mentions the connector name, but says nothing about per-task offsets. I'll need to try this out this evening and see if it actually makes any difference.

I still wonder how the input stream partitioning works. I guess the offsets need to be committed somewhere...

helpermethod avatar Jul 22 '21 10:07 helpermethod

It only mentions the connector name, but says nothing about per-task offsets.

Ah, yes good point. Most Debezium connectors currently support only a single task (the MongoDB one being the exception), hence we tend to be a bit lenient when talking about this and sometimes intermingle connectors and tasks. To make things clearer, here is the offset topic of the kcetcd connector, which supports multiple source tasks:

["test-connector",{"name":"etcd-a"}]	{"revision":2}
["test-connector",{"name":"etcd-a"}]	{"revision":3}
["test-connector",{"name":"etcd-c"}]	{"revision":4}
["test-connector",{"name":"etcd-b"}]	{"revision":2}

Taking a step back, what's relevant here is support for multiple partitions, and there's an entry for each source partition (which may be re-assigned between tasks dynamically). In Debezium, connectors always have a single source partition (except MongoDB), so this is why you haven't noticed this yet probably.

gunnarmorling avatar Jul 23 '21 17:07 gunnarmorling


It should only take a few small changes to make the code work with multiple entries. I already scan all partitions.

I guess resetting offsets for only a subset of the tasks wouldn't make much sense, as the metadata for identifying individual task offsets would wary between different connectors?

helpermethod avatar Jul 24 '21 08:07 helpermethod

I guess resetting offsets for only a subset of the tasks wouldn't make much sense, as the metadata for identifying individual task offsets would wary between different connectors?

+1; you'd have to know the connector-specific source partition id format; While we will expose this eventually via kcctl describe, I don't think a partition-specific reset is needed at this point. We always can add this later on.

gunnarmorling avatar Jul 24 '21 09:07 gunnarmorling

Hi @gunnarmorling, I've got this working in kafka-connect-reset, which I currently use to prototype the functionality.

But now I face an interesting problem: if there are 3 offsets, I need to send 3 tombstones to the corresponding partitions.

Should I send those in parallel and wait for all of them to finish? Should this be done in a transaction?

helpermethod avatar Jul 27 '21 04:07 helpermethod

Should I send those in parallel and wait for all of them to finish? Should this be done in a transaction?

That's an interesting question.

I don't think I'd bother with parallelizing this, doing it subsequentially should be good enough. I don't think a transaction is strictly needed, i.e. in case of a failure it seems acceptable if only a subset of the input partitions got reset. Using a transaction would be nice in theory, but IIRC, consumers need to read transactional topics with specific settings, and as this probably isn't expected for the offset topic, there probably is no advantage really of using TX for the reset.

Long story short: let's keep it simple :)

gunnarmorling avatar Jul 27 '21 06:07 gunnarmorling

Just learned there's a KIP related to this. I believe it has stalled, but would be interesting to read it through and see whether we can take some inspiration from it.

gunnarmorling avatar Aug 17 '21 09:08 gunnarmorling

Didn't know, very interesting!

helpermethod avatar Aug 17 '21 10:08 helpermethod

Hi @gunnarmorling, I'm getting back to it this evening.

helpermethod avatar Sep 01 '21 06:09 helpermethod

How is your evening going, @helpermethod? :-)

gunnarmorling avatar Oct 10 '21 09:10 gunnarmorling

I've published a KIP that would add support for this to the Kafka Connect REST API; if anyone has the time to take a look, I'd be interested to hear people's thoughts: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect

C0urante avatar Oct 13 '22 20:10 C0urante

It looks like KIP-875 has landed - it looks like kcctl would now be in a position to add commands that use the new APIs for viewing/resetting/writing offsets? (and also stopping a connector)