kafka-go icon indicating copy to clipboard operation
kafka-go copied to clipboard

#615 - allow specifying a listener for partition assignment changes

Open nachogiljaldo opened this issue 1 year ago • 7 comments

Closes https://github.com/segmentio/kafka-go/issues/615

Adds a listener to send re-assignments to a listener. After this, it is possible to possible to provide a listener (optional) to have consumers be notified of changes in partition assignments for the current consumer.

This is useful for debugging as well as potentially other things such as dropping in-flight events for partitions that are not owned anymore.

nachogiljaldo avatar Jul 02 '24 22:07 nachogiljaldo

@achille-roussel I saw you commented on https://github.com/segmentio/kafka-go/issues/615 . Is this something you could review?

nachogiljaldo avatar Jul 02 '24 22:07 nachogiljaldo

I don't think adding this is a good idea. Instead, you should probably use the ConsumerGroup API to listen for a new generation and construct a new reader.

isaacd9 avatar Jul 03 '24 05:07 isaacd9

Thanks for the feedback @isaacd9 ! Could you elaborate?

I had a quick look at the NewReader. Also it seems to use run which is private, so it's not possible to do it that way without duplicating even more code? Same for getTopics() while building the ConsumerGroupConfig.

Maybe, instead, a modification safe version of ConsumerGroup should be exposed (that only has Next but not Close) in the Reader? That allows to preserve the encapsulation of the logic to create Readers which IMHO seems important and avoids unnecessary duplication on all clients that would need this.

Something like:

type ConsumerGroupGenerationListener interface {
  Next(ctx context.Context) (*Generation, error)
}

...

func (r *Reader) ConsumerGroupGenerationListener(ctx context.Context) (ConsumerGroupGenerationListener, error) {
	if !r.useConsumerGroup() {
		return nil, errOnlyAvailableWithGroup
	}
        return r.consumerGroupGenerationListener, nil
}

nachogiljaldo avatar Jul 03 '24 06:07 nachogiljaldo

@petedannemann , sorry for the direct ping but you seem to have reviewed some of the last PRs. I wonder if this is something you could provide feedback on the approach (or the alternatives provided by Isaac or my counter proposal).

nachogiljaldo avatar Jul 04 '24 01:07 nachogiljaldo

heya @erikdw / @jkoelker apologies for the direct ping, but this has been laying around for a while and I would need some guidance (as you seem to have approved the last merged PRs)

If you're indeed maintainers of this library, are you ok with this approach? Would you prefer the one outlined in https://github.com/segmentio/kafka-go/pull/1305#issuecomment-2205206085 or a different one? Happy to take on those suggestions

nachogiljaldo avatar Sep 10 '24 10:09 nachogiljaldo

@nachogiljaldo I suspect this has sat for while as the commit messages and pr description don't give very much context as to why its necessary. I clicked through the issue and after reading the discussion there, I understand that there is a desire to notify other portions of the code when group reassignment takes place.

I'm not familiar with this code or kafka implementation details, so as to use a ConsumerGroup or this implementation, I can't say, and wouldn't feel comfortable reviewing this.

Code wise, I'm not sure sorting the list is necessary and would just delay the re-assignment if a listener doesn't need it sorted, if a listener needs it sorted, I'd assume it would sort it itself.

jkoelker avatar Sep 10 '24 15:09 jkoelker

@jkoelker thanks for the prompt reply!

Fair points, I updated the PR description and first commit message to be more informative, I hope that makes sense.

Also removed the sorting as you suggested.

As per the specifics of the implementation, since you said you're not familiar with the code or kafka, would you know who could help providing feedback on the approach / review this PR?

nachogiljaldo avatar Sep 11 '24 10:09 nachogiljaldo