confluent-kafka-go

When I subscribe to a topic which doesn't exist, the topic is not being created

Open martin-dos-santos opened this issue 3 years ago • 6 comments

Description

I'm creating a consumer that subscribes to a topic which doesn't exist. I expect the topic to be created automatically, but it isn't. Instead I get the following error: "Broker: Unknown topic or partition: Subscribed topic not available: my_topic: Broker: Unknown topic or partition.".

How to reproduce

This is the consumer.

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers":        "localhost:9092",
	"group.id":                 fmt.Sprintf("my_service_%s", uuid.New().String()),
	"session.timeout.ms":       "6000",
	"auto.offset.reset":        "latest",
	"allow.auto.create.topics": "true",
})
if err != nil {
	_, _ = fmt.Fprintf(os.Stderr, "Consumer :: failed to create consumer: %s\n", err)
	// Without a consumer there is nothing to subscribe or poll with.
	os.Exit(1)
}

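// The second argument is an optional rebalance callback.
if err = c.Subscribe("my_topic", nil); err != nil {
	_, _ = fmt.Fprintf(os.Stderr, "Consumer :: failed to subscribe: %s\n", err)
}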

isRunning := true
for isRunning {
	select {
	case sig := <-sigChan:
		fmt.Printf("Consumer :: caught signal %v: terminating\n", sig)
		isRunning = false
	default:
		ev := c.Poll(200000)
		if ev == nil {
			continue
		}

		switch e := ev.(type) {
		case *kafka.Message:
			fmt.Printf("Consumer :: message received: %s\n", string(e.Value))
		case kafka.Error:
			// Errors should generally be considered
			// informational, the client will try to
			// automatically recover.
			// But in this example we choose to terminate
			// the application if all brokers are down.
			_, _ = fmt.Fprintf(os.Stderr, "Error: %v: %v\n", e.Code(), e)
			if e.Code() == kafka.ErrAllBrokersDown {
				isRunning = false
			}
		}
	}
}

if err = c.Close(); err != nil {
	_, _ = fmt.Fprintf(os.Stderr, "Consumer :: error closing consumer: %s\n", err)
}
fmt.Println("Consumer :: Closed")

Run that code and you'll get the following error:

Broker: Unknown topic or partition: Subscribed topic not available: my_topic: Broker: Unknown topic or partition

Checklist

Please provide the following information:

confluent-kafka-go version: confluent-kafka-go v1.8.2
librdkafka version / LibraryVersion(): 1.8.2-dirty
Apache Kafka broker version: confluentinc/cp-kafka:latest
Client configuration:
kafka.ConfigMap{
	"bootstrap.servers":        "localhost:9092",
	"group.id":                 fmt.Sprintf("my_service_%s", uuid.New().String()),
	"session.timeout.ms":       "6000",
	"auto.offset.reset":        "latest",
	"allow.auto.create.topics": "true",
}
Operating system: Ubuntu 20.04.4 LTS
Client logs (with "debug": ".." as necessary): I don't know what value I should use for debug; I'll update it if asked.
Broker log excerpts: No logs produced.
Critical issue: No.

martin-dos-santos, Jun 03 '22 15:06

Hi @martin-dos-santos, thanks for asking. The Unknown topic or partition error is expected if the topic does not exist. Please create a topic before subscribing to it. This is the example for creating a topic: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/admin_create_topic/admin_create_topic.go
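The create-before-subscribe ordering suggested here can be sketched as follows. This is a minimal, broker-free illustration and not the linked example: `topicAdmin`, `subscriber`, `fakeBroker`, and `errTopicExists` are hypothetical names invented for the sketch. In a real program the two interfaces would be backed by the library's admin client and consumer.

```go
package main

import (
	"errors"
	"fmt"
)

// errTopicExists is a hypothetical sentinel standing in for the broker's
// "topic already exists" result.
var errTopicExists = errors.New("topic already exists")

// topicAdmin and subscriber are hypothetical interfaces, not part of
// confluent-kafka-go; real implementations would wrap the admin client
// and the consumer.
type topicAdmin interface {
	CreateTopic(name string, partitions, replicationFactor int) error
}

type subscriber interface {
	Subscribe(topic string) error
}

// ensureTopicThenSubscribe creates the topic first and subscribes only when
// creation succeeded or the topic was already there.
func ensureTopicThenSubscribe(a topicAdmin, s subscriber, topic string) error {
	err := a.CreateTopic(topic, 1, 1)
	if err != nil && !errors.Is(err, errTopicExists) {
		return fmt.Errorf("create topic %q: %w", topic, err)
	}
	return s.Subscribe(topic)
}

// fakeBroker is an in-memory stand-in used only to exercise the sketch.
type fakeBroker struct {
	topics map[string]bool
}

func (b *fakeBroker) CreateTopic(name string, partitions, replicationFactor int) error {
	if b.topics[name] {
		return errTopicExists
	}
	b.topics[name] = true
	return nil
}

func (b *fakeBroker) Subscribe(topic string) error {
	if !b.topics[topic] {
		return errors.New("unknown topic or partition")
	}
	return nil
}

func main() {
	b := &fakeBroker{topics: map[string]bool{}}
	// Subscribing before the topic exists fails in this fake.
	fmt.Println("subscribe only:", b.Subscribe("my_topic"))
	// Creating first makes the subscription succeed.
	fmt.Println("create then subscribe:", ensureTopicThenSubscribe(b, b, "my_topic"))
	// A second call finds the topic already present and still succeeds.
	fmt.Println("repeat:", ensureTopicThenSubscribe(b, b, "my_topic"))
}
```

Treating an already-existing topic as success keeps the call safe to repeat every time the service starts.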

jliunyu, Jun 06 '22 18:06

Hi @jliunyu, thanks for your reply. Shouldn't the topic be created automatically given my settings? Or is this no longer supported?

martin-dos-santos, Jun 06 '22 23:06

Hi @martin-dos-santos - please see https://github.com/confluentinc/confluent-kafka-go/issues/615#issuecomment-806862457

jonchiu, Jun 07 '22 00:06

auto.create.topics.enable=true also needs to be set on the broker. (See the docs for allow.auto.create.topics in https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.)
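As a sketch, the two settings named in this comment look like this side by side. The first belongs to the broker configuration; the second is the client property already present in the issue's ConfigMap. Where the broker line lives is an assumption: a broker run from a container image usually receives it through the image's own configuration mechanism rather than a hand-edited server.properties.

```properties
# Broker side (assumed here to be server.properties)
auto.create.topics.enable=true

# Client side (librdkafka consumer property, as in the issue's ConfigMap)
allow.auto.create.topics=true
```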

mhowlett, Jun 07 '22 00:06

It is. We have some services using the Java client, and it creates the topics automatically.

martin-dos-santos, Jun 07 '22 04:06

I have auto.create.topics.enable=true on the broker and allow.auto.create.topics=true on the consumer.

I think the bug is actually in the fetcher. The broker log shows the fetcher being removed for the new partitions immediately after the topic is created:

[2022-06-09 08:43:10,304] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test_topic_1654764189837-0, test_topic_1654764189837-1) (kafka.server.ReplicaFetcherManager)

[2022-06-09 08:43:10,276] INFO Sent auto-creation request for Set(test_topic_1654764189837) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
[2022-06-09 08:43:10,277] INFO [Controller 1] CreateTopics result(s): CreatableTopic(name='test_topic_1654764189837', numPartitions=2, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2022-06-09 08:43:10,277] INFO [Controller 1] Created topic test_topic_1654764189837 with topic ID zj7lYO0rR6WQAj_ir-YNFg. (org.apache.kafka.controller.ReplicationControlManager)
[2022-06-09 08:43:10,277] INFO [Controller 1] Created partition test_topic_1654764189837-0 with topic ID zj7lYO0rR6WQAj_ir-YNFg and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2022-06-09 08:43:10,278] INFO [Controller 1] Created partition test_topic_1654764189837-1 with topic ID zj7lYO0rR6WQAj_ir-YNFg and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2022-06-09 08:43:10,304] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test_topic_1654764189837-0, test_topic_1654764189837-1) (kafka.server.ReplicaFetcherManager)
[2022-06-09 08:43:10,307] INFO [LogLoader partition=test_topic_1654764189837-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2022-06-09 08:43:10,308] INFO Created log for partition test_topic_1654764189837-0 in /tmp/kraft-combined-logs/test_topic_1654764189837-0 with properties {} (kafka.log.LogManager)
[2022-06-09 08:43:10,309] INFO [Partition test_topic_1654764189837-0 broker=1] No checkpointed highwatermark is found for partition test_topic_1654764189837-0 (kafka.cluster.Partition)
[2022-06-09 08:43:10,310] INFO [Partition test_topic_1654764189837-0 broker=1] Log loaded for partition test_topic_1654764189837-0 with initial high watermark 0 (kafka.cluster.Partition)
[2022-06-09 08:43:10,316] INFO [LogLoader partition=test_topic_1654764189837-1, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2022-06-09 08:43:10,317] INFO Created log for partition test_topic_1654764189837-1 in /tmp/kraft-combined-logs/test_topic_1654764189837-1 with properties {} (kafka.log.LogManager)
[2022-06-09 08:43:10,317] INFO [Partition test_topic_1654764189837-1 broker=1] No checkpointed highwatermark is found for partition test_topic_1654764189837-1 (kafka.cluster.Partition)
[2022-06-09 08:43:10,317] INFO [Partition test_topic_1654764189837-1 broker=1] Log loaded for partition test_topic_1654764189837-1 with initial high watermark 0 (kafka.cluster.Partition)
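To put a number on "immediately", the gap between two entries of this log can be computed from their timestamps. The sketch below is only a reading aid under that assumption: `parseBrokerTime` and `gap` are hypothetical helpers written for this illustration, and the two input lines are copied from the log above. It measures timing only and does not establish whether the fetcher removal is a fault.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// parseBrokerTime extracts the leading "[YYYY-MM-DD HH:MM:SS,mmm]" timestamp
// from a broker log line.
func parseBrokerTime(line string) (time.Time, error) {
	end := strings.Index(line, "]")
	if !strings.HasPrefix(line, "[") || end < 0 {
		return time.Time{}, fmt.Errorf("no timestamp in %q", line)
	}
	// Swap the comma for a dot so the layout parses on any Go version.
	ts := strings.Replace(line[1:end], ",", ".", 1)
	return time.Parse("2006-01-02 15:04:05.000", ts)
}

// gap returns how long after the first line the second line was logged.
func gap(first, second string) (time.Duration, error) {
	t1, err := parseBrokerTime(first)
	if err != nil {
		return 0, err
	}
	t2, err := parseBrokerTime(second)
	if err != nil {
		return 0, err
	}
	return t2.Sub(t1), nil
}

func main() {
	created := "[2022-06-09 08:43:10,277] INFO [Controller 1] Created topic test_topic_1654764189837 with topic ID zj7lYO0rR6WQAj_ir-YNFg. (org.apache.kafka.controller.ReplicationControlManager)"
	removed := "[2022-06-09 08:43:10,304] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test_topic_1654764189837-0, test_topic_1654764189837-1) (kafka.server.ReplicaFetcherManager)"

	d, err := gap(created, removed)
	if err != nil {
		panic(err)
	}
	fmt.Println("fetcher removed after:", d) // 27ms for the two lines above
}
```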

kwongpan, Jun 09 '22 08:06