pulsar-client-python
[Bug] receiver_queue_size is not honoured with partitioned topic #21593
Versions
Pulsar version: 2.10.4
OS: Ubuntu (version 22.04)
Client (Python): pulsar-client (version 3.3.0)
Minimal reproduce step

Create a partitioned topic with one partition:

    bin/pulsar-admin topics create-partitioned-topic persistent://public/default/test-consumer -p 1

Publish some messages.

Create one consumer as follows:

    consumer = client.subscribe(
        topic='persistent://public/default/test-consumer',
        subscription_name='test',
        consumer_type=ConsumerType.Shared,
        initial_position=InitialPosition.Earliest,
        receiver_queue_size=1,
        max_total_receiver_queue_size_across_partitions=1,
        consumer_name=None,
        negative_ack_redelivery_delay_ms=60000,
        unacked_messages_timeout_ms=3600000)

Receive and acknowledge one message:

    message = consumer.receive(timeout_millis=30000)
    consumer.acknowledge(message)
Check the stats. They show 2 unacked messages even though the application has not received those messages yet. Also, if we create another consumer and try to consume messages, nothing is delivered to it.
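The steps above can be collected into a single script. This is only a sketch under stated assumptions: the service URL `pulsar://localhost:6650`, the message payloads, the count of three messages (chosen to match `msgInCounter` in the stats), and the `hold_seconds` pause are all illustrative and not taken from the report. The `pulsar` import is deferred so the module loads without a client installed.

```python
import time

TOPIC = "persistent://public/default/test-consumer"
SERVICE_URL = "pulsar://localhost:6650"  # assumption: local standalone broker

# Consumer settings copied from the report.
SUBSCRIBE_OPTIONS = {
    "subscription_name": "test",
    "receiver_queue_size": 1,
    "max_total_receiver_queue_size_across_partitions": 1,
    "negative_ack_redelivery_delay_ms": 60000,
    "unacked_messages_timeout_ms": 3600000,
}


def run_repro(service_url=SERVICE_URL, message_count=3, hold_seconds=60):
    """Publish a few messages, receive and ack one, then keep the consumer open."""
    import pulsar  # deferred: requires the pulsar-client package

    client = pulsar.Client(service_url)
    try:
        # Publish some messages (payloads are hypothetical).
        producer = client.create_producer(TOPIC)
        for i in range(message_count):
            producer.send(f"message-{i}".encode("utf-8"))
        producer.close()

        consumer = client.subscribe(
            TOPIC,
            consumer_type=pulsar.ConsumerType.Shared,
            initial_position=pulsar.InitialPosition.Earliest,
            **SUBSCRIBE_OPTIONS,
        )

        # Receive and acknowledge exactly one message.
        message = consumer.receive(timeout_millis=30000)
        consumer.acknowledge(message)

        # Keep the consumer connected so the stats can be inspected meanwhile.
        time.sleep(hold_seconds)
        return message.data()
    finally:
        client.close()
```

Calling `run_repro()` against a broker where the partitioned topic already exists leaves a window of `hold_seconds` in which to run the `partitioned-stats` command from another terminal.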
What did you expect to see?
The stats should not show 2 unacked messages. They should show 1, since the receiver queue size is 1.
What did you see instead?
The stats show 2 unacked messages, as follows.
Command: bin/pulsar-admin topics partitioned-stats persistent://public/default/test-consumer
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.07158759338136238,
  "msgThroughputOut" : 4.271393071754622,
  "bytesInCounter" : 179,
  "msgInCounter" : 3,
  "bytesOutCounter" : 179,
  "msgOutCounter" : 3,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 179,
  "backlogSize" : 120,
  "publishRateLimitedTimes" : 0,
  "earliestMsgPublishTimeInBacklogs" : 0,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "publishers" : [ ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "test" : {
      "msgRateOut" : 0.07158759338136238,
      "msgThroughputOut" : 4.271393071754622,
      "bytesOutCounter" : 179,
      "msgOutCounter" : 3,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 2,
      "backlogSize" : 0,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 2,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 2,
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.07158759338136238,
        "msgThroughputOut" : 4.271393071754622,
        "bytesOutCounter" : 179,
        "msgOutCounter" : 3,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.023862530971099603,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 0,
        "unackedMessages" : 2,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 1,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "subscriptionProperties" : { },
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  },
  "metadata" : {
    "partitions" : 1
  },
  "partitions" : { }
}
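To pick the relevant fields out of stats output like this, a small helper can reduce it to the per-subscription and per-consumer unacked counts. This is a minimal sketch: the function name `summarize_unacked` is hypothetical, and `SAMPLE` is a trimmed copy of the output above that keeps only the fields being compared.

```python
import json


def summarize_unacked(stats):
    """Map each subscription to its unacked count and per-consumer
    (availablePermits, unackedMessages) pairs."""
    summary = {}
    for name, sub in stats.get("subscriptions", {}).items():
        summary[name] = {
            "unacked": sub.get("unackedMessages", 0),
            "consumers": [
                (c.get("availablePermits"), c.get("unackedMessages"))
                for c in sub.get("consumers", [])
            ],
        }
    return summary


# Trimmed from the partitioned-stats output in this report.
SAMPLE = json.loads("""
{
  "msgInCounter": 3,
  "subscriptions": {
    "test": {
      "msgOutCounter": 3,
      "unackedMessages": 2,
      "consumers": [
        {"msgOutCounter": 3, "availablePermits": 0, "unackedMessages": 2},
        {"msgOutCounter": 0, "availablePermits": 1, "unackedMessages": 0}
      ]
    }
  }
}
""")

print(summarize_unacked(SAMPLE))
```

In the sample, the first consumer holds 2 unacked messages with 0 available permits, while the second consumer has 1 available permit and has been sent nothing, which matches the behaviour described in the report.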