EnableZeroQueueConsumer Setting Not Functioning Correctly
Description I've encountered an issue with the Pulsar client where setting EnableZeroQueueConsumer: true does not produce the expected behavior. The client continues to perform extra reads into the buffer, which contradicts the purpose of this setting. Steps to Reproduce
Configure the Pulsar client with EnableZeroQueueConsumer: true Observe the client's behavior during message consumption
Expected Behavior When EnableZeroQueueConsumer is set to true, the client should not perform extra reads into the buffer. It should only fetch messages as they are consumed. Actual Behavior The client continues to read additional messages into the buffer, despite the EnableZeroQueueConsumer setting being enabled. Additional Information I've reviewed the code and I'm confident there's an issue in the implementation of this feature. The extra buffering occurs consistently, regardless of the EnableZeroQueueConsumer setting. Environment
Pulsar Client Version: v0.13.1
Server : [apachepulsar/pulsar:3.3.1] (docker)
Any assistance in investigating and resolving this issue would be greatly appreciated. Thank you for your time and attention to this matter.
When EnableZeroQueueConsumer is set to true, the client should not perform extra reads into the buffer. It should only fetch messages as they are consumed.
Could you please provide a more detailed explanation of the meaning of this sentence?
https://github.com/apache/pulsar-client-go/blob/682bf5fde149754c1648fbffccd39c2c51f17551/pulsar/consumer_partition.go#L1492
In the current situation, all messages are first distributed to the messageCh, and then data is retrieved from this channel.
The expected behavior with EnableZeroQueueConsumer set to true should be as follows:
50 tasks were placed in the queue. Each consumer was limited to processing 5 tasks concurrently. 4 consumers were opened sequentially. The distribution should have been: Consumer 1: Tasks 1-5 Consumer 2: Tasks 6-10 Consumer 3: Tasks 11-15 Consumer 4: Tasks 16-20 However, the actual behavior observed was:
Consumer 1: Processed tasks 1-5, but incorrectly buffered tasks 6-20 Consumer 2: Processed tasks 21-25, but incorrectly buffered tasks 26-40 (Similar pattern for other consumers) When Consumer 1 was closed, it released all its tasks, including those in the buffer, which Consumer 4 then took over.
This behavior contradicts the purpose of EnableZeroQueueConsumer. With this setting enabled, consumers should only fetch tasks as they're ready to process them, without pre-fetching or buffering additional tasks. The current implementation appears to be ignoring this setting, leading to unnecessary buffering and potential issues with task distribution and processing.
test code :
func consumer3() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
Logger: plog.NewLoggerWithLogrus(&logrus.Logger{}),
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
schema := pulsar.NewAvroSchema(common.AvroSchema, nil)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "test1",
SubscriptionName: "my-shared-subscription",
Type: pulsar.Shared,
Schema: schema,
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
EnableZeroQueueConsumer: true,
MaxPendingChunkedMessage: 1,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
accountWorker := common.NewAccountListener(5)
fmt.Println("waiting..")
for accountWorker.WaitReady() {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Printf("Error receiving message: %v", err)
continue
}
account := &common.Account{}
err = msg.GetSchemaValue(account)
if err != nil {
log.Printf("Error decoding message: %v", err)
consumer.Nack(msg)
continue
}
fmt.Println("Consumed : " + strconv.Itoa(account.ID))
accountWorker.AddAccount(consumer, msg, account)
}
}
const AvroSchema = `
{
"type": "record",
"name": "Account",
"fields": [
{"name": "ID", "type": "int"}
]
}`
type AccountWrapper struct {
Account *Account
Consumer pulsar.Consumer
Message pulsar.Message
}
type Account struct {
ID int `avro:"ID"`
}
type AccountListener struct {
Limit int
Accounts map[int]*AccountWrapper
lock *sync.Mutex
}
func NewAccountListener(limit int) *AccountListener {
return &AccountListener{
Limit: limit,
Accounts: make(map[int]*AccountWrapper),
lock: &sync.Mutex{},
}
}
func (l *AccountListener) WaitReady() bool {
for l.checkLimit() == false {
time.Sleep(time.Millisecond * 100)
}
return true
}
func (l *AccountListener) IsReady() bool {
l.lock.Lock()
defer l.lock.Unlock()
return len(l.Accounts) < l.Limit
}
func (l *AccountListener) checkLimit() bool {
l.lock.Lock()
defer l.lock.Unlock()
return len(l.Accounts) < l.Limit
}
func (l *AccountListener) AddAccount(consumer pulsar.Consumer, message pulsar.Message, account *Account) {
l.lock.Lock()
defer l.lock.Unlock()
l.Accounts[account.ID] = &AccountWrapper{
Account: account,
Message: message,
Consumer: consumer,
}
go l.Process(l.Accounts[account.ID])
}
func (l *AccountListener) RemoveAccount(account *Account) {
l.lock.Lock()
defer l.lock.Unlock()
delete(l.Accounts, account.ID)
}
func (l *AccountListener) Process(wrapper *AccountWrapper) {
// long process simulate
time.Sleep(time.Minute * 5)
// finish
fmt.Println("< : " + strconv.Itoa(wrapper.Account.ID))
wrapper.Consumer.Ack(wrapper.Message) // success - remove queue
l.RemoveAccount(wrapper.Account)
}
I've found a temporary solution to my issue by modifying the following line: https://github.com/apache/pulsar-client-go/blob/953d9eab07948d94234c6a2e6b04f1ffeb8ff833/pulsar/consumer_partition.go#L1497 The modified code now looks like this:
if pc.options.receiverQueueSize > 0 {
pc.availablePermits.inc()
}
This change appears to address the problem I was experiencing. However, I want to emphasize that this is a temporary fix, and I'm not certain about its broader implications or whether it's the most appropriate long-term solution. I would greatly appreciate if the maintainers or someone with deeper knowledge of the codebase could review this modification and provide feedback on:
Whether this approach aligns with the intended behavior of EnableZeroQueueConsumer. Potential side effects or issues this change might introduce in other scenarios. If there's a more robust or recommended way to achieve the desired behavior.
@gkhnoztrk
Thank you for the clarification; this was something that wasn't considered before.
This modification is fine; receiverQueueSize will only be 0 when using zeroQueueConsumer.