confluent-kafka-go

allow us to unsubscribe from a topic more easily

Open ORESoftware opened this issue 1 year ago • 4 comments

Description

Unsubscribing a consumer from one topic, but not all topics, is currently awkward.

I want this:

err := consumer.UnsubscribeFromTopic(topicId)

but all we have is:

err := consumer.Unsubscribe()

which unsubscribes us from all topics. This seems insane.

How to reproduce

Here is the current way to unsubscribe from one topic, but not all topics:

  1. Retrieve the list of current topics.
  2. Remove the topic from the list.
  3. Call subscribe again with the new list.
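
package main

import (
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Placeholder settings: replace with your own broker address and group id.
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "example-group",
	}

	consumer, err := kafka.NewConsumer(config)
	if err != nil {
		log.Fatalf("Failed to create consumer: %s", err)
	}

	topics := []string{"topic1", "topic2", "topic3"}
	if err = consumer.SubscribeTopics(topics, nil); err != nil {
		log.Fatalf("Failed to subscribe: %s", err)
	}
	fmt.Printf("Subscribed to topics: %s\n", strings.Join(topics, ", "))

	// unsubscribeFromTopic drops one topic by resubscribing to all the others.
	unsubscribeFromTopic := func(unsubTopic string) error {
		metadata, err := consumer.GetMetadata(nil, true, 5000)
		if err != nil {
			return fmt.Errorf("failed to get metadata: %w", err)
		}

		// Keep every tracked topic except unsubTopic, skipping topics
		// that no longer exist on the cluster.
		var remainingTopics []string
		for _, topic := range topics {
			if topic != unsubTopic {
				if _, exists := metadata.Topics[topic]; exists {
					remainingTopics = append(remainingTopics, topic)
				}
			}
		}

		if len(remainingTopics) == 0 {
			return fmt.Errorf("no remaining topics to subscribe to")
		}

		if err = consumer.SubscribeTopics(remainingTopics, nil); err != nil {
			return fmt.Errorf("failed to resubscribe: %w", err)
		}
		topics = remainingTopics // keep the tracked list in sync for later calls
		fmt.Printf("Resubscribed to topics: %s\n", strings.Join(remainingTopics, ", "))
		return nil
	}

	done := make(chan struct{})    // closed to ask the reader goroutine to stop
	stopped := make(chan struct{}) // closed once the reader goroutine has exited
	go func() {
		defer close(stopped)
		for {
			select {
			case <-done:
				return
			default:
			}
			msg, err := consumer.ReadMessage(100 * time.Millisecond)
			if err != nil {
				// A timeout only means no message arrived within 100ms.
				if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
					continue
				}
				log.Printf("Consumer error: %s", err)
				continue
			}
			fmt.Printf("Consumed message from topic %s\n", *msg.TopicPartition.Topic)
		}
	}()

	time.Sleep(5 * time.Second)

	if err = unsubscribeFromTopic("topic2"); err != nil {
		log.Printf("Failed to unsubscribe: %s", err)
	}

	time.Sleep(10 * time.Second)

	// Stop reading before closing so ReadMessage never runs on a closed consumer.
	close(done)
	<-stopped
	if err = consumer.Close(); err != nil {
		log.Printf("Failed to close consumer: %s", err)
	}
	fmt.Println("Consumer closed.")
}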

ORESoftware, May 19 '24 05:05

Hey - while I don't think there is a plan to add this enhancement right now, I can suggest a better workaround. You can use the Subscription() method to get the list of topics you're subscribed to, rather than the metadata call (this avoids a network call, since Subscription() is entirely local). The slice manipulation still needs to be done, unfortunately.

milindl, May 23 '24 12:05

OK, thanks @milindl - I think this functionality is sorely needed; it would save users from re-implementing their own unsubscribe logic. That means the library would store the references, which is fine I guess? Maybe only 10K items, tops.

ORESoftware, May 28 '24 04:05

We have a lot of topics in Kafka, and passing 500 topic IDs 3x a second to unsubscribe is a bit scary. Is there no way to store a reference to the subscription list on the Kafka side? This seems like a big oversight.

ORESoftware, Jun 04 '24 20:06

I think the API is worth considering, but there are several reasons it will probably not be prioritized soon.

This is a public API: once we add it, we'll need to support it forever. We'd also need to define the method's semantics for regex-based subscriptions; there is more than one way to implement that, so it requires more discussion. A similar method doesn't exist in the official Kafka Java client either.
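To make the regex concern concrete, here is a small illustration, not a proposal for how the library should behave. It assumes the librdkafka convention that subscription entries beginning with `^` are treated as patterns, and it uses Go's `regexp` package as a stand-in for the client's own matcher, whose syntax may differ. The helper names `withoutEntry` and `matchesSubscription` are hypothetical. Filtering the literal list, as the workaround does, leaves a pattern entry untouched, so the "removed" topic would still be matched.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// withoutEntry returns a copy of entries with exact matches of target removed.
func withoutEntry(entries []string, target string) []string {
	remaining := make([]string, 0, len(entries))
	for _, e := range entries {
		if e != target {
			remaining = append(remaining, e)
		}
	}
	return remaining
}

// matchesSubscription reports whether topic is covered by any entry.
// Entries starting with "^" are treated as regular expressions (assumed
// convention); all other entries must equal the topic name exactly.
func matchesSubscription(entries []string, topic string) (bool, error) {
	for _, e := range entries {
		if strings.HasPrefix(e, "^") {
			ok, err := regexp.MatchString(e, topic)
			if err != nil {
				return false, err
			}
			if ok {
				return true, nil
			}
		} else if e == topic {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	entries := []string{`^orders\..*`, "payments"}

	// Literal filtering cannot remove a topic that is only covered by a pattern.
	remaining := withoutEntry(entries, "orders.eu")
	stillMatched, err := matchesSubscription(remaining, "orders.eu")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("entries unchanged:", len(remaining) == len(entries))
	fmt.Println("orders.eu still matched:", stillMatched)
}
```

A per-topic unsubscribe would have to choose between rejecting this call, silently ignoring it, or tracking exclusions alongside the pattern, which is the kind of semantic decision mentioned above.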

Added to that, a workaround exists, and in my experience, changing subscriptions multiple times per second is not a very common mode of operation, since each change incurs a rebalance (even if you're using incremental rebalancing, there is some penalty to it).

Could you help me understand the use case for subscribing and unsubscribing from a topic multiple times every second? That'd be helpful for me to discuss this with my team.

milindl, Jun 06 '24 04:06