pulsar-client-python
max_total_receiver_queue_size_across_partitions does not work
Describe the bug
The max_total_receiver_queue_size_across_partitions kwarg to the Python subscribe method is nonfunctional.
To Reproduce
- Create a persistent partitioned topic. I used 4 partitions.
- Create a Shared subscription on the topic.
- Publish 10 messages to the topic using batching_type=BatchingType.KeyBased and a unique partition key for each message (this is not necessary with a Shared subscription, but is necessary to demonstrate that this bug also affects KeyShared subscriptions); see the producer sketch after this list.
- Create a consumer on the topic with the below code, and ensure it prints "Got message, sleeping forever".
- In a second terminal, start another consumer on the topic with the below code.
- Observe that the second consumer does not get a message.
- Publish additional messages to the topic.
- Observe that only after this second publish step does the second consumer get messages.
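Producer sketch (not part of the original report; the payloads and the helper below are illustrative assumptions, and only the topic, batching_type, and per-message partition keys come from the steps above):

# Illustrative producer used to create the backlog described in the repro steps.
from pulsar import BatchingType, Client

TOPIC = 'THETOPIC'  # assumed to match the consumer code below

def publish(count=10):
    client = Client(service_url='pulsar://localhost:6650')
    producer = client.create_producer(
        topic=TOPIC,
        batching_enabled=True,
        batching_type=BatchingType.KeyBased,
    )
    for i in range(count):
        # Unique partition key per message, as described above.
        producer.send(f'message-{i}'.encode('utf-8'), partition_key=f'key-{i}')
    producer.flush()
    client.close()

if __name__ == '__main__':
    publish()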
Consumer code:
import os
import time

from pulsar import Client, ConsumerType, Timeout

TOPIC = 'THETOPIC'
SUBSCRIPTION = 'THESUBSCRIPTION'

def main():
    client = Client(service_url='pulsar://localhost:6650')
    sub = client.subscribe(
        topic=TOPIC,
        subscription_name=SUBSCRIPTION,
        consumer_type=ConsumerType.Shared,
        max_total_receiver_queue_size_across_partitions=1,
        consumer_name=f'testconsumer-{os.getpid()}'
    )
    # Poll until a single message arrives, then print its coordinates.
    while True:
        try:
            msg = sub.receive(100)  # timeout in milliseconds
            mid = msg.message_id()
            print("partition:", mid.partition(), "ledger:", mid.ledger_id(),
                  "entry:", mid.entry_id(), "batch:", mid.batch_index())
            break
        except Timeout:
            pass
    print("Got message, sleeping forever")
    while True:
        time.sleep(1)

if __name__ == '__main__':
    main()
Expected behavior
The second consumer should receive messages from the topic immediately upon startup. The first consumer should only prevent the second consumer from getting max_total_receiver_queue_size_across_partitions messages.
I'm not sure what setting max_total_receiver_queue_size_across_partitions to 0 should do; that's not documented, and probably should be. These docs indicate that it should behave equivalently to a value of 1 with regard to other consumers' ability to get messages.
I'm not sure what the interaction is between receiver_queue_size and max_total_receiver_queue_size_across_partitions; that should be documented as well, but as part of https://github.com/apache/pulsar/issues/15702.
Additional context
After roughly 320 messages in the backlog (given my message size), the second consumer will get data when it starts. I don't know why that cutoff exists.
Environment:
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ arch
i386
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ sw_vers
ProductName: macOS
ProductVersion: 12.3.1
BuildVersion: 21E258
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ brew info apache-pulsar
apache-pulsar: stable 2.10.0 (bottled), HEAD
Cloud-native distributed messaging and streaming platform
https://pulsar.apache.org/
/usr/local/Cellar/apache-pulsar/2.10.0 (1,018 files, 949.7MB) *
Poured from bottle on 2022-05-13 at 12:10:54
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/apache-pulsar.rb
License: Apache-2.0
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ python --version
Python 3.7.13
(chariot) zac.bentley@ZacBentleyMBP ~/Desktop/Projects/Klaviyo/chariot ∴ pip show pulsar-client
Name: pulsar-client
Version: 2.10.0
Summary: Apache Pulsar Python client library
Home-page: https://pulsar.apache.org/
Author: Pulsar Devs
Author-email: [email protected]
License: Apache License v2.0
Location: /Users/zac.bentley/Desktop/Projects/Klaviyo/chariot/.venv/lib/python3.7/site-packages
Requires: certifi, six
@zbentley It should be related to producer batching: the broker dispatches a whole batch to a single consumer, not individual messages. For your test case, the producer might put all 10 messages into one batch, so all 10 messages would be dispatched to one consumer and the other consumer would not receive any messages.
You can try disabling message batching on the producer side.
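For example, a minimal sketch of a producer with batching disabled (the service URL and topic name are assumed from the report, not stated in this comment):

# Sketch: producer with batching disabled, so each message is dispatched individually.
from pulsar import Client

client = Client(service_url='pulsar://localhost:6650')
producer = client.create_producer(
    topic='THETOPIC',
    batching_enabled=False,
)
producer.send(b'hello')
client.close()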
@codelipenghui this issue occurs with and without batching_enabled in the producer. Additionally, with batching_enabled=True it occurs regardless of which batching strategy is used.
I edited the code in the example to print out ledger/entry/partition/batch so that it is evident that the batch index is -1 for messages received.
@zbentley
In a second terminal, start another consumer on the topic with the below code.
Does the second consumer use the same subscription name as the first consumer?
@codelipenghui no, see example code:
consumer_name=f'testconsumer-{os.getpid()}'
The issue had no activity for 30 days, mark with Stale label.
@codelipenghui no, see example code:
consumer_name=f'testconsumer-{os.getpid()}'
That sets the consumer name, not the subscription name.
max_total_receiver_queue_size_across_partitions only works for partitioned topics. The root cause is that max_total_receiver_queue_size_across_partitions only controls the receiver queue size of the sub-consumers, not of the parent consumer (MultiTopicConsumer). The default receiver queue size of the parent consumer is still 1000.
As a workaround, you can adjust the receiver_queue_size for the consumer to match the max_total_receiver_queue_size_across_partitions.
Here is an example for your case:
sub = client.subscribe(
    topic=TOPIC,
    subscription_name=SUBSCRIPTION,
    consumer_type=ConsumerType.Shared,
    max_total_receiver_queue_size_across_partitions=1,
    consumer_name=f'testconsumer-{os.getpid()}',
    receiver_queue_size=1
)
This behavior is the same as in the Java client.
For further improvements:
- There is a lack of testing for max_total_receiver_queue_size_across_partitions. We need to enrich the test cases.
- max_total_receiver_queue_size_across_partitions only works for the multiple-topics consumer. It's better to print a warning log when users use the multiple-topics consumer with max_total_receiver_queue_size_across_partitions.
- When setting max_total_receiver_queue_size_across_partitions, the user also needs to consider the receiver_queue_size for the consumer. It's not user-friendly. We may need to find a way to improve it.
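Until that improvement exists, one way to keep the two settings consistent is to derive both from a single value. A minimal sketch (the helper name and its default are illustrative, not part of the client API):

import os
from pulsar import Client, ConsumerType

def subscribe_with_bounded_queue(client, topic, subscription, total_queue_size=1):
    # Hypothetical helper: keep the per-partition queue no larger than the
    # total across partitions, so the parent consumer's queue is also bounded.
    return client.subscribe(
        topic=topic,
        subscription_name=subscription,
        consumer_type=ConsumerType.Shared,
        max_total_receiver_queue_size_across_partitions=total_queue_size,
        receiver_queue_size=total_queue_size,
        consumer_name=f'testconsumer-{os.getpid()}',
    )

client = Client(service_url='pulsar://localhost:6650')
consumer = subscribe_with_bounded_queue(client, 'THETOPIC', 'THESUBSCRIPTION')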
I've adjusted the test code by adding the receiver_queue_size setting and reproduced the same issue. With a backlog of 100 messages on a 4-partition topic and a Shared subscription, only the first consumer gets a single message; the second consumer only gets a message after the first disconnects.
Tested on pulsar-client-3.4.0 and it works as expected. This appears to be a bug in the version we are using, pulsar-client-2.10.1
Thanks for the information. I just tested the workaround and it does not work on pulsar-client 2.10. Yes, there might be some bugs in pulsar-client 2.10 that have been fixed since 3.0; the workaround should only work on versions >= 3.0.
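If you want to confirm which client version an environment actually has before relying on the workaround, a quick check with the standard library (requires Python 3.8+; nothing Pulsar-specific) is:

# Print the installed pulsar-client distribution version.
from importlib.metadata import version

print(version('pulsar-client'))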
After investigation, the issue is related to the PartitionedConsumerImpl in the Pulsar C++ client, which has been refactored by this PR: https://github.com/apache/pulsar/pull/16969.