WORK_QUEUE policy not working
Hi, I am using nats-py==2.1.7, and when I try to do load balancing with the retention policy RetentionPolicy.WORK_QUEUE, the messages are still delivered to both workers.
I don't understand what I am doing wrong, but I can't get it to work so that each message is delivered to only one subscription. The messages are deleted from the stream, yet both workers still receive them. I tried with a callback set and without one, but it didn't change anything.
The code I used to test this is below.
import asyncio

import nats
from nats.errors import TimeoutError
from nats.aio.client import Client as NatsClient  # type: ignore
from nats.aio.client import Msg as NatsMsg
from nats.js.api import RetentionPolicy
from nats.js.api import ConsumerConfig
from nats.js.api import StreamInfo
from nats.js.api import StreamConfig
from nats.aio.subscription import Subscription

stream_name = "job-manager"
service_name = "app-condition"
full_sub = "%s.%s.demo.proces" % (stream_name, service_name)


async def msgcb(sub: Subscription, name: str):
    while True:
        try:
            msg = await sub.next_msg()
            await msg.ack()
            print("%s:" % name, msg.data.decode())
            await asyncio.sleep(2)
        except TimeoutError:
            await asyncio.sleep(2)
            print("no msgs")


async def qsub_a(msg: NatsMsg):
    print("QSUB A:", msg.data.decode())
    await msg.ack()
    await asyncio.sleep(2)


async def qsub_b(msg: NatsMsg):
    print("QSUB B:", msg.data.decode())
    await msg.ack()
    await asyncio.sleep(2)


async def get_or_create_nats_queue_stream(strm_name: str, nc: NatsClient) -> StreamInfo:
    # Create JetStream context
    js = nc.jetstream()
    strm_config = StreamConfig()
    strm_config.retention = RetentionPolicy.WORK_QUEUE
    strm_config.name = strm_name
    strm_config.subjects = ["%s" % strm_name, "%s.>" % strm_name]
    strm_config.max_age = 3600000000
    try:
        strm_info = await js.add_stream(config=strm_config)
        print('Nats stream %s created with subjects %s' % (strm_name, strm_info.config.subjects))
        return strm_info
    except Exception as e:
        if e is not None and e.err_code == 10058:
            strm_info = await js.stream_info(strm_name)
            print('Nats stream %s retrieved with subjects %s' % (strm_name, strm_info.config.subjects))
            return strm_info
        else:
            print(e)


async def get_or_create_nats_stream_sub(stream_name: str, service_name: str, nc: NatsClient, cb=None) -> Subscription:
    subject = "%s.%s.>" % (stream_name, service_name)
    js = nc.jetstream()
    cs_config = ConsumerConfig()
    cs_config.deliver_group = service_name
    print('Nats stream queue subscription created on subject %s and queue name %s' % (subject, service_name))
    if cb is not None:
        sub = await js.subscribe(subject, service_name, cb=cb, config=cs_config, manual_ack=True)
    else:
        sub = await js.subscribe(subject, service_name, config=cs_config, manual_ack=True)
    return sub


async def keyboardinterrupt():
    while True:
        await asyncio.sleep(1)


async def main(publish: bool = False):
    nc = await nats.connect("localhost")
    js = nc.jetstream()
    strm_info = await get_or_create_nats_queue_stream(stream_name, nc)
    print(strm_info)
    if publish:
        for i in range(0, 100):
            ack = await js.publish(
                full_sub, f"hello world: {i}".encode(), stream=stream_name
            )
            print(full_sub, ack)
    sub1 = await get_or_create_nats_stream_sub(stream_name, service_name, nc, cb=qsub_a)
    sub2 = await get_or_create_nats_stream_sub(stream_name, service_name, nc, cb=qsub_b)
    # sub1 = await get_or_create_nats_stream_sub(stream_name, service_name, nc)
    # sub2 = await get_or_create_nats_stream_sub(stream_name, service_name, nc)
    # await msgcb(sub1, 'sub1')
    # await msgcb(sub2, 'sub2')
    try:
        loop = asyncio.get_event_loop()
        task = loop.create_task(keyboardinterrupt())
        await task
    except KeyboardInterrupt:
        loop.stop()


if __name__ == "__main__":
    asyncio.run(main(True))
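For comparison, here is a minimal sketch of the pull-based pattern that should also give work-queue semantics: both workers bind to the same durable pull consumer on the work-queue stream, so each message is fetched by exactly one of them. The stream name, durable name, subject and timeouts here are illustrative assumptions, not taken from the code above.

import asyncio

import nats
from nats.errors import TimeoutError
from nats.js.api import RetentionPolicy


async def worker(js, name: str):
    # Both workers bind to the same durable pull consumer ("workers" is an
    # assumed name), so the server hands each message to only one of them.
    psub = await js.pull_subscribe("logs", durable="workers", stream="work-stream")
    while True:
        try:
            msgs = await psub.fetch(1, timeout=5)
        except TimeoutError:
            continue
        for msg in msgs:
            print(f"{name}: {msg.data.decode()}")
            await msg.ack()


async def main():
    nc = await nats.connect("localhost")
    js = nc.jetstream()
    await js.add_stream(name="work-stream", subjects=["logs"],
                        retention=RetentionPolicy.WORK_QUEUE)
    for i in range(10):
        await js.publish("logs", f"hello world: {i}".encode())
    await asyncio.gather(worker(js, "A"), worker(js, "B"))


if __name__ == "__main__":
    asyncio.run(main())

With a single shared pull consumer there is no per-subscriber delivery at all, so a message can only show up twice if it was not acked within the consumer's ack_wait.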
I'm seeing the same issue with version 2.2.0. I'm running the NATS Docker image 2.9.10 for my server. I ran @rduque1's code and also my own adaptation of the demo code. I tried tweaking the consumer settings to delay redelivery well beyond my random sleeps, but I still see several redeliveries.
import asyncio
from datetime import datetime
from random import random

import nats
from nats.js.api import ConsumerConfig, RetentionPolicy


async def main():
    async def closed_cb():
        print("Connection closed")

    nc = await nats.connect("localhost", closed_cb=closed_cb)

    # Create JetStream context.
    js = nc.jetstream()

    await js.add_stream(name="work-stream", subjects=["logs"], retention=RetentionPolicy.WORK_QUEUE)

    for i in range(0, 100):
        ack = await js.publish("logs", f"hello world: {i}".encode())
        print(f"{datetime.now()} {ack}")

    # Create deliver group that will have load-balanced messages.
    async def qsub_a(msg):
        print(f"QSUB A {msg.metadata.timestamp}: {msg.data}")
        await msg.in_progress()
        await asyncio.sleep(.5 + random())
        # await asyncio.sleep(random())
        await msg.ack_sync()
        if msg.metadata.num_delivered > 1:
            print(f"Delivered {msg.metadata.num_delivered} times")

    async def qsub_b(msg):
        print(f"QSUB B {msg.metadata.timestamp}: {msg.data}")
        await msg.in_progress()
        await asyncio.sleep(2)
        # await asyncio.sleep(random())
        await msg.ack_sync()
        if msg.metadata.num_delivered > 1:
            print(f"Delivered {msg.metadata.num_delivered} times")

    config_a = ConsumerConfig(ack_wait=120, flow_control=True, max_ack_pending=1)
    config_b = ConsumerConfig(ack_wait=120, flow_control=True, max_ack_pending=1)
    await js.subscribe("logs", "workers", cb=qsub_b, manual_ack=True, config=config_b)
    await js.subscribe("logs", "workers", cb=qsub_a, manual_ack=True, config=config_a)

    try:
        while True:
            await asyncio.sleep(2)
    except KeyboardInterrupt:
        pass
    finally:
        await nc.close()


if __name__ == '__main__':
    asyncio.run(main())
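On the redelivery part specifically, here is a small sketch of how a slow handler can avoid being redelivered by repeatedly signalling progress, which resets the ack_wait timer. It assumes a fresh "work-stream"/"logs" work-queue stream and no pre-existing "workers" consumer; the names and durations are illustrative, not from the code above.

import asyncio

import nats
from nats.js.api import ConsumerConfig


async def handle(msg):
    # ack_wait below is 5 seconds; the work takes longer, so we keep sending
    # "in progress" acks to reset the redelivery timer until the final ack.
    for _ in range(4):
        await asyncio.sleep(2)
        await msg.in_progress()
    await msg.ack()
    print("done:", msg.data.decode(), "deliveries:", msg.metadata.num_delivered)


async def main():
    nc = await nats.connect("localhost")
    js = nc.jetstream()
    # Assumed stream/queue names; ack_wait is given in seconds in nats-py.
    await js.subscribe("logs", "workers", cb=handle, manual_ack=True,
                       config=ConsumerConfig(ack_wait=5, max_ack_pending=1))
    await asyncio.sleep(30)
    await nc.close()


if __name__ == "__main__":
    asyncio.run(main())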
I actually tried this again yesterday, and it worked as expected. I hadn't realized that the server-side configuration of the deliver group's consumer wasn't being updated as I changed settings in my code. Since I started with a fresh server, the consumer was created with the settings above and the ack_wait setting was actually used.
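For anyone hitting the same thing: the durable consumer keeps the configuration it was created with, so changing ConsumerConfig in the client code alone has no effect on an existing consumer. A quick way to check what the server actually has, and to reset it, is something like the sketch below (the stream name "work-stream" and consumer name "workers" are assumed from the snippet above; adjust to your setup).

import asyncio

import nats


async def main():
    nc = await nats.connect("localhost")
    js = nc.jetstream()

    # Inspect the consumer configuration stored on the server; this is what
    # actually governs ack_wait, max_ack_pending, etc.
    info = await js.consumer_info("work-stream", "workers")
    print(info.config)

    # Delete the consumer so it gets recreated with the new settings on the
    # next js.subscribe() call.
    await js.delete_consumer("work-stream", "workers")

    await nc.close()


if __name__ == "__main__":
    asyncio.run(main())

On a fresh server there is nothing to clean up, which matches the behaviour described above.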