pulsar-client-python

On partitioned topics with the Python client, receiver_queue_size=0 results in an InvalidConfiguration error, but is documented to work

Open · zbentley opened this issue 3 years ago · 16 comments

Describe the bug

Passing receiver_queue_size=0 to the Python client's subscribe method results in an InvalidConfiguration exception.

However, these docs indicate that a receiver queue size of 0 is supported.

To Reproduce

  1. With a connected Python client, call subscribe on any partitioned topic with the receiver_queue_size kwarg set to 0.
  2. Observe that an InvalidConfiguration error is raised.

Expected behavior

A value of 0 should either be supported and documented (is 0 equivalent to 1?), or these docs should be updated to state that the Python client (if not others) does not support values less than 1.

Environment

Same as https://github.com/apache/pulsar-client-python/issues/190

zbentley, May 21 '22 16:05

@zbentley This should be a feature catch-up: the Java client supports zero-queue consumers, but it looks like the Python client does not. We will add zero-queue consumer support to the Python client in 2.11.0.

codelipenghui, May 22 '22 01:05

I think zero-queue-size consumers have been supported since a very early version. I also fixed a related bug last year, see https://github.com/apache/pulsar/pull/10506.


It's better to show your Python client version and paste your code.

BewareMyPower, May 22 '22 10:05

is 0 equivalent to 1?

No.

BewareMyPower, May 22 '22 11:05

@BewareMyPower Do we have a test for the Python client's zero-queue consumer? If so, please share it in this issue.

codelipenghui, May 22 '22 11:05

@BewareMyPower

It's better to show your Python client version and paste your code.

See https://github.com/apache/pulsar-client-python/issues/190; it lists the Python, client, and other versions.

The code I'm using is:

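from pulsar import Client
import os

# THETOPIC must be a partitioned topic to reproduce the error;
# the same call succeeds on a non-partitioned topic.
TOPIC = 'THETOPIC'
SUBSCRIPTION = 'SUBSCRIPTION'

def main():
    client = Client(service_url='pulsar://localhost:6650')
    try:
        # Raises an InvalidConfiguration error when TOPIC is partitioned.
        client.subscribe(
            topic=TOPIC,
            subscription_name=SUBSCRIPTION,
            receiver_queue_size=0,
            consumer_name=f'testconsumer-{os.getpid()}'
        )
    finally:
        # Release the client's connections even if subscribe() raises.
        client.close()

if __name__ == '__main__':
    main()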

zbentley, May 22 '22 14:05

Could you clarify the difference between a setting of 0 and a setting of 1?

zbentley, May 22 '22 14:05

@codelipenghui See https://github.com/apache/pulsar/blob/defeec0e84a63ea865f3a2790bc61b66a02254c5/pulsar-client-cpp/python/pulsar_test.py#L289-L306

This test was introduced in https://github.com/apache/pulsar/pull/5706, which has been included since Pulsar 2.5.0. That PR fixes https://github.com/apache/pulsar/issues/5634 (Python consumer does not accept receiver_queue_size=0).

BewareMyPower, May 22 '22 14:05

@zbentley What a receiver queue size of 0 and one of 1 have in common is that each FLOW request carries 1 permit.

However, with a receiver queue size of 0, the consumer sends the FLOW request to the broker only when receive is called, which means the consumer does not prefetch messages.

With a receiver queue size of 1, the consumer sends a FLOW request to the broker immediately after it is created successfully, which means it prefetches 1 message and caches it internally. This prefetching could affect the dispatcher logic in the broker.
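To make the difference concrete, here is a toy, in-memory model in Python. It is not the client's implementation: `ToyBroker`, `ToyConsumer`, and their methods are hypothetical, and the model ignores the real refill threshold, redelivery, and blocking. It only mirrors the two behaviours described above: with size 0, one permit is granted per receive() call; with size 1, one permit is granted right after creation, so one message is prefetched.

```python
from collections import deque


class ToyBroker:
    """Hypothetical in-memory broker: dispatches only as many messages as
    the consumer has granted permits for via FLOW."""

    def __init__(self, backlog):
        self.backlog = deque(backlog)

    def flow(self, consumer, permits):
        # Push up to `permits` messages into the consumer's local queue.
        for _ in range(permits):
            if not self.backlog:
                break
            consumer.incoming.append(self.backlog.popleft())


class ToyConsumer:
    """Hypothetical consumer that models only the FLOW/prefetch behaviour."""

    def __init__(self, broker, receiver_queue_size):
        self.broker = broker
        self.receiver_queue_size = receiver_queue_size
        self.incoming = deque()  # the consumer-side receiver queue
        if receiver_queue_size > 0:
            # Non-zero size: grant permits as soon as the consumer is created,
            # so messages are prefetched before anyone calls receive().
            broker.flow(self, receiver_queue_size)

    def receive(self):
        if self.receiver_queue_size == 0:
            # Zero size: request exactly one message, and only now.
            self.broker.flow(self, 1)
            return self.incoming.popleft() if self.incoming else None
        if not self.incoming:
            return None  # a real client would block here instead
        msg = self.incoming.popleft()
        # Replenish the permit that was just used; with size 1 this
        # immediately prefetches the next message.
        self.broker.flow(self, 1)
        return msg


if __name__ == "__main__":
    zero_broker = ToyBroker(["m1", "m2", "m3"])
    zero = ToyConsumer(zero_broker, receiver_queue_size=0)
    print(len(zero.incoming), len(zero_broker.backlog))  # 0 3: nothing prefetched

    one_broker = ToyBroker(["m1", "m2", "m3"])
    one = ToyConsumer(one_broker, receiver_queue_size=1)
    print(len(one.incoming), len(one_broker.backlog))  # 1 2: m1 already cached
```

In the size-1 case the broker has already handed m1 to the consumer before any receive() call, which is the prefetch that could influence how the broker dispatches to other consumers.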

BewareMyPower, May 22 '22 14:05

@BewareMyPower thanks, can that be added to the client documentation for receiver-queue related flags?

zbentley, May 22 '22 14:05

Update (and updated issue title): this only happens with partitioned topics. While I'm not quite sure what receiver queue size does on partitioned topics, I'm pretty sure it shouldn't start throwing errors for configurations that are valid for non-partitioned topics (unless receiver queue size is not honored at all for partitioned topics, in which case I'd expect it to always be an error to specify it).

zbentley, May 22 '22 15:05

It looks like a zero-queue consumer cannot be used on a partitioned topic. I tested with the Java client as well:

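    import org.apache.pulsar.client.admin.PulsarAdmin;
    import org.apache.pulsar.client.admin.PulsarAdminException;
    import org.apache.pulsar.client.api.PulsarClient;

    public class ZeroQueuePartitionedRepro {
        public static void main(String[] args) throws Exception {
            var topic = "my-topic";
            var admin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();
            try {
                // Create a topic with 1 partition; ignore the error if it already exists.
                admin.topics().createPartitionedTopic(topic, 1);
            } catch (PulsarAdminException ignored) {
            }
            var client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
            // Fails: the consumer for a partitioned topic rejects receiverQueueSize(0).
            var consumer = client.newConsumer()
                    .topic(topic)
                    .subscriptionName("sub-xxx")
                    .receiverQueueSize(0)
                    .subscribe();
        }
    }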

The Java client's error message is friendlier and clearer:

IllegalArgumentException: Receiver queue size needs to be greater than 0 for Topics Consumer

BewareMyPower, May 22 '22 15:05

I think this is a common problem for clients in all languages. This check was introduced in https://github.com/apache/pulsar/pull/1103.

    MultiTopicsConsumerImpl(/* ... */) {
        super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
                schema, interceptors);

        checkArgument(conf.getReceiverQueueSize() > 0,
            "Receiver queue size needs to be greater than 0 for Topics Consumer");

IMO, it's a bug. It looks like the multi-topics consumer requires a receiver queue size of at least 2 (from the Math.max(2, conf.getReceiverQueueSize()) call). I guess that in Pulsar's early days a partitioned topic had to have at least 2 partitions, which is why we have this check.

But the zero-queue consumer should be treated as an exceptional case. We should fix both the Java and C++ clients for this config. @codelipenghui
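The net effect of the two lines in the Java constructor excerpt above can be mirrored in a few lines of Python. This is a hypothetical helper, not part of any Pulsar client; it only shows which configured values are rejected and what queue size the multi-topics consumer itself ends up with.

```python
def effective_multi_topics_queue_size(configured: int) -> int:
    """Hypothetical helper mirroring the Java MultiTopicsConsumerImpl
    constructor excerpt (not a real client API)."""
    # checkArgument(conf.getReceiverQueueSize() > 0, ...): 0 is rejected.
    # (In Java this check runs after super(); the outcome is the same.)
    if configured <= 0:
        raise ValueError(
            "Receiver queue size needs to be greater than 0 for Topics Consumer"
        )
    # Math.max(2, conf.getReceiverQueueSize()): the multi-topics consumer's
    # own queue size is raised to at least 2.
    return max(2, configured)


if __name__ == "__main__":
    for size in (1, 2, 1000):
        print(size, "->", effective_multi_topics_queue_size(size))
    try:
        effective_multi_topics_queue_size(0)
    except ValueError as exc:
        print(0, "->", exc)
```

So, for a partitioned topic, a configured size of 1 is not honoured as 1 by the multi-topics consumer, and 0 is refused outright.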

BewareMyPower, May 22 '22 15:05

@zbentley @BewareMyPower

It’s not enabled because we cannot know which partition the next message will be coming from. Any suggestion on how to achieve that?

(from https://github.com/apache/pulsar/issues/7280#issuecomment-644238363) This is the main challenge in supporting zero-queue consumers on a partitioned topic.

One option here is to create a function that merges messages from multiple topics/partitions into a non-partitioned topic; the zero-queue consumer then consumes only from the merged topic.
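A minimal sketch of the merge option, assuming a Pulsar Function written in the plain Python "native" style (a module-level `process(input)` function) and deployed with `pulsar-admin functions create`, with `--inputs` pointing at the partitioned topic and `--output` at a non-partitioned topic. The `simulate_merge` helper is hypothetical and only stands in for the Functions runtime so the idea can be run locally.

```python
from itertools import zip_longest

_MISSING = object()  # sentinel so that falsy payloads are not dropped


def process(input):
    # Identity function: republish each message unchanged onto the output
    # (merged, non-partitioned) topic.
    return input


def simulate_merge(partitions):
    """Hypothetical local stand-in for the Functions runtime: pass messages
    from every partition through process() into one merged list."""
    merged = []
    # Interleave partitions; in a real deployment the cross-partition order
    # depends on arrival timing.
    for batch in zip_longest(*partitions, fillvalue=_MISSING):
        for msg in batch:
            if msg is not _MISSING:
                merged.append(process(msg))
    return merged


if __name__ == "__main__":
    print(simulate_merge([["p0-a", "p0-b"], ["p1-a"]]))  # ['p0-a', 'p1-a', 'p0-b']
```

A zero-queue consumer can then subscribe to the merged topic, since it is non-partitioned. One trade-off: the function's own input consumer presumably still uses a receiver queue on the partitions, so buffering moves into the function rather than disappearing.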

codelipenghui, May 23 '22 01:05

Got it. For a zero-queue consumer, there is no way to determine which internal consumer (i.e., which partition) receive should be called on.

BewareMyPower, May 23 '22 03:05

While I don't know much about this area of the code, that sounds fine to me.

Other solutions that may make sense:

  • Document that a zero queue size with a partitioned topic would potentially buffer up to N messages (where N is the number of partitions on the topic).
  • Sequentially loop over the internal receive calls for each partition, returning when a message is found on any. The interactions with the timeout and missed deliveries might be undesirable here though.
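The second option might look roughly like this toy model. `ToyPartitionConsumer`, `SequentialZeroQueueReceiver`, and `try_receive()` are hypothetical; `try_receive()` stands in for a very short-timeout receive on one partition's internal zero-queue consumer. Rotating the starting partition is an assumed fairness choice, and the timeout and missed-delivery concerns mentioned above are not modelled.

```python
from collections import deque
from typing import List, Optional


class ToyPartitionConsumer:
    """Hypothetical zero-queue consumer for a single partition: nothing is
    prefetched, and try_receive() pulls at most one message when called."""

    def __init__(self, messages):
        self.backlog = deque(messages)

    def try_receive(self) -> Optional[str]:
        # Stands in for a receive() with a very short timeout on this partition.
        return self.backlog.popleft() if self.backlog else None


class SequentialZeroQueueReceiver:
    """Polls partitions one after another and returns the first message found."""

    def __init__(self, partitions: List[ToyPartitionConsumer]):
        self.partitions = partitions
        self.next_index = 0  # where the next poll starts

    def receive(self) -> Optional[str]:
        n = len(self.partitions)
        for offset in range(n):
            i = (self.next_index + offset) % n
            msg = self.partitions[i].try_receive()
            if msg is not None:
                # Start after this partition next time so that one busy
                # partition cannot starve the others.
                self.next_index = (i + 1) % n
                return msg
        # Every partition came back empty; a real client would keep polling
        # until the caller's overall timeout expires.
        return None


if __name__ == "__main__":
    receiver = SequentialZeroQueueReceiver([
        ToyPartitionConsumer(["a1", "a2"]),
        ToyPartitionConsumer([]),
        ToyPartitionConsumer(["c1"]),
    ])
    print([receiver.receive() for _ in range(4)])  # ['a1', 'c1', 'a2', None]
```

Each poll in the real client would be a separate round trip per partition, so latency for a single receive grows with the number of empty partitions visited first.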

zbentley, May 23 '22 13:05

The issue had no activity for 30 days; marking it with the Stale label.

github-actions[bot], Jun 23 '22 02:06