pulsar-client-go

ReceiverQueueSize of 0 doesn't work as intended

Status: Open. wonko opened this issue 5 years ago • 6 comments

(I already reported this against the old driver here: https://github.com/apache/pulsar/issues/6496. It was noted there that this was fixed in the new Go implementation, but that does not seem to be the case. The scenario is identical.)

Expected behavior

When Pulsar is used as a queue system with long or varying per-message processing times, setting ReceiverQueueSize to 0 should make a consumer request a new message only when it is ready for new work.

With a ReceiverQueueSize of 0, no messages should be prefetched, and the client should request a message only once it has finished with the previous one. This is also how it is documented at https://pulsar.apache.org/docs/en/cookbooks-message-queue/#client-configuration-changes.
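The zero-queue behaviour described above can be modelled as pull-on-demand. The consumer grants the broker a single permit only when the application calls Receive, so nothing is buffered locally. The following is a minimal sketch under that assumption. fakeBroker and zeroQueueConsumer are hypothetical types that model the idea and are not part of the pulsar-client-go API.

```go
package main

import "fmt"

// fakeBroker is a hypothetical stand-in for the broker: it dispatches one
// pending message for every permit it is granted.
type fakeBroker struct {
	pending []string // messages not yet dispatched to any consumer
	sent    int      // number of messages pushed to consumers so far
}

// flow grants n permits and returns the messages dispatched for them.
func (b *fakeBroker) flow(n int) []string {
	var out []string
	for i := 0; i < n && len(b.pending) > 0; i++ {
		out = append(out, b.pending[0])
		b.pending = b.pending[1:]
		b.sent++
	}
	return out
}

// zeroQueueConsumer (hypothetical) never prefetches: it asks for exactly
// one message at the moment the application calls Receive.
type zeroQueueConsumer struct {
	broker *fakeBroker
}

// Receive grants a single permit and returns the message delivered for it.
// ok is false when the broker has nothing left to send.
func (c *zeroQueueConsumer) Receive() (msg string, ok bool) {
	msgs := c.broker.flow(1)
	if len(msgs) == 0 {
		return "", false
	}
	return msgs[0], true
}

func main() {
	b := &fakeBroker{pending: []string{"hello-0", "hello-1", "hello-2"}}
	c := &zeroQueueConsumer{broker: b}

	msg, _ := c.Receive()
	// Only the requested message has left the broker; the rest stay
	// available for other consumers on the shared subscription.
	fmt.Println(msg, b.sent, len(b.pending))

	// A consumer with a receiver queue of 1000 would instead grant 1000
	// permits up front and pull every pending message into its local queue.
	b2 := &fakeBroker{pending: []string{"hello-0", "hello-1", "hello-2"}}
	queued := b2.flow(1000)
	fmt.Println(len(queued), len(b2.pending))
}
```

Running it prints "hello-0 1 2" for the zero-queue consumer, meaning one message was sent and two are still pending. It prints "3 0" for the prefetching case, where every pending message has left the broker.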

I also tried the Python client, and it behaves as expected.

Actual behavior

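Setting ReceiverQueueSize to 0 makes the client fall back to the default queue size of 1000 (defaultQueueSize). As a result, messages are still prefetched, which is not the intended behaviour.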

Steps to reproduce

The consumer and producer code is below. Run three consumers: two with a 1-second sleep time and one with a 10-second sleep time. When 10 messages are produced, the 10-second consumer should receive 1 message, and the two 1-second consumers should receive 4 and 5 messages respectively.
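The expected split can be checked with a small simulation. It assumes an idealized broker that hands each message to whichever consumer becomes free earliest, with ties broken by consumer order, and it ignores network latency. The function dispatchOnDemand is a hypothetical name. The code models the intended zero-queue behaviour, not the client's internals.

```go
package main

import "fmt"

// dispatchOnDemand assigns n messages to consumers that only ask for a new
// message once the previous one is processed. sleeps[i] is consumer i's
// per-message processing time in seconds. It returns the message indices
// each consumer receives.
func dispatchOnDemand(sleeps []int, n int) [][]int {
	freeAt := make([]int, len(sleeps)) // time at which each consumer asks again
	assigned := make([][]int, len(sleeps))
	for msg := 0; msg < n; msg++ {
		// Pick the consumer that becomes free first; the lower index wins ties.
		next := 0
		for i := range freeAt {
			if freeAt[i] < freeAt[next] {
				next = i
			}
		}
		assigned[next] = append(assigned[next], msg)
		freeAt[next] += sleeps[next]
	}
	return assigned
}

func main() {
	// One 10-second consumer and two 1-second consumers, 10 messages.
	for i, msgs := range dispatchOnDemand([]int{10, 1, 1}, 10) {
		fmt.Printf("consumer %d: %v\n", i, msgs)
	}
}
```

Running it assigns [0] to the 10-second consumer, and [1 3 5 7 9] and [2 4 6 8] to the two 1-second consumers. This matches the expected output listed below.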

How I did it: run a standalone Pulsar in Docker and open four terminal windows. In two of them, run "go run consumer.go". In a third, run "go run consumer.go --sleeptime=10". Once all three are connected, run "go run producer.go" in the last window.

Simplified output:

Expected:

  • consumer_10_seconds: consumes message 0
  • consumer_1_second: consumes messages 1,3,5,7,9
  • consumer_1_second: consumes messages 2,4,6,8

Total runtime: 10 seconds

Actual:

  • consumer_10_seconds: consumes messages 0,5
  • consumer_1_second: consumes messages 1,3,7,9
  • consumer_1_second: consumes messages 2,4,6,8

Total runtime: 20 seconds
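The runtime difference follows from the per-consumer workload. Each consumer processes its messages one after another, so the run ends when the busiest consumer finishes. The hypothetical helper totalRuntime computes this for both distributions. It assumes all messages are available at time 0 and ignores network and broker latency.

```go
package main

import "fmt"

// totalRuntime returns how long a run takes when consumer i processes its
// assigned messages sequentially, each taking sleeps[i] seconds. The run
// finishes when the busiest consumer finishes.
func totalRuntime(assigned [][]int, sleeps []int) int {
	longest := 0
	for i, msgs := range assigned {
		if t := len(msgs) * sleeps[i]; t > longest {
			longest = t
		}
	}
	return longest
}

func main() {
	// consumer_10_seconds, consumer_1_second, consumer_1_second
	sleeps := []int{10, 1, 1}

	expected := [][]int{{0}, {1, 3, 5, 7, 9}, {2, 4, 6, 8}}
	actual := [][]int{{0, 5}, {1, 3, 7, 9}, {2, 4, 6, 8}}

	fmt.Println("expected:", totalRuntime(expected, sleeps), "seconds")
	fmt.Println("actual:", totalRuntime(actual, sleeps), "seconds")
}
```

This gives 10 and 20 seconds, the same totals as the lists above. The one extra message held by the 10-second consumer doubles the total runtime.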

System configuration

Pulsar version: 2.6.2 (Docker, standalone)
pulsar-client-go driver: latest (v0.3.0)

Things I already tried

I experimented a bit with pulsar-client-go, hoping to resolve this.

I made it accept 0 by changing the check here: https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_impl.go#L94-L96. I also made sure that initialPermits was at least 1. This did not produce the expected result.
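The kind of check change described here can be sketched as follows. This is a minimal, hypothetical model, not the code at the linked lines. defaultReceiverQueueSize, normalizeQueueSizeCurrent and normalizeQueueSizeZeroAware are illustrative names. The current check is assumed to treat any value <= 0 as unset, as the actual behaviour above suggests.

```go
package main

import "fmt"

// defaultReceiverQueueSize is a hypothetical name for the client default
// (1000, per the behaviour reported in this issue).
const defaultReceiverQueueSize = 1000

// normalizeQueueSizeCurrent models the reported behaviour: any value <= 0,
// including an explicit 0, falls back to the default.
func normalizeQueueSizeCurrent(size int) int {
	if size <= 0 {
		return defaultReceiverQueueSize
	}
	return size
}

// normalizeQueueSizeZeroAware keeps an explicit 0 so a consumer could run in
// zero-queue mode; only negative (invalid) values fall back to the default.
func normalizeQueueSizeZeroAware(size int) int {
	if size < 0 {
		return defaultReceiverQueueSize
	}
	return size
}

func main() {
	for _, size := range []int{-1, 0, 5} {
		fmt.Println(size, normalizeQueueSizeCurrent(size), normalizeQueueSizeZeroAware(size))
	}
}
```

One design point to keep in mind is that 0 is also the zero value of an int field in Go. A ConsumerOptions value that never sets ReceiverQueueSize therefore looks the same as one that sets it to 0 explicitly. A zero-aware check on its own would change the default for every consumer that leaves the field unset.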

Code:

Consumer:

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Simulated per-message processing time, in seconds.
	var sleeptime int

	flag.IntVar(&sleeptime, "sleeptime", 1, "seconds to sleep per message")
	flag.Parse()

	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:             "testtopic",
		SubscriptionName:  "sharedtest",
		Type:              pulsar.Shared,
		// Intended: no prefetching; fetch a message only when Receive is called.
		ReceiverQueueSize: 0,
	})
	if err != nil {
		log.Fatal(err)
	}

	defer consumer.Close()

	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}

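		fmt.Printf("Received message msgId: %v -- content: '%s' - will now sleep for %d seconds\n", msg.ID(), string(msg.Payload()), sleeptime)

		// Simulate slow processing before acknowledging; with a zero-size
		// receiver queue, the next message should only be requested after this.
		time.Sleep(time.Duration(sleeptime) * time.Second)
		consumer.Ack(msg)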
	}
}

Producer:

package main

import (
	"context"
	"fmt"
	"log"

	// Use the same (new) Go client as the consumer.
	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	fmt.Println("Started...")
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("Got client")

	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "testtopic",
	})
	if err != nil {
		log.Fatal(err)
	}

	defer producer.Close()

	ctx := context.Background()

	fmt.Println("Ready for sending")
	// Produce 10 messages (hello-0 .. hello-9) for the three consumers to share.
	for i := 0; i < 10; i++ {
		// Send is synchronous and returns the ID of the published message.
		msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("The message Id value is: [%v]\n", msgID)
	}
}

For reference, here is the Python code that works:

import pulsar
import time

client = pulsar.Client('pulsar://localhost:6650')
# receiver_queue_size=0: no prefetching, each receive() fetches one message on demand.
consumer = client.subscribe('persistent://public/default/testtopic', 'sharedtest',
                            consumer_type=pulsar.ConsumerType.Shared,
                            receiver_queue_size=0)
try:
    while True:
        msg = consumer.receive()
        print(msg.data())
        time.sleep(1)  # simulated processing time; use 10 for the slow consumer
        consumer.acknowledge(msg)
finally:
    client.close()

(Change the sleep time to 10 in one of the runs.)

wonko, Nov 20 '20

I managed to get it working for my case, but I don't think my PR is complete. I'm submitting it anyway as a reference.

wonko, Nov 20 '20

Hi, is there any update or timeline on this? For now we have to run our own patched version of the driver, which is far from ideal. This still seems like basic functionality that isn't working as it should.

wonko, Jun 14 '21

I think I will try to work on it when I have time. Any discussion about it is welcome.

liangyuanpeng, Aug 20 '21

Hi, I'd like to bring this ticket back to your attention. It keeps hurting simple queue-based message processing when long per-message processing times are combined with high parallelism. The Go driver also behaves differently here from the Java and C-based drivers.

wonko, Jan 31 '22