JetStream client raises TimeoutError after reconnect

Open tothandras opened this issue 1 year ago • 1 comments

The JetStream client can't publish new messages after the NATS client reconnects. The connection seems to drop when there have been no new messages for a while.

Got disconnected!
Got reconnected!
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/nats/aio/client.py", line 1001, in _request_new_style
    msg = await asyncio.wait_for(future, timeout)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 501, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "main.py", line 405, in nats_thread
    asyncio.run(nats_thread_async())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "main.py", line 395, in nats_thread_async
    await js.publish(nats_stream, msg.encode())
  File "/usr/local/lib/python3.8/dist-packages/nats/js/client.py", line 120, in publish
    msg = await self._nc.request(
  File "/usr/local/lib/python3.8/dist-packages/nats/aio/client.py", line 965, in request
    msg = await self._request_new_style(
  File "/usr/local/lib/python3.8/dist-packages/nats/aio/client.py", line 1014, in _request_new_style
    raise errors.TimeoutError
nats.errors.TimeoutError: nats: timeout

The relevant code snippet:

import asyncio
import queue

import nats

message_queue = queue.Queue()

async def nats_thread_async():
    global message_queue

    async def disconnected_cb():
        print("Got disconnected!")

    async def reconnected_cb():
        print("Got reconnected!")

    async def error_cb(e):
        print(f"There was an error: {e}")

    async def closed_cb():
        print("Connection is closed")

    nc = await nats.connect(
        config["nats_uri"],
        ping_interval=10,
        max_reconnect_attempts=24,
        disconnected_cb=disconnected_cb,
        reconnected_cb=reconnected_cb,
        error_cb=error_cb,
        closed_cb=closed_cb,
    )
    js = nc.jetstream()
    nats_stream = config["nats_stream"]
    
    while True:
        msg = message_queue.get(block=True, timeout=None)
        await js.publish(nats_stream, msg.encode())
        message_queue.task_done()


def nats_thread():
    # asyncio.run() creates and closes its own event loop for this thread.
    asyncio.run(nats_thread_async())

tothandras • Sep 18 '22 20:09

I had a similar issue, and the cause is that we both use queue.Queue, which blocks the thread. When you call msg = message_queue.get(block=True, timeout=None), you wait for a message by blocking the thread, so none of the async background tasks in nats can run: no pings are sent, for example. By the time a message finally arrives, the connection is long dead.
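
To make the effect visible without a NATS server, here is a minimal sketch. The `heartbeat` coroutine is a hypothetical stand-in for a client's background ping task, not actual nats code. It counts how often that task gets to run while the main coroutine waits half a second for a message, once with a blocking queue.Queue.get() and once with an awaited asyncio.Queue.get() on the same loop.

```python
import asyncio
import queue
import threading
import time


async def heartbeat(ticks):
    # Hypothetical stand-in for a client's background ping task.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def count_ticks(blocking):
    loop = asyncio.get_running_loop()
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick

    if blocking:
        q = queue.Queue()
        # Another thread delivers one message after 0.5 s.
        threading.Timer(0.5, q.put, args=("hello",)).start()
        q.get(block=True)  # blocks the thread, so the event loop cannot run
    else:
        q = asyncio.Queue()
        # The loop itself delivers one message after 0.5 s.
        loop.call_later(0.5, q.put_nowait, "hello")
        await q.get()  # suspends only this coroutine

    task.cancel()
    return len(ticks)


blocked_ticks = asyncio.run(count_ticks(blocking=True))
free_ticks = asyncio.run(count_ticks(blocking=False))
print(f"ticks while blocked: {blocked_ticks}, ticks while awaiting: {free_ticks}")
```

In the blocking run the heartbeat only gets its initial tick, while in the awaiting run it keeps ticking roughly every 50 ms. A ping task starved in the same way would plausibly explain the disconnect described in the issue.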

I fixed my issue by using asyncio.Queue instead of queue.Queue. Please note that asyncio.Queue is not thread-safe, which is sad; in my case I had more threads, so I also had to use a mutex, threading.Lock().
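
As an alternative to a mutex, asyncio has `loop.call_soon_threadsafe`, which asks the event loop to run a callback on its own thread. A minimal sketch of handing messages from another thread to an asyncio.Queue that way is below. The `producer` and `consume` names are made up for illustration, and the `received.append` line stands in for the `js.publish` call.

```python
import asyncio
import threading


def producer(loop, message_queue, messages):
    # Runs in another thread. put_nowait() must not be called from here
    # directly, so the event loop is asked to call it on its own thread.
    for msg in messages:
        loop.call_soon_threadsafe(message_queue.put_nowait, msg)


async def consume(expected):
    loop = asyncio.get_running_loop()
    message_queue = asyncio.Queue()
    thread = threading.Thread(
        target=producer, args=(loop, message_queue, expected)
    )
    thread.start()

    received = []
    while len(received) < len(expected):
        msg = await message_queue.get()  # does not block the event loop
        received.append(msg)  # js.publish(...) would go here
        message_queue.task_done()
    thread.join()  # the producer has already finished at this point
    return received


received = asyncio.run(consume(["a", "b", "c"]))
print(received)
```

Because the consumer awaits `get()` instead of polling, this variant would not need the one-second sleep, at the cost of having to pass the loop object to the producer threads.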

Without a mutex, with only one thread, it would look like this:


message_queue = asyncio.Queue()
...
def queue_msg(msg):
    message_queue.put_nowait(msg)

...
        while True:
            msg = await message_queue.get()
            await js.publish(nats_stream, msg.encode())
            message_queue.task_done()
...

My code looks more like this with the mutex:


message_queue = asyncio.Queue()
message_queue_mutex = threading.Lock()

...
def queue_msg(msg):
    # The with block releases the lock even if put_nowait() raises.
    with message_queue_mutex:
        message_queue.put_nowait(msg)

...
        while True:
            try:
                with message_queue_mutex:
                    msg = message_queue.get_nowait()
            except asyncio.QueueEmpty:
                # The sleep adds some latency, which was fine for me;
                # otherwise a fancier solution could be used.
                await asyncio.sleep(1)
                continue
            await js.publish(nats_stream, msg.encode())
            with message_queue_mutex:
                message_queue.task_done()
...
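
One possible fancier solution, as a sketch only: keep the thread-safe queue.Queue and move the blocking get() into a worker thread with `loop.run_in_executor`, so the event loop stays free for pings. The `STOP` sentinel, `drain`, and `handle` are hypothetical names for this example; `handle` stands in for the `js.publish` call.

```python
import asyncio
import queue
import threading

STOP = object()  # hypothetical sentinel telling the consumer to exit


async def drain(message_queue, handle):
    loop = asyncio.get_running_loop()
    while True:
        # The blocking get() runs in a worker thread; the loop stays free.
        msg = await loop.run_in_executor(None, message_queue.get)
        if msg is STOP:
            message_queue.task_done()
            break
        await handle(msg)  # e.g. js.publish(nats_stream, msg.encode())
        message_queue.task_done()


async def main():
    message_queue = queue.Queue()
    seen = []

    async def handle(msg):
        seen.append(msg)

    def producer():
        # Any thread can call put() safely on a queue.Queue.
        for msg in ("one", "two"):
            message_queue.put(msg)
        message_queue.put(STOP)

    threading.Thread(target=producer).start()
    await drain(message_queue, handle)
    return seen


seen = asyncio.run(main())
print(seen)
```

This needs no mutex and no polling sleep. One trade-off to be aware of: if the coroutine is cancelled while a get() is pending, the worker thread stays blocked until the next message arrives.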

fungiboletus • Nov 24 '22 07:11