SlowConsumerError with PushSubscription and sub.next_msg()

Open KrumBoychev opened this issue 2 years ago • 0 comments

The following code generates SlowConsumerError even though the pending queue is empty:

import asyncio
import nats
from nats.errors import TimeoutError


async def main():
    errors = []

    async def error_cb(err):
        errors.append(err)

    nc = await nats.connect(error_cb=error_cb)
    js = nc.jetstream()

    await js.add_stream(name="cqsub", subjects=["quux"])
    sub = await js.subscribe("quux", durable="myapp", pending_bytes_limit=100)
    for i in range(10):
        await js.publish("quux", f'Hello World {i}'.encode())

    while True:
        try:
            msg = await sub.next_msg()
            print(msg.data)
            await msg.ack()
        except TimeoutError:
            break

    print(errors)
    breakpoint()

    await nc.close()


if __name__ == '__main__':
    asyncio.run(main())
$ bin/test_js_limits.py
b'Hello World 0'
b'Hello World 1'
b'Hello World 2'
b'Hello World 3'
b'Hello World 4'
b'Hello World 5'
b'Hello World 6'
[SlowConsumerError(), SlowConsumerError(), SlowConsumerError()]
> bin/test_js_limits.py(36)main()
     35 
---> 36     await nc.close()
     37 

ipdb> sub
<nats.js.client.JetStreamContext.PushSubscription object at 0x7fe9d6d485b0>
ipdb> sub.pending_msgs
0
ipdb> sub.pending_bytes
-91
ipdb> sub._sub
<nats.aio.subscription.Subscription object at 0x7fe9d6d90a60>
ipdb> sub._sub.pending_bytes
91
ipdb> 

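It looks like next_msg is not subtracting from the correct pending_bytes: in the session above, sub.pending_bytes is -91 while sub._sub.pending_bytes is still 91, even though no messages are pending.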

https://github.com/nats-io/nats.py/blob/main/nats/js/client.py#L629

https://github.com/nats-io/nats.py/blob/main/nats/aio/subscription.py#L163

https://github.com/nats-io/nats.py/blob/main/nats/aio/client.py#L1722
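
To make the suspected mechanism concrete, here is a minimal toy model of it. This is not the nats.py code: `InnerSubscription`, `PushWrapper`, `deliver` and the exact limit check are hypothetical stand-ins, assuming only that the limit is enforced against the inner subscription's byte counter while `next_msg` decrements a separate counter on the wrapper.

```python
from collections import deque


class InnerSubscription:
    """Toy stand-in for the low-level subscription (hypothetical, not nats.py)."""

    def __init__(self, pending_bytes_limit):
        self.queue = deque()
        self.pending_bytes = 0
        self.pending_bytes_limit = pending_bytes_limit
        self.slow_consumer_errors = 0

    def deliver(self, payload):
        # Assumed limit check: drop the message if the byte budget would be exceeded.
        if self.pending_bytes + len(payload) > self.pending_bytes_limit:
            self.slow_consumer_errors += 1
            return
        self.pending_bytes += len(payload)
        self.queue.append(payload)


class PushWrapper:
    """Toy stand-in for a wrapper that keeps its own, separate byte counter."""

    def __init__(self, inner):
        self._sub = inner
        self.pending_bytes = 0  # never incremented by deliver()

    def next_msg(self):
        payload = self._sub.queue.popleft()
        # Suspected bug: the wrapper's counter is decremented,
        # so self._sub.pending_bytes never goes back down.
        self.pending_bytes -= len(payload)
        return payload


def simulate():
    inner = InnerSubscription(pending_bytes_limit=100)
    sub = PushWrapper(inner)

    # Ten 13-byte messages arrive before anything is consumed.
    for i in range(10):
        inner.deliver(f"Hello World {i}".encode())
    errors_before_drain = inner.slow_consumer_errors

    # Drain everything that was queued.
    received = 0
    while inner.queue:
        sub.next_msg()
        received += 1

    # The queue is empty now, yet the inner counter still blocks new messages.
    inner.deliver(b"Hello World 10")

    return {
        "received": received,
        "errors_before_drain": errors_before_drain,
        "wrapper_pending_bytes": sub.pending_bytes,
        "inner_pending_bytes": inner.pending_bytes,
        "queued": len(inner.queue),
        "errors_after_drain": inner.slow_consumer_errors,
    }
```

Under these assumptions, `simulate()` reproduces the numbers from the report: 7 messages received, 3 errors, a wrapper counter of -91 and an inner counter of 91. The extra delivery after draining is also rejected, which is the "slow consumer with an empty queue" symptom.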

KrumBoychev, Oct 11 '22 15:10