confluent-kafka-go
confluent-kafka-go copied to clipboard
When I suscribe to a topic which doesn't exists, the topic is not being created
Description
I'm creating a consumer which subscribes to a topic which doesn't exist, I'm expecting for it to create it automatically but it doesn't, instead I'm getting the following error: "Broker: Unknown topic or partition: Subscribed topic not available: my_topic: Broker: Unknown topic or partition.".
How to reproduce
This is the consumer.
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": fmt.Sprintf("my_service_%s", uuid.New().String()),
"session.timeout.ms": "6000",
"auto.offset.reset": "latest",
"allow.auto.create.topics": "true",
})
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Consumer :: failed to create consumer: %s\n", err)
}
err = c.Subscribe("my_topic" nil)
isRunning := true
for isRunning {
select {
case sig := <-sigChan:
fmt.Printf("Consumer :: caught signal %v: terminating\n", sig)
isRunning = false
default:
ev := c.Poll(200000)
if ev == nil {
continue
}
switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("Consumer :: message received: %s\n", string(e.Value))
case kafka.Error:
// Errors should generally be considered
// informational, the client will try to
// automatically recover.
// But in this example we choose to terminate
// the application if all brokers are down.
_, _ = fmt.Fprintf(os.Stderr, "Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
isRunning = false
}
}
}
}
if err = c.Close(); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Consumer :: error closing consumer: %s\n", err)
}
fmt.Println("Consumer :: Closed")
Just run that code, you'll get following error:
Broker: Unknown topic or partition: Subscribed topic not available: my_topic: Broker: Unknown topic or partition
Checklist
Please provide the following information:
confluent-kafka-go: confluent-kafka-go v1.8.2
librdkafka version / LibraryVersion() 1.8.2-dirty
Apache Kafka broker version: confluentinc/cp-kafka:latest
Client configuration: kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "group.id": fmt.Sprintf("my_service_%s", uuid.New().String()), "session.timeout.ms": "6000", "auto.offset.reset": "latest", "allow.auto.create.topics": "true", }
Operating system: Ubuntu 20.04.4 LTS
Client logs: (with "debug": ".." as necessary) I don't know what value should I use in debug, I'll updated it if asked.
Broker log excerpts: No logs produced.
Critical issue: No.
Hi @martin-dos-santos, thanks for asking. The Unknown topic or partition error is expected if the topic does not exist. Please create a topic before subscribing to it. This is the example for creating a topic: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/admin_create_topic/admin_create_topic.go
Hi @martin-dos-santos, thanks for asking. The
Unknown topic or partition erroris expected if the topic does not exist. Please create a topic before subscribing to it. This is the example for creating a topic: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/admin_create_topic/admin_create_topic.go
Hi @jliunyu, thanks for your reply. Shouldn't the topic be created automatically given my settings? or is this no longer supported?
Hi @martin-dos-santos - please see https://github.com/confluentinc/confluent-kafka-go/issues/615#issuecomment-806862457
auto.create.topics.enable=true needs to also be set on the broker. (see the docs for allow.auto.create.topics in https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md )
auto.create.topics.enable=true needs to also be set on the broker. (see the docs for allow.auto.create.topics in https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md )
It is. We have some some services using the java client and it is creating the topics automatically.
I have auto.create.topics.enable=true on the broker and allow.auto.create.topics=true on the consumer.
I think the bug is actually on the fetcher. I could see that from the log where the fetcher is removed for partitions immediately.
[2022-06-09 08:43:10,304] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test_topic_1654764189837-0, test_topic_1654764189837-1) (kafka.server.ReplicaFetcherManager)
[2022-06-09 08:43:10,276] INFO Sent auto-creation request for Set(test_topic_1654764189837) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) [2022-06-09 08:43:10,277] INFO [Controller 1] CreateTopics result(s): CreatableTopic(name='test_topic_1654764189837', numPartitions=2, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager) [2022-06-09 08:43:10,277] INFO [Controller 1] Created topic test_topic_1654764189837 with topic ID zj7lYO0rR6WQAj_ir-YNFg. (org.apache.kafka.controller.ReplicationControlManager) [2022-06-09 08:43:10,277] INFO [Controller 1] Created partition test_topic_1654764189837-0 with topic ID zj7lYO0rR6WQAj_ir-YNFg and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager) [2022-06-09 08:43:10,278] INFO [Controller 1] Created partition test_topic_1654764189837-1 with topic ID zj7lYO0rR6WQAj_ir-YNFg and PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager) [2022-06-09 08:43:10,304] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test_topic_1654764189837-0, test_topic_1654764189837-1) (kafka.server.ReplicaFetcherManager) [2022-06-09 08:43:10,307] INFO [LogLoader partition=test_topic_1654764189837-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$) [2022-06-09 08:43:10,308] INFO Created log for partition test_topic_1654764189837-0 in /tmp/kraft-combined-logs/test_topic_1654764189837-0 with properties {} (kafka.log.LogManager) [2022-06-09 08:43:10,309] INFO [Partition test_topic_1654764189837-0 broker=1] No checkpointed highwatermark is found for partition test_topic_1654764189837-0 (kafka.cluster.Partition) [2022-06-09 08:43:10,310] INFO [Partition test_topic_1654764189837-0 broker=1] Log loaded for partition test_topic_1654764189837-0 with initial high watermark 0 (kafka.cluster.Partition) [2022-06-09 08:43:10,316] INFO [LogLoader partition=test_topic_1654764189837-1, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$) [2022-06-09 08:43:10,317] INFO Created log for partition test_topic_1654764189837-1 in /tmp/kraft-combined-logs/test_topic_1654764189837-1 with properties {} (kafka.log.LogManager) [2022-06-09 08:43:10,317] INFO [Partition test_topic_1654764189837-1 broker=1] No checkpointed highwatermark is found for partition test_topic_1654764189837-1 (kafka.cluster.Partition) [2022-06-09 08:43:10,317] INFO [Partition test_topic_1654764189837-1 broker=1] Log loaded for partition test_topic_1654764189837-1 with initial high watermark 0 (kafka.cluster.Partition)