pulsar-client-python
pattern_auto_discovery_period in pulsar.Client is not used.
This took me a little while to track down, but I'm pretty sure it's an actual bug. It seems simple enough to fix, and I'd be willing to take on the PR. I've prototyped the change locally: it's 1 line in the code and 1-2 lines in the unit test.
The Client class has a subscribe function, which takes a parameter called pattern_auto_discovery_period.
This value is never used, so the discovery period always stays at the default of 60 seconds (because that is what is in the cpp files).
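To make the failure mode concrete without a broker, here is a minimal sketch of a wrapper that accepts a keyword argument but never forwards it to its configuration object. Every name in it (FakeConsumerConfig, subscribe_buggy, subscribe_fixed) is a hypothetical stand-in and not taken from the pulsar-client-python source. The only detail borrowed from this report is the 60-second default.

```python
# Broker-free illustration only. None of these names come from
# pulsar-client-python; they are hypothetical stand-ins.
DEFAULT_DISCOVERY_PERIOD_S = 60  # default reported in this issue


class FakeConsumerConfig:
    """Hypothetical stand-in for a native consumer configuration object."""

    def __init__(self):
        self.pattern_auto_discovery_period = DEFAULT_DISCOVERY_PERIOD_S


def subscribe_buggy(pattern, pattern_auto_discovery_period=60):
    """Accepts the keyword argument but never copies it into the config."""
    conf = FakeConsumerConfig()
    return conf


def subscribe_fixed(pattern, pattern_auto_discovery_period=60):
    """Same wrapper with the single forwarding line added."""
    conf = FakeConsumerConfig()
    conf.pattern_auto_discovery_period = pattern_auto_discovery_period
    return conf
```

With the buggy wrapper the caller's value is silently dropped and the config keeps its default. That matches the symptom described here, although the real fix may look different.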
Here is a POC that shows the problem:
import re
import time

import _pulsar
import pulsar

mode = "non-persistent"  # ?? nothing received
TOPIC = f"{mode}://public/default/my-topic/whatever"
TOPIC2 = f"{mode}://public/default/my-topic/something"
TOPIC_WILD = f"{mode}://public/default/my-topic/.*"


def callback(consumer, msg):
    print(f"Got topic {msg.topic_name()} {msg.data()}\n", flush=True)


def main():
    sub_mode = _pulsar.RegexSubscriptionMode.NonPersistentOnly
    time.sleep(2)
    client = pulsar.Client('pulsar://localhost:6650')
    # This topic will be found right away because I created it first!
    producer = client.create_producer(TOPIC)
    client.subscribe(
        re.compile(TOPIC_WILD),
        subscription_name="subname111",
        message_listener=callback,
        regex_subscription_mode=sub_mode,
        pattern_auto_discovery_period=100,
    )
    # This topic will not be found until 60 seconds into the run,
    # regardless of pattern_auto_discovery_period
    producer2 = client.create_producer(TOPIC2)
    for k in range(120):
        time.sleep(1)
        o = f"some data {k}"
        # Alternate between the two topics, one message per second
        if k % 2 == 0:
            producer.send(o.encode())
        else:
            producer2.send(o.encode())
    client.close()


if __name__ == "__main__":
    main()
Here is a screenshot of the pattern discovery happening at 60 seconds.
There is a unit test that is supposed to cover this: test_topics_pattern_consumer
However, this test has a bug as well. Since all the producers are created before the subscribe call, the topics are all found right away and auto discovery is never exercised.
producer1 = client.create_producer(topic1)
producer2 = client.create_producer(topic2)
producer3 = client.create_producer(topic3)  ##### MOVE THIS after the subscribe to demonstrate auto-discovery!
consumer = client.subscribe(
    re.compile(topics_pattern),
    "my-pattern-consumer-sub",
    consumer_type=ConsumerType.Shared,
    receiver_queue_size=10,
    pattern_auto_discovery_period=1,
)
# wait enough time to trigger auto discovery
time.sleep(2)
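Once producer3 is created after the subscribe call, the fixed time.sleep(2) could be replaced by a check on how long discovery actually takes. Below is a minimal sketch of a polling helper for that purpose. wait_until is a hypothetical name, not part of pulsar or of the existing test suite.

```python
import time


def wait_until(predicate, timeout_s, interval_s=0.1):
    """Poll predicate() until it is truthy and return the elapsed seconds.

    Raises TimeoutError if the predicate is still falsy after timeout_s.
    """
    start = time.monotonic()
    while True:
        if predicate():
            return time.monotonic() - start
        if time.monotonic() - start >= timeout_s:
            raise TimeoutError(f"condition not met within {timeout_s} s")
        time.sleep(interval_s)
```

In the reordered test, the predicate might check whether topic3 has shown up among the topic names of received messages (msg.topic_name(), as in the POC above). With pattern_auto_discovery_period=1 respected, the wait should finish within a few seconds. With the behaviour reported here, it would presumably only succeed after roughly 60 seconds, so a short timeout would make the test fail as intended.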
Just ran into this issue myself. Are there any plans to fix it? It's been almost a year now since it was reported.