pulsar-client-go
pulsar-client-go copied to clipboard
Zero queue consumer doesn't seem to work properly
Expected behavior
omitted
Actual behavior
Zero queue consumer has been supported since v0.13.0, but there seem to be some buggy behaviors.
- Immediately after creating a consumer, I got the following error log:
ERRO[0000] unable to send initial permits to broker consumerID=1 error="invalid number of permits requested: 0" name=dufbb subscription=sub1 topic="persistent://pulsar/test/t1"
- If I registered
MessageChannelwhen creating a consumer, itsavailablePermitswas 0. Naturally, even if messages were published to the topic, it was not able to receive any of them. - Consumer that receive messages using
Receive()rather thanMessageChannelworked. However, if the connected topic was unloaded or the broker was restarted,availablePermitsbecame 0 and no messages could be received thereafter.
Steps to reproduce
I ran the following code:
consumer_with_message_channel.go
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
channel := make(chan pulsar.ConsumerMessage)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://pulsar/test/t1",
SubscriptionName: "sub1",
ReceiverQueueSize: 0,
EnableZeroQueueConsumer: true,
MessageChannel: channel,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for out := range channel {
msg := out.Message
fmt.Printf("Received: %s (%s)\n", string(msg.Payload()), msg.ID().String())
consumer.Ack(msg)
}
}
consumer_without_message_channel.go
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://pulsar/test/t1",
SubscriptionName: "sub1",
ReceiverQueueSize: 0,
EnableZeroQueueConsumer: true,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for i := 0; i < 100; i++ {
msg, err := consumer.Receive(context.Background())
if err == nil {
fmt.Printf("Received: %s (%s)\n", string(msg.Payload()), msg.ID().String())
consumer.Ack(msg)
} else {
log.Fatal(err)
}
}
}
System configuration
Pulsar version: v0.14.0