confluent-kafka-go icon indicating copy to clipboard operation
confluent-kafka-go copied to clipboard

allow us to unsubscribe from a topic more easily

Open ORESoftware opened this issue 9 months ago • 4 comments

Description

Unsubscribing to 1 topic, but not all topics, for a consumer is currently wonky

I want this:

err := consumer.UnsubscribeFromTopic(topicId)

but all we have is:

err := consumer.Unsubscribe()

which unsubscribes us from all topics. this is insane?

How to reproduce

here is how to unsubscribe from one topic, but not all topics -

  1. we retrieve list of current topics
  2. remove the topic from the list
  3. then call the subscribe call again with the new list
package main

import (
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

	consumer, err := kafka.NewConsumer(config)
	
	topics := []string{"topic1", "topic2", "topic3"}
	err = consumer.SubscribeTopics(topics, nil)

	fmt.Printf("Subscribed to topics: %s\n", strings.Join(topics, ", "))

	unsubscribeFromTopic := func(unsubTopic string) error {
		metadata, err := consumer.GetMetadata(nil, true, 5000)
		if err != nil {
			return fmt.Errorf("Failed to get metadata: %s", err)
		}

		var remainingTopics []string
		for _, topic := range topics {
			if topic != unsubTopic {
				if _, exists := metadata.Topics[topic]; exists {
					remainingTopics = append(remainingTopics, topic)
				}
			}
		}

		if len(remainingTopics) == 0 {
			return fmt.Errorf("No remaining topics to subscribe to")
		}

		err = consumer.SubscribeTopics(remainingTopics, nil)
		fmt.Printf("Resubscribed to topics: %s\n", strings.Join(remainingTopics, ", "))
		return nil
	}

	go func() {
		for {
			msg, err := consumer.ReadMessage(100 * time.Millisecond)
			fmt.Printf("Consumed message from topic %s: \n", *msg.TopicPartition.Topic,)
		}
	}()

	time.Sleep(5 * time.Second)

	err = unsubscribeFromTopic("topic2")

	time.Sleep(10 * time.Second)
	err = consumer.Close()
	fmt.Println("Consumer closed.")
}

ORESoftware avatar May 19 '24 05:05 ORESoftware