pulsar-client-python
On partitioned topics with the Python client, receiver_queue_size=0 results in an InvalidConfiguration error, but is documented to work
Describe the bug
Passing receiver_queue_size=0 to the Python client's subscribe method results in an InvalidConfiguration exception.
However, these docs indicate that a receiver queue size of 0 is supported.
To Reproduce
- With a connected Python client, call subscribe on any partitioned topic with the receiver_queue_size kwarg set to 0.
- Observe that an InvalidConfiguration error is raised.
Expected behavior
A value of 0 should either be supported and documented (is 0 equivalent to 1?), or these docs should be updated to reflect that the Python client (if not others) does not support values less than 1.
Environment
Same as https://github.com/apache/pulsar-client-python/issues/190
@zbentley This looks like a feature catch-up: the Java client supports the zero queue consumer, but it looks like the Python client does not. We will add zero queue consumer support for the Python client in 2.11.0.
I think the zero queue size consumer has been supported since a very early version. I also fixed a related bug last year; see https://github.com/apache/pulsar/pull/10506.

It's better to show your Python client version and paste your code.
is 0 equivalent to 1?
No.
@BewareMyPower Do we have a test for the zero queue consumer of the Python client? If we have a test, please share the test in this issue.
@BewareMyPower
It's better to show your Python client version and paste your code.
See https://github.com/apache/pulsar-client-python/issues/190; that contains Python/client/etc. version.
The code I'm using is:
from pulsar import Client
import os

TOPIC = 'THETOPIC'
SUBSCRIPTION = 'SUBSCRIPTION'


def main():
    client = Client(service_url='pulsar://localhost:6650')
    client.subscribe(
        topic=TOPIC,
        subscription_name=SUBSCRIPTION,
        receiver_queue_size=0,
        consumer_name=f'testconsumer-{os.getpid()}'
    )


if __name__ == '__main__':
    main()
Could you clarify the difference between a setting of 0 and a setting of 1?
@codelipenghui See https://github.com/apache/pulsar/blob/defeec0e84a63ea865f3a2790bc61b66a02254c5/pulsar-client-cpp/python/pulsar_test.py#L289-L306
This test was introduced in https://github.com/apache/pulsar/pull/5706, which has been included since Pulsar 2.5.0. It fixes https://github.com/apache/pulsar/issues/5634 (Python consumer does not accept receiver_queue_size=0).
@zbentley What receiver queue sizes 0 and 1 have in common is that each FLOW request carries 1 permit.
However, if the receiver queue size is 0, the consumer sends the FLOW request to the broker only when receive is called, which means the consumer does not prefetch messages.
If the receiver queue size is 1, the consumer sends a FLOW request to the broker immediately after the consumer is created successfully, which means the consumer prefetches 1 message and caches it inside the consumer. This can affect the logic of the dispatcher in the broker.
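To make the difference concrete, here is a minimal toy model of the behavior described above. It does not use the Pulsar client at all: ToyBroker, ToyConsumer, flow, and buffered are hypothetical names invented for this sketch, and the broker is assumed to have a fixed backlog. It only illustrates when the FLOW request is sent, not how the real client is implemented.

```python
from collections import deque


class ToyBroker:
    """Hypothetical stand-in for a broker: releases messages only against permits."""

    def __init__(self, messages):
        self._backlog = deque(messages)
        self.flow_requests = 0  # how many FLOW requests this broker has seen

    def flow(self, permits):
        """Return up to `permits` messages, as a FLOW request would allow."""
        self.flow_requests += 1
        out = []
        while permits > 0 and self._backlog:
            out.append(self._backlog.popleft())
            permits -= 1
        return out


class ToyConsumer:
    """Hypothetical consumer that models receiver queue sizes 0 and 1 only."""

    def __init__(self, broker, receiver_queue_size):
        if receiver_queue_size not in (0, 1):
            raise ValueError("this sketch only models sizes 0 and 1")
        self._broker = broker
        self._size = receiver_queue_size
        self._cache = deque()
        if self._size == 1:
            # Size 1: send a FLOW request as soon as the consumer exists,
            # so one message is prefetched and cached locally.
            self._cache.extend(broker.flow(1))

    @property
    def buffered(self):
        """Number of messages currently cached inside the consumer."""
        return len(self._cache)

    def receive(self):
        if self._size == 0:
            # Size 0: nothing is cached; the FLOW request is sent only now.
            got = self._broker.flow(1)
            return got[0] if got else None
        if not self._cache:
            self._cache.extend(self._broker.flow(1))
        msg = self._cache.popleft() if self._cache else None
        # Refill the single-slot cache for the next receive call.
        self._cache.extend(self._broker.flow(1))
        return msg
```

With a size of 0 the toy broker sees no FLOW request until receive is called; with a size of 1 it sees one at construction time and the consumer already holds a message that no other consumer can get.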
@BewareMyPower thanks, can that be added to the client documentation for receiver-queue related flags?
Update (and updated issue title): this only happens with partitioned topics. While I'm not quite sure what receiver queue size does on partitioned topics, I'm pretty sure it shouldn't start throwing errors for configurations that are valid for non-partitioned topics (unless receiver queue size is not honored at all for partitioned topics, in which case I'd expect it to always be an error to specify it).
It looks like a zero queue consumer cannot be used on a partitioned topic. I tested the Java client as well.
var topic = "my-topic";
var admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();
try {
    admin.topics().createPartitionedTopic(topic, 1);
} catch (PulsarAdminException ignored) {
}
var client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
var consumer = client.newConsumer()
        .topic(topic)
        .subscriptionName("sub-xxx")
        .receiverQueueSize(0)
        .subscribe();
The error message is more friendly and clear:
IllegalArgumentException: Receiver queue size needs to be greater than 0 for Topics Consumer
I think it's a common problem for clients of all languages. This check was introduced in https://github.com/apache/pulsar/pull/1103.
MultiTopicsConsumerImpl(/* ... */) {
    super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
            schema, interceptors);
    checkArgument(conf.getReceiverQueueSize() > 0,
            "Receiver queue size needs to be greater than 0 for Topics Consumer");
IMO, it's a bug. It looks like the multi topics consumer requires the receiver queue size to be at least 2 (from the Math.max(2, conf.getReceiverQueueSize()) call). I guess that in the early days of Pulsar the number of partitions had to be at least 2, which is why we have this check.
But the zero queue consumer should be an exceptional case. We should fix both the Java and C++ clients for this config. @codelipenghui
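For readers following along in Python, the combined effect of the two Java lines quoted above can be sketched as a single function. topics_consumer_queue_size is a hypothetical name used only for illustration; it is not part of any Pulsar client, and ValueError stands in for the Java IllegalArgumentException.

```python
def topics_consumer_queue_size(configured_size):
    """Sketch of the checks quoted from MultiTopicsConsumerImpl.

    Rejects sizes of 0 or less, and otherwise raises the effective
    size to at least 2, mirroring Math.max(2, ...) in the Java code.
    """
    if configured_size <= 0:
        raise ValueError(
            "Receiver queue size needs to be greater than 0 for Topics Consumer"
        )
    return max(2, configured_size)
```

Under this reading, a configured size of 1 is silently raised to 2, while 0 is rejected outright, which is why the zero queue consumer fails on a partitioned topic even with a single partition.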
@zbentley @BewareMyPower
It’s not enabled because we cannot know which partition the next message will be coming from. Any suggestion on how to achieve that?
(from https://github.com/apache/pulsar/issues/7280#issuecomment-644238363)
This is the main challenge to supporting zero queue consumers for a partitioned topic.
One option here is to create a function that merges messages from multiple topics/partitions into a non-partitioned topic, so that the zero queue consumer only consumes messages from the merged topic.
Got it. For a zero queue consumer, there is no way to determine which internal consumer (on a specific partition) should be chosen to call receive.
While I don't know much about this area of the code, that sounds fine to me.
Other solutions that may make sense:
- Document that a zero queue size with a partitioned topic would potentially buffer up to N messages (where N is the number of partitions on the topic).
- Sequentially loop over the internal receive calls for each partition, returning when a message is found on any. The interactions with the timeout and missed deliveries might be undesirable here though.
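The second suggestion can be sketched roughly as follows. This is only an illustration under stated assumptions: each partition is modelled as a standard-library queue.Queue rather than a real internal consumer, and receive_round_robin and its parameters are hypothetical names, not Pulsar APIs.

```python
import queue
import time


def receive_round_robin(partition_queues, timeout_s, poll_interval_s=0.01):
    """Poll each partition in turn and return the first message found.

    Returns a (partition_index, message) tuple, or raises TimeoutError if
    no partition yields a message before the deadline.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        for index, partition in enumerate(partition_queues):
            try:
                # Non-blocking check of one partition at a time.
                return index, partition.get_nowait()
            except queue.Empty:
                continue
        if time.monotonic() >= deadline:
            raise TimeoutError("no message on any partition before the timeout")
        time.sleep(poll_interval_s)
```

The sketch shows the timeout concern mentioned above: the overall timeout has to be shared across partitions, so a message that arrives on an already-checked partition is only seen on the next pass. It also always starts from the first partition, which would favor low-numbered partitions unless the starting index were rotated.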
The issue had no activity for 30 days, mark with Stale label.