nats.py
Client does not retry if TLS handshake fails
When the client receives an error during the TLS handshake, it looks like the retry logic doesn't kick in and the call to connect will never return. It's calling my error_cb properly, but then I never see it attempt to connect again, even though I have retries enabled in the settings.
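For reference, the client is set up roughly like this (a simplified sketch of my setup; the server URL, certificate path and retry values below are placeholders rather than my exact settings):

import logging
import ssl

from nats import NATS

LOGGER = logging.getLogger("nats_util")

# Placeholder TLS context; real certificate paths differ.
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations("ca.pem")

async def on_error(e):
    LOGGER.exception(f"NATS Error: {e}")

nc = NATS()
# Called from inside a coroutine; reconnects are enabled explicitly.
await nc.connect(
    servers=["tls://nats.example.com:4222"],  # placeholder URL
    tls=ssl_ctx,
    error_cb=on_error,
    allow_reconnect=True,
    max_reconnect_attempts=60,
)

This is what the error callback logs when the handshake fails: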
2022-11-09 19:02:11,187 nats_util ERROR NATS Error:
Traceback (most recent call last):
  File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/nats/aio/client.py", line 469, in connect
    await self._process_connect_init()
  File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/nats/aio/client.py", line 1892, in _process_connect_init
    await self._transport.connect_tls(
  File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/nats/aio/transport.py", line 155, in connect_tls
    transport = await asyncio.wait_for(
  File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/tasks.py", line 494, in wait_for
    return fut.result()
  File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 1200, in start_tls
    await waiter
ConnectionResetError
That's weird, ConnectionResetError is definitely a subclass of OSError, which is caught by the _process_connect_init method.
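A quick interpreter check confirms the subclass relationship:

>>> issubclass(ConnectionResetError, OSError)
True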
I wrote this small reproduction script:
import asyncio

from nats import NATS


class ReproNATS(NATS):
    async def _process_connect_init(self) -> None:
        raise ConnectionResetError


async def error_callback(err: Exception) -> None:
    print(f"Encountered an error: {repr(err)}")


async def main() -> None:
    nc = ReproNATS()
    await asyncio.wait_for(nc.connect(error_cb=error_callback), timeout=10)


if __name__ == "__main__":
    asyncio.run(main())
And I did not manage to reproduce the issue: when I run the script, the client keeps trying to reconnect:
> python repro.py
Encountered an error: ConnectionResetError()
Encountered an error: ConnectionResetError()
Encountered an error: ConnectionResetError()
Encountered an error: ConnectionResetError()
Encountered an error: ConnectionResetError()
It seems that your example is using a custom error callback; are you sure that it works as expected? (The default callback logs errors using nats: encountered error: ...)
My error callback is just logging the error:
async def on_error(e):
    LOGGER.exception(f"NATS Error: {e}")
The error seems to specifically be happening during the TLS upgrade - is there any chance that there's some state that is not reset properly compared to when the error happens right away or during the initial connect?
Just a guess, but did you try setting self._transport to None in the Client._close() method (at the end)?
I took a quick look and it seems that this attribute is never set back to None after being assigned the first time.
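Something along these lines, just to illustrate the idea (an untested sketch; the exact _close() signature is an assumption on my part):

from nats import NATS

class PatchedNATS(NATS):
    # Untested sketch: drop the transport once the client is fully closed,
    # so that the next connect() re-creates it from scratch.
    async def _close(self, status, do_cbs: bool = True) -> None:
        await super()._close(status, do_cbs)
        self._transport = None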
Hello @charbonnierg, the self._transport is not reset because the Client manages the reconnection by itself, using the self._status flag:
https://github.com/nats-io/nats.py/blob/0443f8a3ca322ea1c19c2e50053e58add9870fa7/nats/aio/client.py#L1348-L1358
Then when it gets closed it can reset itself to allow for reconnects:
https://github.com/nats-io/nats.py/blob/0443f8a3ca322ea1c19c2e50053e58add9870fa7/nats/aio/client.py#L625-L629
The error is handled here:
https://github.com/nats-io/nats.py/blob/0443f8a3ca322ea1c19c2e50053e58add9870fa7/nats/aio/client.py#L479-L490
The only way for the error to still be raised is that it ran out of reconnect attempts. Could it be the case @nosammai that allow_reconnect is set to False or None?
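For reference, these are the connect() options that control that behaviour (the values below are only illustrative):

await nc.connect(
    servers=["tls://localhost:4222"],
    error_cb=error_callback,
    allow_reconnect=True,        # the reconnect loop only runs when this is truthy
    max_reconnect_attempts=60,   # default in nats.py
    reconnect_time_wait=2,       # seconds to wait between attempts
)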
It seems to reconnect in other cases just fine - if I turn off the NATS server or start up my app without the server running, it will continuously reconnect with no issues.
I did not want to expand because I was not so sure, but I think I have a good idea of how the client is implemented.
What I was thinking is that self._transport is assigned in only two lines in the codebase:
if not self._transport:
    if s.uri.scheme in ("ws", "wss"):
        self._transport = WebSocketTransport()
    else:
        # use TcpTransport as a fallback
        self._transport = TcpTransport()
Moreover, self._transport is never reset to None (even though it is closed).
So the statement if not self._transport will always be False after the first connect.
I have almost zero knowledge about asyncio and TLS, but I was thinking that maybe this could explain the behaviour observed by @nosammai (waiting forever?).
The TLS transport always creates a new asyncio.StreamWriter each time the connect method is called, so it should not matter if the same transport instance is reused, as long as connect is called first.
But if connect_tls were called without calling connect first, then the closed transport could be used, which made me doubt this line, which is the line raising the error in the traceback.
I think that connect_tls is never called before connect, but better safe than sorry :)
BTW, could you run your client alongside an HTTP server and expose an endpoint that lists the running asyncio tasks? Something like this server.py file:
import asyncio
import io
from typing import Any, List

from fastapi import FastAPI
from nats import NATS

app = FastAPI()
nc = NATS()


@app.on_event("startup")
async def start_client() -> None:
    asyncio.create_task(nc.connect())


@app.get("/tasks")
async def get_asyncio_tasks():
    all_tasks = asyncio.all_tasks()
    tasks_infos: List[Any] = []
    for task in all_tasks:
        memio = io.StringIO()
        task.print_stack(file=memio)
        memio.seek(0)
        tasks_infos.append(
            {
                "name": task.get_name(),
                "coro": task.get_coro().__qualname__,
                "stack": memio.read().splitlines(False),
            }
        )
    return {"count": len(all_tasks), "tasks": tasks_infos}
Run the app using uvicorn:
uvicorn server:app
Then you can query which asyncio tasks are running:
curl -s http://localhost:8000/tasks | jq .tasks[].coro
If the client is connected, the output should be:
# NATS
"Client._read_loop"
"Client._ping_interval"
"Client._flusher"
# Uvicorn
"LifespanOn.main"
"Server.serve"
"RequestResponseCycle.run_asgi"
Example of a detailed output:
{
  "name": "Task-10",
  "coro": "Client._flusher",
  "stack": [
    "Stack for <Task pending name='Task-10' coro=<Client._flusher() running at ~/repro-nats/.venv/lib/python3.8/site-packages/nats/aio/client.py:1981> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f60abcb9ac0>()]>> (most recent call last):",
    " File \"~/repro-nats/.venv/lib/python3.8/site-packages/nats/aio/client.py\", line 1981, in _flusher",
    " future: asyncio.Future = await self._flush_queue.get()"
  ]
}
It helped us debug problems several times. I'm sure there are better methods, but being able to list the running tasks at any time is really valuable when debugging code that involves the NATS client.