
OutboundBufferLimitError exception on connection after `drain_timeout` passed

Open alllexx88 opened this issue 1 year ago • 5 comments

Hello! We run a service that alternates between periods of activity and idleness. We try to reuse the NATS connection created at initialization to publish to a subject; however, after drain_timeout has passed, we get this:

...
Jun 19 15:01:31 ml-23 python[16985]: f_n: 165, n_obj: 139, d_time: 0.08978700637817383, fps: 11.137468998443948
Jun 19 15:09:12 ml-23 python[16985]: nats: encountered error
Jun 19 15:09:12 ml-23 python[16985]: Traceback (most recent call last):
Jun 19 15:09:12 ml-23 python[16985]:   File "/home/ml/src/moving_objects_detection/venv11/lib/python3.11/site-packages/nats/aio/client.py", line 1404, in _attempt_reconnect
Jun 19 15:09:12 ml-23 python[16985]:     await self._transport.wait_closed()
Jun 19 15:09:12 ml-23 python[16985]:   File "/home/ml/src/moving_objects_detection/venv11/lib/python3.11/site-packages/nats/aio/transport.py", line 180, in wait_closed
Jun 19 15:09:12 ml-23 python[16985]:     return await self._io_writer.wait_closed()
Jun 19 15:09:12 ml-23 python[16985]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Jun 19 15:09:12 ml-23 python[16985]:   File "/usr/lib/python3.11/asyncio/streams.py", line 350, in wait_closed
Jun 19 15:09:12 ml-23 python[16985]:     await self._protocol._get_close_waiter(self)
Jun 19 15:09:12 ml-23 python[16985]:   File "/usr/lib/python3.11/asyncio/selector_events.py", line 1082, in _write_ready
Jun 19 15:09:12 ml-23 python[16985]:     n = self._sock.send(self._buffer)
Jun 19 15:09:12 ml-23 python[16985]:         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Jun 19 15:09:12 ml-23 python[16985]: BrokenPipeError: [Errno 32] Broken pipe
Jun 19 15:09:12 ml-23 python[16985]: f_n: 0, n_obj: 0, d_time: 460.90226650238037, fps: 0.002169657371374275
Jun 19 15:09:12 ml-23 python[16985]: Exception in thread Thread-1 (main):
Jun 19 15:09:12 ml-23 python[16985]: Traceback (most recent call last):
Jun 19 15:09:12 ml-23 python[16985]:   File "/usr/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
Jun 19 15:09:12 ml-23 python[16985]:     self.run()
Jun 19 15:09:12 ml-23 python[16985]:   File "/usr/lib/python3.11/threading.py", line 975, in run
Jun 19 15:09:12 ml-23 python[16985]:     self._target(*self._args, **self._kwargs)
Jun 19 15:09:12 ml-23 python[16985]:   File "/home/ml/src/moving_objects_detection/main.py", line 141, in main
Jun 19 15:09:12 ml-23 python[16985]:     buffer_handler.run(config)
Jun 19 15:09:12 ml-23 python[16985]:   File "/home/ml/src/moving_objects_detection/buffer.py", line 131, in run
Jun 19 15:09:12 ml-23 python[16985]:     self._processor(frame=frame, frame_number=frame_number, session_id=session_id)
Jun 19 15:09:12 ml-23 python[16985]:   File "/home/ml/src/moving_objects_detection/processor.py", line 151, in __call__
Jun 19 15:09:12 ml-23 python[16985]:     self._writer(
Jun 19 15:09:12 ml-23 python[16985]:   File "/home/ml/src/moving_objects_detection/writer.py", line 92, in __call__
Jun 19 15:09:12 ml-23 python[16985]:     self._loop.run_until_complete(self.__sender(
Jun 19 15:09:12 ml-23 python[16985]:   File "/usr/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
Jun 19 15:09:12 ml-23 python[16985]:     return future.result()
Jun 19 15:09:12 ml-23 python[16985]:            ^^^^^^^^^^^^^^^
Jun 19 15:09:12 ml-23 python[16985]:   File "/home/ml/src/moving_objects_detection/writer.py", line 83, in __sender
Jun 19 15:09:12 ml-23 python[16985]:     await self._nc.publish(self._subject, proto_bytes)
Jun 19 15:09:12 ml-23 python[16985]:   File "/home/ml/src/moving_objects_detection/venv11/lib/python3.11/site-packages/nats/aio/client.py", line 820, in publish
Jun 19 15:09:12 ml-23 python[16985]:     raise errors.OutboundBufferLimitError
Jun 19 15:09:12 ml-23 python[16985]: nats.errors.OutboundBufferLimitError: nats: outbound buffer limit exceeded

The same thing happens when we start the service, publish nothing, wait for drain_timeout, and then try to publish:

Jun 19 14:50:23 ml-23 systemd[1]: Started Python movement detection service.
Jun 19 14:50:24 ml-23 python[16703]: {'min_diff_threshold': 20, 'max_objects_count': 300, 'min_object_area': 225, 'bin_count': 3, 'bottom_q': 0.05, 'upper_q': 0.95, 'blur_size': None, 'blur_sqd': None, 'frames_diff_quantile': 0.99, 'noise_threshold': 0.4, 'discrete_flow_bin_count': 200, 'mask_color': (1, 0, 1)}
Jun 19 14:56:05 ml-23 python[16703]: nats: encountered error
Jun 19 14:56:05 ml-23 python[16703]: Traceback (most recent call last):
Jun 19 14:56:05 ml-23 python[16703]:   File "/home/ml/src/moving_objects_detection/venv11/lib/python3.11/site-packages/nats/aio/client.py", line 1404, in _attempt_reconnect
Jun 19 14:56:05 ml-23 python[16703]:     await self._transport.wait_closed()
Jun 19 14:56:05 ml-23 python[16703]:   File "/home/ml/src/moving_objects_detection/venv11/lib/python3.11/site-packages/nats/aio/transport.py", line 180, in wait_closed
Jun 19 14:56:05 ml-23 python[16703]:     return await self._io_writer.wait_closed()
Jun 19 14:56:05 ml-23 python[16703]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Jun 19 14:56:05 ml-23 python[16703]:   File "/usr/lib/python3.11/asyncio/streams.py", line 350, in wait_closed
Jun 19 14:56:05 ml-23 python[16703]:     await self._protocol._get_close_waiter(self)
Jun 19 14:56:05 ml-23 python[16703]:   File "/usr/lib/python3.11/asyncio/selector_events.py", line 1082, in _write_ready
Jun 19 14:56:05 ml-23 python[16703]:     n = self._sock.send(self._buffer)
Jun 19 14:56:05 ml-23 python[16703]:         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Jun 19 14:56:05 ml-23 python[16703]: BrokenPipeError: [Errno 32] Broken pipe
Jun 19 14:56:05 ml-23 python[16703]: f_n: 0, n_obj: 0, d_time: 0.6752424240112305, fps: 1.4809496033432998
Jun 19 14:56:10 ml-23 python[16703]: Exception in thread Thread-1 (main):
Jun 19 14:56:10 ml-23 python[16703]: Traceback (most recent call last):
Jun 19 14:56:10 ml-23 python[16703]:   File "/usr/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
Jun 19 14:56:10 ml-23 python[16703]:     self.run()
Jun 19 14:56:10 ml-23 python[16703]:   File "/usr/lib/python3.11/threading.py", line 975, in run
Jun 19 14:56:10 ml-23 python[16703]:     self._target(*self._args, **self._kwargs)
Jun 19 14:56:10 ml-23 python[16703]:   File "/home/ml/src/moving_objects_detection/main.py", line 141, in main
Jun 19 14:56:10 ml-23 python[16703]:     buffer_handler.run(config)
Jun 19 14:56:10 ml-23 python[16703]:   File "/home/ml/src/moving_objects_detection/buffer.py", line 131, in run
Jun 19 14:56:10 ml-23 python[16703]:     self._processor(frame=frame, frame_number=frame_number, session_id=session_id)
Jun 19 14:56:10 ml-23 python[16703]:   File "/home/ml/src/moving_objects_detection/processor.py", line 151, in __call__
Jun 19 14:56:10 ml-23 python[16703]:     self._writer(
Jun 19 14:56:10 ml-23 python[16703]:   File "/home/ml/src/moving_objects_detection/writer.py", line 92, in __call__
Jun 19 14:56:10 ml-23 python[16703]:     self._loop.run_until_complete(self.__sender(
Jun 19 14:56:10 ml-23 python[16703]:   File "/usr/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
Jun 19 14:56:10 ml-23 python[16703]:     return future.result()
Jun 19 14:56:10 ml-23 python[16703]:            ^^^^^^^^^^^^^^^
Jun 19 14:56:10 ml-23 python[16703]:   File "/home/ml/src/moving_objects_detection/writer.py", line 83, in __sender
Jun 19 14:56:10 ml-23 python[16703]:     await self._nc.publish(self._subject, proto_bytes)
Jun 19 14:56:10 ml-23 python[16703]:   File "/home/ml/src/moving_objects_detection/venv11/lib/python3.11/site-packages/nats/aio/client.py", line 820, in publish
Jun 19 14:56:10 ml-23 python[16703]:     raise errors.OutboundBufferLimitError
Jun 19 14:56:10 ml-23 python[16703]: nats.errors.OutboundBufferLimitError: nats: outbound buffer limit exceeded

As a workaround, we now pass allow_reconnect=False to nats.connect() and reconnect manually whenever a nats.errors.ConnectionClosedError is raised. This works fine, but the behaviour above looks odd. I searched for information on outbound buffers during reconnect and found the page "Buffering Messages During Reconnect Attempts", but for the asyncio client it only says: "# Asyncio NATS client currently does not implement a reconnect buffer".
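The manual-reconnect workaround described above could look roughly like the sketch below. This is not the reporter's actual code (which was not shared); ReconnectingPublisher and nats_connect are hypothetical names. It assumes that publish() on a closed nats.py client raises nats.errors.ConnectionClosedError. The connect factory and the exception tuple are injected so the retry logic can be exercised without a running server.

```python
from typing import Any, Awaitable, Callable, Tuple, Type


class ReconnectingPublisher:
    """Hypothetical helper: publish on a connection opened with
    allow_reconnect=False, and open a fresh connection when the old one is closed."""

    def __init__(
        self,
        connect: Callable[[], Awaitable[Any]],
        retry_on: Tuple[Type[BaseException], ...],
    ) -> None:
        self._connect = connect    # coroutine factory returning a connected client
        self._retry_on = retry_on  # exceptions meaning "this connection is gone"
        self._nc: Any = None

    async def publish(self, subject: str, payload: bytes) -> None:
        if self._nc is None:
            self._nc = await self._connect()
        try:
            await self._nc.publish(subject, payload)
        except self._retry_on:
            # The old connection was closed: open a new one and retry once.
            # A second failure propagates to the caller.
            self._nc = await self._connect()
            await self._nc.publish(subject, payload)


async def nats_connect(url: str = "nats://localhost:4222") -> Any:
    # Imported lazily so ReconnectingPublisher can be tested without nats-py.
    import nats

    # Disable the client's own reconnect logic, as in the workaround above.
    return await nats.connect(url, allow_reconnect=False)
```

With nats.py this would be wired up as ReconnectingPublisher(nats_connect, (nats.errors.ConnectionClosedError,)). Retrying only once keeps a persistently unreachable server from turning a single publish into an unbounded loop.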

These are the relevant package versions:

nats-py==2.3.1
nats-server: v2.9.17

Thanks!

alllexx88 · Jun 19 '23 13:06

Hi @alllexx88. To confirm: do you set up a connection, call drain at some point in your application, and then later try to reuse the connection? Or are you saying that you set up a connection, some time passes (e.g. 30s), and a subsequent publish gives you this error? Or does this happen only after a reconnect?

bruth · Jun 21 '23 12:06

Hi @bruth, thank you for your reply! We set up a connection, and after a period of inactivity the connection ends up closed without us closing it explicitly; when we then try to publish, we get the error log I posted. It happens after a delay of at least about 250 seconds since the last publish on the connection (or since opening it, if nothing was published). This number is an approximation rather than an exact value, since rigorous testing would take a long time because of all the delays. I tried to write simple code that reproduces the error, but couldn't replicate the problem that way. Our design is somewhat complicated, combining multiprocessing, threading, and asyncio nats-py. At least I can say that the waiting, the publishing, and the error all happen in the same process and the same thread. Maybe it's related to how we handle the event loop (we don't use asyncio anywhere except when working with nats-py). I will try to boil the problem down to code I can share.

alllexx88 · Jun 23 '23 13:06

Code that reproduces the problem:

import asyncio
import nats
from alive_progress import alive_it
from time import sleep


nats_url = "localhost:4222"


class NATSPublisher:
    def __init__(self):
        self._nc = None
        self._loop = asyncio.new_event_loop()
        self._loop.run_until_complete(self.__build())

    async def __build(self):
        self._nc = await nats.connect(nats_url)

    async def __sender(self):
        # await self._nc.publish("teststream", b"\0"*(4 << 10))  # 4KB dummy message  # reconnects
        await self._nc.publish("teststream", b"\0"*(1 << 20))  # 1MB dummy message

    def __call__(self):
        self._loop.run_until_complete(self.__sender())


def main():
    writer = NATSPublisher()
    # wait for 250s with a progressbar
    for _ in alive_it(range(250)):
        sleep(1)
    index = 0
    while True:
        writer()
        print(f"Published index {index}")
        sleep(0.1)
        index += 1


async def async_main():
    nc = await nats.connect(nats_url)
    # wait for 250s with a progressbar
    for _ in alive_it(range(250)):
        await asyncio.sleep(1)
    index = 0
    while True:
        await nc.publish("teststream", b"\0"*(1 << 20))  # 1MB dummy message
        print(f"Published index {index}")
        await asyncio.sleep(0.1)
        index += 1


if __name__ == "__main__":
    # asyncio.run(async_main())  # works
    main()                       # crashes

Output:

$ python tst_nats_error.py
|████████████████████████████████████████| 250/250 [100%] in 4:10.2 (1.00/s) 
Published index 0
Published index 1
nats: encountered error
Traceback (most recent call last):
  File "********/venv/lib/python3.11/site-packages/nats/aio/client.py", line 1404, in _attempt_reconnect
    await self._transport.wait_closed()
  File "********/venv/lib/python3.11/site-packages/nats/aio/transport.py", line 180, in wait_closed
    return await self._io_writer.wait_closed()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/streams.py", line 350, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/usr/lib/python3.11/asyncio/selector_events.py", line 1082, in _write_ready
    n = self._sock.send(self._buffer)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
Traceback (most recent call last):
  File "********/tst_nats_error.py", line 55, in <module>
    main()
  File "********/tst_nats_error.py", line 34, in main
    writer()
  File "********/tst_nats_error.py", line 24, in __call__
    self._loop.run_until_complete(self.__sender())
  File "/usr/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "********/tst_nats_error.py", line 21, in __sender
    await self._nc.publish("teststream", b"\0"*(1 << 20))  # 1MB dummy message
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "********/venv/lib/python3.11/site-packages/nats/aio/client.py", line 820, in publish
    raise errors.OutboundBufferLimitError
nats.errors.OutboundBufferLimitError: nats: outbound buffer limit exceeded
Task was destroyed but it is pending!
task: <Task pending name='Task-9' coro=<Client._attempt_reconnect() running at ********/venv/lib/python3.11/site-packages/nats/aio/client.py:1426> wait_for=<Future pending cb=[Task.task_wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-12' coro=<open_connection() running at /usr/lib/python3.11/asyncio/streams.py:48> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]> cb=[_release_waiter(<Future pendi...ask_wakeup()]>)() at /usr/lib/python3.11/asyncio/tasks.py:421]>
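In the synchronous main() above, the event loop only runs while run_until_complete() is executing, so during the blocking sleep(1) and sleep(0.1) calls none of the client's background tasks (e.g. reading from the socket and answering server PINGs) get a chance to run, which would plausibly explain the broken pipe. One way to keep a synchronous NATSPublisher-style API while letting the loop keep running is to run a single event loop forever in a dedicated thread and hand coroutines to it with asyncio.run_coroutine_threadsafe. This is a sketch under assumptions, not a fix proposed in the thread: LoopThread and heartbeat are hypothetical names, and heartbeat merely stands in for nats.py's background tasks.

```python
import asyncio
import threading
from typing import Any, Coroutine, List, Optional


class LoopThread:
    """Hypothetical helper: run one asyncio event loop forever in a daemon
    thread so background tasks keep running while the caller blocks."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def run(self, coro: Coroutine[Any, Any, Any], timeout: Optional[float] = None) -> Any:
        # Schedule the coroutine on the loop thread and block until it finishes.
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result(timeout)

    def stop(self) -> None:
        async def _cancel_others() -> None:
            # Cancel leftover background tasks so none is destroyed while pending.
            tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
            for t in tasks:
                t.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

        self.run(_cancel_others())
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def heartbeat(counter: List[int], interval: float = 0.05) -> None:
    """Stand-in for a client's background work: bump a counter periodically."""
    while True:
        counter[0] += 1
        await asyncio.sleep(interval)
```

With nats.py, the idea would be nc = lt.run(nats.connect(nats_url)) once, then lt.run(nc.publish("teststream", data)) for each message. This assumes the client creates its background tasks on the loop that awaits connect(). Every call on nc should go through lt.run, because asyncio objects are not thread-safe. Whether this alone resolves the error reported here is untested.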

alllexx88 · Jun 23 '23 14:06

Thanks for sharing how to reproduce this. To run blocking code within asyncio, you could use await asyncio.get_running_loop().run_in_executor(None, sleep, 1), which avoids blocking the event loop.
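The tip above can be illustrated with a small self-contained comparison that does not need a NATS server. The heartbeat task inside count_ticks_while is a hypothetical stand-in for a client's background work, not nats.py code. With blocking_wait, time.sleep freezes the whole loop and the heartbeat ticks essentially once. With executor_wait, the same sleep runs in the default thread pool via run_in_executor and the heartbeat keeps ticking.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def count_ticks_while(wait: Callable[[], Awaitable[None]]) -> int:
    """Run a 50 ms heartbeat task while `wait()` runs; return the tick count."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.05)

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # yield once so the heartbeat starts
    await wait()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


async def blocking_wait() -> None:
    time.sleep(0.5)  # blocks the event loop: no other task can run meanwhile


async def executor_wait() -> None:
    # Same blocking call, but run in the default ThreadPoolExecutor.
    await asyncio.get_running_loop().run_in_executor(None, time.sleep, 0.5)
```

Running asyncio.run(count_ticks_while(blocking_wait)) and asyncio.run(count_ticks_while(executor_wait)) side by side shows the difference.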

wallyqs · Jun 23 '23 16:06

You're welcome, and thanks for the tip, @wallyqs !

alllexx88 · Jun 23 '23 16:06