nats.py
SlowConsumerError with PushSubscription and sub.next_msg()
The following code generates SlowConsumerError even though the pending queue is empty:
import asyncio
import nats
from nats.errors import TimeoutError


async def main():
    errors = []

    async def error_cb(err):
        errors.append(err)

    nc = await nats.connect(error_cb=error_cb)
    js = nc.jetstream()
    await js.add_stream(name="cqsub", subjects=["quux"])
    sub = await js.subscribe("quux", durable="myapp", pending_bytes_limit=100)

    for i in range(10):
        await js.publish("quux", f'Hello World {i}'.encode())

    while True:
        try:
            msg = await sub.next_msg()
            print(msg.data)
            await msg.ack()
        except TimeoutError:
            break

    print(errors)
    breakpoint()
    await nc.close()


if __name__ == '__main__':
    asyncio.run(main())
bin/test_js_limits.py
b'Hello World 0'
b'Hello World 1'
b'Hello World 2'
b'Hello World 3'
b'Hello World 4'
b'Hello World 5'
b'Hello World 6'
[SlowConsumerError(), SlowConsumerError(), SlowConsumerError()]
> bin/test_js_limits.py(36)main()
35
---> 36 await nc.close()
37
ipdb> sub
<nats.js.client.JetStreamContext.PushSubscription object at 0x7fe9d6d485b0>
ipdb> sub.pending_msgs
0
ipdb> sub.pending_bytes
-91
ipdb> sub._sub
<nats.aio.subscription.Subscription object at 0x7fe9d6d90a60>
ipdb> sub._sub.pending_bytes
91
ipdb>
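It looks like next_msg is subtracting from the wrong pending_bytes counter. Each message is 13 bytes, so after 7 messages the wrapper's sub.pending_bytes is at -91 while the underlying sub._sub.pending_bytes is still at 91. The 8th message would bring it to 104, over the 100-byte limit, which would explain one SlowConsumerError for each of the 3 remaining messages.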
https://github.com/nats-io/nats.py/blob/main/nats/js/client.py#L629
https://github.com/nats-io/nats.py/blob/main/nats/aio/subscription.py#L163
https://github.com/nats-io/nats.py/blob/main/nats/aio/client.py#L1722
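To make the suspected accounting problem easier to see, here is a minimal standalone sketch that needs no NATS server. It is not the library's code: `InnerSub`, `Wrapper`, `deliver` and `simulate` are hypothetical names, and the limit check is an assumption about how the client decides to raise SlowConsumerError. It only models two separate byte counters, where delivery increments one and next_msg decrements the other.

```python
PAYLOAD = b"Hello World 0"  # 13 bytes, the size of each message in the reproduction
LIMIT = 100                 # pending_bytes_limit used in the reproduction


class InnerSub:
    """Hypothetical stand-in for the underlying subscription (sub._sub)."""

    def __init__(self, limit):
        self.limit = limit
        self.pending_bytes = 0
        self.queue = []
        self.errors = 0

    def deliver(self, data):
        # Assumed behavior: drop the message and report a slow consumer
        # when accepting it would exceed the byte limit.
        if self.pending_bytes + len(data) > self.limit:
            self.errors += 1
            return
        self.queue.append(data)
        self.pending_bytes += len(data)


class Wrapper:
    """Hypothetical stand-in for the push subscription wrapper (sub)."""

    def __init__(self, inner):
        self.inner = inner
        self.pending_bytes = 0  # separate counter that delivery never increments

    def next_msg(self, fixed):
        data = self.inner.queue.pop(0)
        if fixed:
            # Decrement the counter that delivery actually incremented.
            self.inner.pending_bytes -= len(data)
        else:
            # Suspected bug: decrement the wrapper's own counter instead.
            self.pending_bytes -= len(data)
        return data


def simulate(fixed):
    inner = InnerSub(LIMIT)
    wrapper = Wrapper(inner)
    received = 0
    for _ in range(10):
        inner.deliver(PAYLOAD)
        # Consume immediately, so the queue is empty before the next delivery.
        if inner.queue:
            wrapper.next_msg(fixed)
            received += 1
    return received, inner.errors, wrapper.pending_bytes, inner.pending_bytes


print(simulate(fixed=False))  # (7, 3, -91, 91)
print(simulate(fixed=True))   # (10, 0, 0, 0)
```

With the wrong counter decremented, the sketch gives the same numbers as the ipdb session: 7 messages received, 3 errors, -91 on the wrapper and 91 on the inner subscription, even though the queue is drained after every delivery.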