ReceiverQueueSize of 0 doesn't work as intended
(I already reported this against the old driver here: https://github.com/apache/pulsar/issues/6496, where it was noted that this was fixed in the new Go implementation. That doesn't seem to be the case; the scenario is identical.)
Expected behavior
When running Pulsar as a queue system with long or varying processing times per message, a client that sets ReceiverQueueSize to 0 should only request a new message when it is ready for new work.
With a ReceiverQueueSize of 0, no messages should be prefetched: the client should only request a message once it has finished with the previous one. This is also how it's documented at https://pulsar.apache.org/docs/en/cookbooks-message-queue/#client-configuration-changes.
I also tried a Python implementation, and that works as it should.
Actual behavior
Setting ReceiverQueueSize to 0 silently falls back to the default queue size of 1000, which is not intended.
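A minimal sketch of why an explicit 0 gets lost, assuming the option check in consumer_impl.go treats every non-positive size as "unset". The types and functions below are hypothetical stand-ins, not the real pulsar-client-go code. Because Go's zero value for an int is 0, an explicit `ReceiverQueueSize: 0` and an omitted field look identical, so supporting a zero queue needs some other signal, such as a separate opt-in flag (the `ZeroQueue` field here is hypothetical).

```go
package main

import "fmt"

// defaultReceiverQueueSize mirrors the default of 1000 described in this report.
const defaultReceiverQueueSize = 1000

// consumerOptions is a hypothetical, trimmed-down stand-in for
// pulsar.ConsumerOptions; only the queue-size field is modelled.
type consumerOptions struct {
	ReceiverQueueSize int
}

// normalizeCurrent mirrors the reported behaviour: every non-positive size,
// including an explicit 0, falls back to the default.
func normalizeCurrent(o consumerOptions) int {
	if o.ReceiverQueueSize <= 0 {
		return defaultReceiverQueueSize
	}
	return o.ReceiverQueueSize
}

// consumerOptionsV2 adds a hypothetical opt-in flag, because an int field
// alone cannot distinguish "left unset" from "explicitly 0".
type consumerOptionsV2 struct {
	ReceiverQueueSize int
	ZeroQueue         bool // hypothetical field, for illustration only
}

// normalizeWithZeroQueue returns 0 (no prefetching) only when the caller
// opted in, and otherwise keeps the existing defaulting behaviour.
func normalizeWithZeroQueue(o consumerOptionsV2) int {
	if o.ZeroQueue {
		return 0
	}
	if o.ReceiverQueueSize <= 0 {
		return defaultReceiverQueueSize
	}
	return o.ReceiverQueueSize
}

func main() {
	fmt.Println(normalizeCurrent(consumerOptions{ReceiverQueueSize: 0}))         // 1000
	fmt.Println(normalizeCurrent(consumerOptions{}))                             // 1000, same as an explicit 0
	fmt.Println(normalizeWithZeroQueue(consumerOptionsV2{ZeroQueue: true}))       // 0
	fmt.Println(normalizeWithZeroQueue(consumerOptionsV2{ReceiverQueueSize: 50})) // 50
}
```

A negative sentinel value would be another way to express "no prefetching"; a named flag just makes the intent harder to miss.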
Steps to reproduce
The consumer and producer code is below. Run 3 consumers: two with a 1-second sleep time and one with a 10-second sleep time. When 10 messages are produced, the 10-second consumer should receive 1 message, and the two 1-second consumers should receive 5 and 4 messages.
How I did it: run a standalone Pulsar in Docker and open 4 terminal windows. In 2 of them run `go run consumer.go`, in 1 run `go run consumer.go --sleeptime=10`, and once they're all connected, run `go run producer.go` in the last window.
Simplified output:
Expected:
- consumer_10_seconds: consumes message 0
- consumer_1_second: consumes messages 1,3,5,7,9
- consumer_1_second: consumes messages 2,4,6,8
Total runtime: 10 seconds
Actual:
- consumer_10_seconds: consumes messages 0,5
- consumer_1_second: consumes messages 1,3,7,9
- consumer_1_second: consumes messages 2,4,6,8
Total runtime: 20 seconds
System configuration
- Pulsar version: 2.6.2 (Docker, standalone)
- Latest pulsar-client-go driver (v0.3.0)
Things I already tried
I experimented a bit with pulsar-client-go, hoping to resolve this.
I made it accept 0 by changing the check here: https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_impl.go#L94-L96, and made sure that initialPermits was at least 1. This didn't produce the expected result.
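One plausible explanation for why forcing initialPermits to at least 1 did not help, assuming the client replenishes a permit as soon as the application takes a message: with a queue of 1, the slow consumer is already holding the next message while it sleeps on the current one. In a Shared subscription, a message dispatched to one consumer is not given to another idle consumer unless it is redelivered. The sketch below models this with a hypothetical synchronous fake broker; none of these types come from pulsar-client-go.

```go
package main

import "fmt"

// fakeBroker is a hypothetical stand-in for the broker side of a single
// subscription: it pushes one pending message per flow permit it receives.
type fakeBroker struct {
	pending []int
}

// flow grants n permits and returns the messages the broker pushes back.
func (b *fakeBroker) flow(n int) []int {
	if n > len(b.pending) {
		n = len(b.pending)
	}
	out := b.pending[:n]
	b.pending = b.pending[n:]
	return out
}

// prefetchConsumer models a client whose local queue is topped up as soon
// as the application takes a message (assumed replenish policy).
type prefetchConsumer struct {
	broker *fakeBroker
	buffer []int
}

func newPrefetchConsumer(b *fakeBroker, queueSize int) *prefetchConsumer {
	c := &prefetchConsumer{broker: b}
	c.buffer = append(c.buffer, b.flow(queueSize)...) // initial permits
	return c
}

// Receive assumes the buffer is non-empty; a real client would block instead.
func (c *prefetchConsumer) Receive() int {
	msg := c.buffer[0]
	c.buffer = c.buffer[1:]
	c.buffer = append(c.buffer, c.broker.flow(1)...) // replenish immediately
	return msg
}

// zeroQueueConsumer grants exactly one permit per Receive call and keeps
// nothing buffered between calls.
type zeroQueueConsumer struct {
	broker *fakeBroker
}

// Receive assumes a message is pending; a real client would block instead.
func (c *zeroQueueConsumer) Receive() int {
	return c.broker.flow(1)[0]
}

func main() {
	b1 := &fakeBroker{pending: []int{0, 1, 2, 3}}
	p := newPrefetchConsumer(b1, 1) // queue of 1, i.e. initialPermits = 1
	first := p.Receive()
	fmt.Println("prefetch: processing", first, "while holding", p.buffer)
	// prefetch: processing 0 while holding [1]

	b2 := &fakeBroker{pending: []int{0, 1, 2, 3}}
	z := &zeroQueueConsumer{broker: b2}
	first = z.Receive()
	fmt.Println("zero queue: processing", first, "still on broker", b2.pending)
	// zero queue: processing 0 still on broker [1 2 3]
}
```

In the zero-queue variant, messages 1 to 3 stay on the broker while message 0 is processed, so an idle consumer could pick them up.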
Code:
Consumer:
```go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Simulated processing time per message, in seconds.
	var sleeptime int
	flag.IntVar(&sleeptime, "sleeptime", 1, "sleeptime")
	flag.Parse()

	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:             "testtopic",
		SubscriptionName:  "sharedtest",
		Type:              pulsar.Shared,
		ReceiverQueueSize: 0, // intended: no prefetching; currently falls back to 1000
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("Received message msgId: %v -- content: '%s' - will now sleep for %d seconds\n",
			msg.ID(), string(msg.Payload()), sleeptime)
		time.Sleep(time.Duration(sleeptime) * time.Second)
		consumer.Ack(msg)
	}
}
```
Producer:
```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	fmt.Println("Started...")
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Got client")
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "testtopic",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	ctx := context.Background()
	fmt.Println("Ready for sending")
	// Send 10 messages; Send blocks until the broker has acknowledged each one.
	for i := 0; i < 10; i++ {
		msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("The message Id value is: [%v]\n", msgID)
	}
}
```
For reference, the Python code that works:
```python
import time

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('persistent://public/default/testtopic', 'sharedtest',
                            consumer_type=pulsar.ConsumerType.Shared,
                            receiver_queue_size=0)
try:
    while True:
        msg = consumer.receive()
        print(msg.data())
        time.sleep(1)  # processing time per message
        consumer.acknowledge(msg)
finally:
    client.close()
```
(For one of the consumers, change the sleep time to 10.)
I managed to get it working for my case, but I don't think my PR is complete. Submitting it anyway as a reference.
Hi, any update or timeline on this? For now we have to run our own patched version of the driver, which is far from ideal. This still seems like basic functionality that isn't working as it should.
I think I will try to work on this when I have time. Any discussion about it is welcome.
Hi, I'd like to bring this ticket back to your attention. This keeps hurting simple queue-based message processing when longer per-message processing times are combined with high parallelism. The behaviour also differs from that of the Java and C-based drivers.