kcctl
Add functionality for resetting connector offsets
See https://github.com/helpermethod/connect-offset-reset/ by @helpermethod.
This will require expanding the config context so that it can specify the Kafka cluster bootstrap URL and the name of the offset topic.
@helpermethod, in case you're interested in collaborating, let me know and I can make you a committer to this repo.
Would love to!
Thinking more about this, it'd probably make most sense to expose this as a sub-command of the delete command: kcctl delete offsets <task-name>. WDYT?
Sounds like a good idea. Maybe I should quickly describe how the command is supposed to work:
- You pass in the list of broker URLs, the connector-name and the name of the connect offsets topic
- The command will then create a consumer, read all partitions of the topic, and scan the keys for the connector name
- Each key is an array consisting of the connector name and some additional metadata, like ["connector-name", { ... }]
- If a key is found, a tombstone is sent to the correct partition (extracted from the record which contains the correct key)
It's basically a codified manifestation of @rmoff's excellent blog post: https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/ 😄
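The steps listed above can be sketched without a real Kafka client. This is a minimal illustration, not the code from connect-offset-reset: `find_tombstones` is a hypothetical helper, and the records are assumed to be plain (partition, key, value) tuples standing in for what a consumer would return from the offsets topic.

```python
import json


def find_tombstones(records, connector_name):
    """Return the (partition, key) pairs that need a tombstone.

    `records` is an iterable of (partition, key_bytes, value_bytes) tuples,
    an assumed stand-in for the records read from the offsets topic.
    """
    targets = []
    seen = set()
    for partition, key, _value in records:
        if key is None:
            continue
        try:
            decoded = json.loads(key)
        except ValueError:
            continue  # key is not valid JSON, so it cannot be an offset key
        # Offset keys look like ["connector-name", {...}]
        if isinstance(decoded, list) and decoded and decoded[0] == connector_name:
            if (partition, key) not in seen:
                seen.add((partition, key))
                targets.append((partition, key))
    return targets
```

Each returned pair would then be produced back to the same partition with the same key and a null value.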
Gotcha!
You pass in the list of broker URLs, the connector-name and the name of the connect offsets topic
Most of this should happen by setting the context of the tool once:
kcctl config set-context --cluster=my-connect:8083 --bootstrap-servers=my-kafka:9092 --offset-topic=my-offsets
(We should probably also add a --name parameter to allow switching between multiple named configurations.)
And then:
kcctl delete offsets my-connector
Re the scope of that command, I'm not 100% sure off the top of my head. For a multi-task (source) connector, there'd be an offset for each source partition, right? It might make sense to be able to delete all or only one of them? Similar for sink connectors, I suppose, though I don't quite remember right now what their offset format is.
Hi @gunnarmorling!
Will probably take a week or two before I can start working on this.
The heavy rain here in NRW has flooded our basement and it will take a few days to recover from that 😕.
Hey @helpermethod, doh, I'm really sorry to hear about this. I hope you and your family are safe? Needless to say, recovering from this situation should have priority, and I hope you'll be able to bring everything into a proper state soon. I very much appreciate that you took the time to send this note. Take care!
For a multi-task (source) connector, there'd be an offset for each source partition, right? It might make sense to be able to delete all or only one of them?
Interesting! I've only used Kafka Connect with multiple workers, but never set tasks.max to a value greater than 1.
I'll need to test this out with multiple tasks.
A first step would be to delete all task offsets. This is probably the most common use case (you've messed something up and need to re-consume stuff).
Hi @gunnarmorling,
regarding the Debezium documentation for removing committed offsets: https://debezium.io/documentation/faq/#how_to_remove_committed_offsets_for_a_connector
It only mentions the connector name, but says nothing about per-task offsets. I'll need to try this out this evening and see if it actually makes any difference.
I still wonder how the input stream partitioning works. I guess the offsets need to be committed somewhere...
It only mentions the connector name, but says nothing about per-task offsets.
Ah, yes good point. Most Debezium connectors currently support only a single task (the MongoDB one being the exception), hence we tend to be a bit lenient when talking about this and sometimes intermingle connectors and tasks. To make things clearer, here is the offset topic of the kcetcd connector, which supports multiple source tasks:
["test-connector",{"name":"etcd-a"}] {"revision":2}
["test-connector",{"name":"etcd-a"}] {"revision":3}
["test-connector",{"name":"etcd-c"}] {"revision":4}
["test-connector",{"name":"etcd-b"}] {"revision":2}
Taking a step back, what's relevant here is support for multiple partitions, and there's an entry for each source partition (which may be re-assigned between tasks dynamically). In Debezium, connectors always have a single source partition (except MongoDB), which is probably why you haven't noticed this yet.
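To make the "one entry per source partition" point concrete, here is a small sketch that folds the four records from the kcetcd example into the latest offset per source partition. `latest_offsets` is a hypothetical helper, and the records are assumed to be (key, value) JSON strings rather than real Kafka records.

```python
import json

# The four records from the kcetcd example, as (key, value) JSON strings.
RECORDS = [
    ('["test-connector",{"name":"etcd-a"}]', '{"revision":2}'),
    ('["test-connector",{"name":"etcd-a"}]', '{"revision":3}'),
    ('["test-connector",{"name":"etcd-c"}]', '{"revision":4}'),
    ('["test-connector",{"name":"etcd-b"}]', '{"revision":2}'),
]


def latest_offsets(records, connector_name):
    """Map each source partition of a connector to its most recent offset."""
    latest = {}
    for key, value in records:
        name, source_partition = json.loads(key)
        if name != connector_name:
            continue
        # Dicts are not hashable, so use a canonical JSON string as the map key.
        partition_id = json.dumps(source_partition, sort_keys=True)
        latest[partition_id] = json.loads(value)  # later records overwrite earlier ones
    return latest


offsets = latest_offsets(RECORDS, "test-connector")
```

Here `offsets` ends up with three entries (etcd-a, etcd-b, etcd-c), so a full reset of this connector would need three tombstones.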
Interesting!
It should only take a few small changes to make the code work with multiple entries. I already scan all partitions.
I guess resetting offsets for only a subset of the tasks wouldn't make much sense, as the metadata for identifying individual task offsets would vary between different connectors?
I guess resetting offsets for only a subset of the tasks wouldn't make much sense, as the metadata for identifying individual task offsets would vary between different connectors?
+1; you'd have to know the connector-specific source partition id format. While we will expose this eventually via kcctl describe, I don't think a partition-specific reset is needed at this point. We can always add this later on.
Hi @gunnarmorling, I've got this working in kafka-connect-reset, which I currently use to prototype the functionality.
But now I face an interesting problem: if there are 3 offsets, I need to send 3 tombstones to the corresponding partitions.
Should I send those in parallel and wait for all of them to finish? Should this be done in a transaction?
Should I send those in parallel and wait for all of them to finish? Should this be done in a transaction?
That's an interesting question.
I don't think I'd bother with parallelizing this; doing it sequentially should be good enough. I don't think a transaction is strictly needed, i.e. in case of a failure it seems acceptable if only a subset of the input partitions got reset. Using a transaction would be nice in theory, but IIRC, consumers need to read transactional topics with specific settings, and as this probably isn't expected for the offset topic, there is probably no real advantage to using a transaction for the reset.
Long story short: let's keep it simple :)
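A minimal sketch of the "keep it simple" approach: send the tombstones one after another, with no transaction, and report which ones failed. `reset_sequentially` and the `send` callable are hypothetical; `send` stands in for a blocking producer call, and carrying on after a failure (rather than aborting) is one possible choice, not something decided in this thread.

```python
def reset_sequentially(targets, send):
    """Send one tombstone per (partition, key) target, in order.

    `send(partition, key, value)` is an assumed blocking callable that raises
    on failure. Failures are collected, so a partial reset is reported to the
    caller instead of being rolled back.
    """
    failed = []
    for partition, key in targets:
        try:
            send(partition, key, None)  # a None value marks the record as a tombstone
        except Exception as exc:
            failed.append((partition, key, exc))
    return failed
```

An empty return value means every source partition was reset; otherwise the caller can print the failed keys and let the user retry.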
Just learned there's a KIP related to this. I believe it has stalled, but would be interesting to read it through and see whether we can take some inspiration from it.
Didn't know, very interesting!
Hi @gunnarmorling, I'm getting back to it this evening.
How is your evening going, @helpermethod? :-)
I've published a KIP that would add support for this to the Kafka Connect REST API; if anyone has the time to take a look, I'd be interested to hear people's thoughts: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
It looks like KIP-875 has landed, so kcctl should now be in a position to add commands that use the new APIs for viewing/resetting/writing offsets (and also for stopping a connector)?
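For orientation, here is a rough sketch of the REST calls such commands could wrap, based on my reading of KIP-875: PUT /connectors/{name}/stop, plus GET and DELETE on /connectors/{name}/offsets (PATCH on the same path writes offsets). `offsets_requests` is a hypothetical helper that only builds the requests and does not send them; please check the paths against the Kafka Connect REST documentation for your version.

```python
from urllib.parse import quote
from urllib.request import Request


def offsets_requests(connect_url, connector):
    """Build (but do not send) the requests for stopping a connector
    and for viewing or resetting its offsets."""
    base = f"{connect_url.rstrip('/')}/connectors/{quote(connector, safe='')}"
    return {
        "stop": Request(f"{base}/stop", method="PUT"),
        "view": Request(f"{base}/offsets", method="GET"),
        "reset": Request(f"{base}/offsets", method="DELETE"),
    }


requests = offsets_requests("http://my-connect:8083", "my-connector")
```

As far as I understand the KIP, the connector has to be stopped before its offsets can be reset or altered, which is why the stop request is included here.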