confluent-kafka-go
Allow us to unsubscribe from a single topic more easily
Description
Unsubscribing a consumer from one topic, without unsubscribing it from all of its topics, is currently awkward.
I want this:
err := consumer.UnsubscribeFromTopic(topicId)
but all we have is:
err := consumer.Unsubscribe()
which unsubscribes us from every topic. This seems like a big gap in the API.
How to reproduce
Here is how to unsubscribe from one topic but not the others:
- retrieve the list of current topics
- remove the topic from the list
- call SubscribeTopics again with the new list
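```go
package main

import (
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Example configuration; adjust for your cluster.
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "example-group",
	}
	consumer, err := kafka.NewConsumer(config)
	if err != nil {
		log.Fatalf("Failed to create consumer: %s", err)
	}

	topics := []string{"topic1", "topic2", "topic3"}
	if err := consumer.SubscribeTopics(topics, nil); err != nil {
		log.Fatalf("Failed to subscribe: %s", err)
	}
	fmt.Printf("Subscribed to topics: %s\n", strings.Join(topics, ", "))

	// Unsubscribe from one topic by resubscribing to the remaining ones.
	unsubscribeFromTopic := func(unsubTopic string) error {
		metadata, err := consumer.GetMetadata(nil, true, 5000)
		if err != nil {
			return fmt.Errorf("failed to get metadata: %w", err)
		}
		var remainingTopics []string
		for _, topic := range topics {
			// Skip the topic being removed and any topic that no longer exists.
			if _, exists := metadata.Topics[topic]; exists && topic != unsubTopic {
				remainingTopics = append(remainingTopics, topic)
			}
		}
		if len(remainingTopics) == 0 {
			return fmt.Errorf("no remaining topics to subscribe to")
		}
		if err := consumer.SubscribeTopics(remainingTopics, nil); err != nil {
			return fmt.Errorf("failed to resubscribe: %w", err)
		}
		topics = remainingTopics // remember the new subscription for later calls
		fmt.Printf("Resubscribed to topics: %s\n", strings.Join(remainingTopics, ", "))
		return nil
	}

	done := make(chan struct{})    // closed to ask the poll loop to stop
	stopped := make(chan struct{}) // closed by the poll loop when it exits
	go func() {
		defer close(stopped)
		for {
			select {
			case <-done:
				return
			default:
			}
			msg, err := consumer.ReadMessage(100 * time.Millisecond)
			if err != nil {
				// Timeouts are expected when no message arrives; don't touch msg on error.
				if kerr, ok := err.(kafka.Error); !ok || kerr.Code() != kafka.ErrTimedOut {
					log.Printf("Consumer error: %v", err)
				}
				continue
			}
			fmt.Printf("Consumed message from topic %s\n", *msg.TopicPartition.Topic)
		}
	}()

	time.Sleep(5 * time.Second)
	if err := unsubscribeFromTopic("topic2"); err != nil {
		log.Printf("Unsubscribe failed: %s", err)
	}
	time.Sleep(10 * time.Second)

	close(done)
	<-stopped // stop polling before closing the consumer
	if err := consumer.Close(); err != nil {
		log.Fatalf("Failed to close consumer: %s", err)
	}
	fmt.Println("Consumer closed.")
}
```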
Hey, while I don't think there is a plan to add this enhancement right now, I can suggest a better workaround. You can use the Subscription() method to get the list of topics you're subscribed to, instead of the metadata call. This avoids a network round trip, since Subscription() only reads local state. Unfortunately, you still need to do the slice manipulation yourself.
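A minimal sketch of the Subscription()-based workaround. withoutTopic is a hypothetical helper name. The only confluent-kafka-go calls it assumes are Consumer.Subscription() and Consumer.SubscribeTopics(), and these appear in a comment rather than being executed, so the example runs without a broker.

```go
package main

import "fmt"

// withoutTopic returns a new slice containing every entry of subscription
// except topic, preserving order. It does no I/O. Hypothetical usage with a
// *kafka.Consumer named consumer (not executed here):
//
//	current, err := consumer.Subscription() // local, no network call
//	if err != nil {
//		return err
//	}
//	err = consumer.SubscribeTopics(withoutTopic(current, "topic2"), nil)
func withoutTopic(subscription []string, topic string) []string {
	remaining := make([]string, 0, len(subscription))
	for _, t := range subscription {
		if t != topic {
			remaining = append(remaining, t)
		}
	}
	return remaining
}

func main() {
	current := []string{"topic1", "topic2", "topic3"}
	fmt.Println(withoutTopic(current, "topic2")) // prints [topic1 topic3]
}
```

The sketch leaves the handling of an empty result to the caller. In that case, calling Unsubscribe() is probably clearer than resubscribing to an empty list.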
OK, thanks @milindl. I think this functionality is sorely needed: it would save users from re-implementing their own unsubscribe logic. It would mean the library stores the list of subscribed topics, which seems fine; that's maybe 10K items at most.
We have a lot of topics in Kafka, and passing 500 topic IDs three times a second just to unsubscribe from one is a bit scary. Is there no way to keep a reference to the subscription list on the Kafka side? This seems like a big oversight.
I think the API is worth considering, but several factors mean it probably won't be prioritized soon.
This would be a public API, and once we support it, we'll need to support it forever. We'd also need to define the method's semantics for regex-based subscriptions. Those semantics can be implemented in more than one way, so they need more discussion. The official Kafka Java client doesn't have a similar method either.
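Regex subscriptions complicate the semantics. confluent-kafka-go treats subscription entries that start with "^" as regular expressions. Removing a literal topic name therefore does not necessarily stop consumption of that topic, because a pattern may still match it.

The sketch below shows one possible interpretation: drop exact matches, and report any patterns that still cover the topic. Rejecting the call or rewriting the pattern would be equally plausible choices. removeLiteral is hypothetical. Go's RE2-based regexp package stands in for librdkafka's own regex engine, which may differ in syntax details.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// removeLiteral drops exact (non-regex) entries equal to topic and reports
// which regex entries (those starting with "^") would still match it.
// Regex entries are always kept. Assumption: Go's regexp approximates the
// pattern matching librdkafka performs; they are not guaranteed to agree.
func removeLiteral(subscription []string, topic string) (remaining, stillMatchedBy []string) {
	for _, entry := range subscription {
		if strings.HasPrefix(entry, "^") {
			re, err := regexp.Compile(entry)
			if err == nil && re.MatchString(topic) {
				stillMatchedBy = append(stillMatchedBy, entry)
			}
			remaining = append(remaining, entry)
			continue
		}
		if entry != topic {
			remaining = append(remaining, entry)
		}
	}
	return remaining, stillMatchedBy
}

func main() {
	sub := []string{"orders", "metrics.cpu", "^metrics\\..*"}
	remaining, matchedBy := removeLiteral(sub, "metrics.cpu")
	fmt.Println(remaining) // prints [orders ^metrics\..*]
	fmt.Println(matchedBy) // prints [^metrics\..*]
}
```

In this example, "unsubscribing" from metrics.cpu removes the literal entry, but the pattern subscription would keep delivering it. This is exactly the ambiguity an official API would have to resolve.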
On top of that, a workaround exists. In my experience, changing subscriptions multiple times per second is also not a common mode of operation, since each change triggers a rebalance (even with incremental rebalancing, there is some penalty).
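If subscriptions really must change often, one way to limit the rebalance cost is to batch changes on the client. The batched changes can then be applied with a single SubscribeTopics call per interval, for example from a ticker. pendingSubscription is a hypothetical helper sketching that idea, and it does not call Kafka itself. Separately, incremental rebalancing is enabled through the librdkafka property partition.assignment.strategy set to cooperative-sticky. This reduces, but does not eliminate, the rebalance penalty.

```go
package main

import (
	"fmt"
	"sort"
)

// pendingSubscription accumulates topic additions and removals so they can
// be applied with one SubscribeTopics call (one rebalance) instead of one
// call per change. Hypothetical helper, not part of confluent-kafka-go.
// Not safe for concurrent use; guard it with a mutex if needed.
type pendingSubscription struct {
	topics map[string]struct{}
	dirty  bool
}

func newPendingSubscription(initial []string) *pendingSubscription {
	p := &pendingSubscription{topics: make(map[string]struct{}, len(initial))}
	for _, t := range initial {
		p.topics[t] = struct{}{}
	}
	return p
}

func (p *pendingSubscription) Add(topic string) {
	if _, ok := p.topics[topic]; !ok {
		p.topics[topic] = struct{}{}
		p.dirty = true
	}
}

func (p *pendingSubscription) Remove(topic string) {
	if _, ok := p.topics[topic]; ok {
		delete(p.topics, topic)
		p.dirty = true
	}
}

// Flush returns the sorted topic list and true if any Add or Remove modified
// the set since the last Flush; the caller would pass the list to
// SubscribeTopics. It returns nil and false when nothing changed.
func (p *pendingSubscription) Flush() ([]string, bool) {
	if !p.dirty {
		return nil, false
	}
	p.dirty = false
	list := make([]string, 0, len(p.topics))
	for t := range p.topics {
		list = append(list, t)
	}
	sort.Strings(list)
	return list, true
}

func main() {
	p := newPendingSubscription([]string{"topic1", "topic2", "topic3"})
	p.Remove("topic2")
	p.Add("topic4")
	if topics, changed := p.Flush(); changed {
		fmt.Println(topics) // prints [topic1 topic3 topic4]
	}
	_, changed := p.Flush()
	fmt.Println(changed) // prints false
}
```

With this design, several changes made within one interval cost a single resubscribe rather than one each. Whether that is acceptable depends on how much delay the application can tolerate before a change takes effect.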
Could you help me understand the use case for subscribing to and unsubscribing from a topic multiple times every second? That would help me when discussing this with my team.