confluent-kafka-go
confluent-kafka-go copied to clipboard
allow us to unsubscribe from a topic more easily
Description
Unsubscribing a consumer from one topic, but not all topics, is currently awkward.
I want this:
err := consumer.UnsubscribeFromTopic(topicId)
but all we have is:
err := consumer.Unsubscribe()
which unsubscribes us from all topics. Is this really the only option?
How to reproduce
Here is the current workaround for unsubscribing from one topic, but not all topics:
- retrieve the list of current topics
- remove the topic from the list
- then call subscribe again with the new list
package main
import (
"fmt"
"log"
"strings"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// main demonstrates the workaround for unsubscribing a consumer from a single
// topic: rebuild the topic list without the unwanted topic and call
// SubscribeTopics again, which replaces the previous subscription.
//
// NOTE(review): config is not declared in this snippet; it is assumed to be a
// *kafka.ConfigMap defined elsewhere — confirm before running.
func main() {
	consumer, err := kafka.NewConsumer(config)
	if err != nil {
		log.Fatalf("Failed to create consumer: %s", err)
	}

	topics := []string{"topic1", "topic2", "topic3"}
	if err = consumer.SubscribeTopics(topics, nil); err != nil {
		// Best-effort cleanup; the subscribe error is the one worth reporting.
		_ = consumer.Close()
		log.Fatalf("Failed to subscribe to topics: %s", err)
	}
	fmt.Printf("Subscribed to topics: %s\n", strings.Join(topics, ", "))

	// unsubscribeFromTopic drops unsubTopic from the tracked topic list and
	// resubscribes to the rest. Topics missing from the cluster metadata are
	// dropped as well. It returns an error if metadata cannot be fetched, if
	// nothing would remain to subscribe to, or if the resubscribe call fails.
	unsubscribeFromTopic := func(unsubTopic string) error {
		metadata, err := consumer.GetMetadata(nil, true, 5000)
		if err != nil {
			return fmt.Errorf("Failed to get metadata: %w", err)
		}

		remainingTopics := make([]string, 0, len(topics))
		for _, topic := range topics {
			if topic == unsubTopic {
				continue
			}
			if _, exists := metadata.Topics[topic]; exists {
				remainingTopics = append(remainingTopics, topic)
			}
		}
		if len(remainingTopics) == 0 {
			return fmt.Errorf("No remaining topics to subscribe to")
		}

		if err := consumer.SubscribeTopics(remainingTopics, nil); err != nil {
			return fmt.Errorf("Failed to resubscribe to remaining topics: %w", err)
		}
		// Remember the new subscription so that a later call starts from the
		// current state instead of re-adding topics that were already removed.
		topics = remainingTopics
		fmt.Printf("Resubscribed to topics: %s\n", strings.Join(remainingTopics, ", "))
		return nil
	}

	done := make(chan struct{})    // closed by main to ask the poller to stop
	stopped := make(chan struct{}) // closed by the poller once it has returned
	go func() {
		defer close(stopped)
		for {
			select {
			case <-done:
				return
			default:
			}

			msg, err := consumer.ReadMessage(100 * time.Millisecond)
			if err != nil {
				// A timeout only means no message arrived within the poll
				// interval; msg is nil in that case and must not be dereferenced.
				if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
					continue
				}
				log.Printf("Consumer error: %v", err)
				continue
			}
			fmt.Printf("Consumed message from topic %s: \n", *msg.TopicPartition.Topic)
		}
	}()

	time.Sleep(5 * time.Second)
	if err = unsubscribeFromTopic("topic2"); err != nil {
		log.Printf("Failed to unsubscribe from topic2: %s", err)
	}
	time.Sleep(10 * time.Second)

	// Stop the poller and wait for it before closing, so ReadMessage is never
	// called on a consumer that is being closed.
	close(done)
	<-stopped

	if err = consumer.Close(); err != nil {
		log.Fatalf("Failed to close consumer: %s", err)
	}
	fmt.Println("Consumer closed.")
}