nats.connect : There was an error:

Open Leem0sh opened this issue 3 years ago • 6 comments

Hello. I'm having a hard time integrating NATS and Kafka with FastAPI.

Vision: A request from the user comes to the API -> the API creates a Kafka event and subscribes in NATS to a subject named after the Correlation ID (CID). The event goes to a service that does some computing -> the service creates another Kafka event with the result -> the result goes to what I'll call the Finisher. The Finisher reads the Kafka event, turns it into a NATS publish on the same CID, and sends it back to the API, which is already listening.

Basically: |API| --kafka event-> |Service| --kafka event-> |Finisher| --NATS pub-> |API|
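The correlation-ID flow described above can be sketched without any broker at all. This is only an in-memory illustration, not the code from the issue: the two asyncio queues stand in for the Kafka topics, the `waiting` dict of futures stands in for NATS subjects, and all function names are hypothetical.

```python
import asyncio
from uuid import uuid4


async def service(inbox, outbox):
    # "Service": consume an event, do some computing, emit a result event.
    while True:
        cid, payload = await inbox.get()
        await outbox.put((cid, {"result": payload["value"] * 2}))


async def finisher(inbox, waiting):
    # "Finisher": turn each result event into a reply on the CID "subject".
    while True:
        cid, result = await inbox.get()
        fut = waiting.pop(cid, None)
        if fut is not None and not fut.done():
            fut.set_result(result)


async def api_request(value, to_service, waiting, timeout=1.0):
    # "API": register interest in the CID before emitting the event,
    # so a fast reply cannot arrive while nobody is listening.
    cid = uuid4().hex
    fut = asyncio.get_running_loop().create_future()
    waiting[cid] = fut
    await to_service.put((cid, {"value": value}))
    try:
        return await asyncio.wait_for(fut, timeout)
    finally:
        # Drop the entry even on timeout so the dict cannot grow forever.
        waiting.pop(cid, None)


async def demo():
    to_service, to_finisher, waiting = asyncio.Queue(), asyncio.Queue(), {}
    workers = [
        asyncio.create_task(service(to_service, to_finisher)),
        asyncio.create_task(finisher(to_finisher, waiting)),
    ]
    try:
        return await asyncio.gather(
            *(api_request(n, to_service, waiting) for n in range(3))
        )
    finally:
        for worker in workers:
            worker.cancel()
```

Running `asyncio.run(demo())` sends three requests through the simulated pipeline. The point of the sketch is the ordering in `api_request`: the listener is registered first, the event is sent second, and the wait has a timeout.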

Docker-compose:

version: "2"

services:
  nats:
    image: nats:latest
    ports:
      - '4222:4222'
      - '6222:6222'
      - '8222:8222'

I wrote working code for the FastAPI side (below), plus two very basic services (Service and Finisher) that just read the content of an event and send it somewhere else: Service emits a Kafka event, and Finisher publishes to NATS.

async def nats_connect():
    async def disconnected_cb():
        print('Got disconnected!')

    async def reconnected_cb():
        print(f'Got reconnected to {nc.connected_url.netloc}')

    async def error_cb(e):
        print(f'There was an error: {e}')

    async def closed_cb():
        print('Connection is closed')

    nc = await nats.connect(
        error_cb=error_cb,
        reconnected_cb=reconnected_cb,
        disconnected_cb=disconnected_cb,
        closed_cb=closed_cb,
    )
    return nc



@router.post(
    "/measure/{project_id}/{model}",
    response_model=Dimension,
    dependencies=[maybe_security()],
    description="Descr",
    summary="Summ",
    tags=["Dimensions"],
    include_in_schema=True,
)
async def _(
    *,
    client_session: ClientSession = Depends(client_session_dep),
    producer: KafkaProducer = Depends(kafka_producer),
    nats_consumer: Client = Depends(nats_connect),
    project_id: NonNegativeInt,
    description: DimensionRequestBodyModel,
) -> Dimension | JSONResponse:

    response = None
    CID = uuid4().hex
    parsed_description = parse_obj_as(DimensionRequestBodyModel, description)

    print(CID, parsed_description)
    await send_kafka_event(settings.TOPIC_FROM_GATEWAY_TO_DS, producer, parsed_description.dict(), CID)
    sub = await nats_consumer.subscribe(CID)
    try:
        async for msg in sub.messages:
            print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
            response = json.loads(msg.data.decode())
            await sub.unsubscribe(CID)

    except Exception as e:
        print(e)
    return JSONResponse(status_code=200, content=response)
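For comparison, here is a hedged sketch of a wait-for-one-reply helper; it is not the author's code and not a confirmed fix. It assumes a nats-py client that is already connected and relies on `Subscription.next_msg(timeout=...)` and `Subscription.unsubscribe()`. As far as I can tell, `unsubscribe` takes an optional message limit rather than a subject, so the call `sub.unsubscribe(CID)` in the endpoint above probably does not do what was intended. The `send_event` parameter is a hypothetical zero-argument coroutine function standing in for `send_kafka_event(...)`.

```python
import asyncio
import json


async def wait_for_reply(nc, subject, send_event, timeout=5.0):
    """Subscribe to `subject`, trigger the work, and return one decoded reply.

    `nc` is an already connected nats-py client. `send_event` is a
    hypothetical zero-argument coroutine function that emits the Kafka event.
    Returns None if no reply arrives within `timeout` seconds.
    """
    # Subscribe first so the reply cannot be published before we listen.
    sub = await nc.subscribe(subject)
    try:
        await send_event()
        msg = await sub.next_msg(timeout=timeout)
        return json.loads(msg.data.decode())
    except asyncio.TimeoutError:
        # Assumption: nats.errors.TimeoutError subclasses asyncio.TimeoutError.
        return None
    finally:
        # No argument here: the parameter is a message limit, not a subject.
        await sub.unsubscribe()
```

Compared with the `async for msg in sub.messages` loop, this version cannot wait forever: a lost reply surfaces as `None` after `timeout` seconds, and the subscription is removed on every path.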

All of this worked properly with a single request, so I wanted to test it under load. I wrote a synchronous for loop that sends 1000 requests. Everything worked and I was getting results in the responses. Until it... broke? Suddenly I started getting

There was an error:
There was an error:
There was an error:
...

So I removed my printing error callback to see what was actually happening, and got this error repeated over and over:


[07.12.2021 13:56:51] ERROR [nats.aio.client:client - _default_error_callback:105] nats: encountered error
Traceback (most recent call last):
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\streams.py", line 540, in readline
    line = await self.readuntil(sep)
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\streams.py", line 632, in readuntil
    await self._wait_for_data('readuntil')
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\streams.py", line 517, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "c:\users\home\anaconda3\envs\fastapi\lib\site-packages\nats\aio\client.py", line 369, in connect
    await self._process_connect_init()
  File "c:\users\home\anaconda3\envs\fastapi\lib\site-packages\nats\aio\client.py", line 1579, in _process_connect_init
    info_line = await asyncio.wait_for(
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
[07.12.2021 13:56:55] ERROR [nats.aio.client:client - _default_error_callback:105] nats: encountered error
Traceback (most recent call last):
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\streams.py", line 540, in readline
    line = await self.readuntil(sep)
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\streams.py", line 632, in readuntil
    await self._wait_for_data('readuntil')
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\streams.py", line 517, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "c:\users\home\anaconda3\envs\fastapi\lib\site-packages\nats\aio\client.py", line 369, in connect
    await self._process_connect_init()
  File "c:\users\home\anaconda3\envs\fastapi\lib\site-packages\nats\aio\client.py", line 1579, in _process_connect_init
    info_line = await asyncio.wait_for(
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
[07.12.2021 13:56:59] ERROR [nats.aio.client:client - _default_error_callback:105] nats: encountered error
Traceback (most recent call last):
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\streams.py", line 540, in readline
    line = await self.readuntil(sep)
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\streams.py", line 632, in readuntil
    await self._wait_for_data('readuntil')
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\streams.py", line 517, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\users\home\anaconda3\envs\fastapi\lib\asyncio\tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

Any idea what suddenly became an issue? Since then I haven't been able to run anything, even after restarting docker-compose. I assume the issue is inside nats.connect because the endpoint code doesn't print anything (it should, right after the call).

Leem0sh avatar Dec 07 '21 14:12 Leem0sh

Hi, I notice that no URL is passed and that the error is reported during init. Could you confirm that the missing URL is not the issue? Like:

nc = await nats.connect(
        "nats://nats:4222",
        error_cb=error_cb,
        reconnected_cb=reconnected_cb,
        disconnected_cb=disconnected_cb,
        closed_cb=closed_cb,
    )

wallyqs avatar Dec 07 '21 17:12 wallyqs

Hello, I don't really think this is the issue as it was working properly for some time even without nats://nats:4222 but I'll try that in the morning and let you know. Thanks!

Leem0sh avatar Dec 07 '21 22:12 Leem0sh

@wallyqs even when the URL is left out, it connects to NATS. The code started working again, but it only got through exactly 500 requests (which is odd) before everything froze again lol.
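One reading of the endpoint code, not confirmed by the maintainers in this thread: `Depends(nats_connect)` runs on every request, so each request opens a new NATS connection and nothing ever closes it. A minimal sketch of sharing a single connection instead, assuming the client object exposes an async `close()` as nats-py clients do; the `SharedConnection` class and its method names are hypothetical.

```python
import asyncio


class SharedConnection:
    """Create one connection lazily and hand the same object to every caller."""

    def __init__(self, connect):
        # `connect` is a zero-argument coroutine function, for example
        # lambda: nats.connect("nats://localhost:4222", error_cb=error_cb)
        self._connect = connect
        self._conn = None
        self._lock = None

    async def get(self):
        if self._lock is None:
            # Created lazily so the lock belongs to the running event loop.
            self._lock = asyncio.Lock()
        async with self._lock:
            # Only the first caller connects; later callers reuse the result.
            if self._conn is None:
                self._conn = await self._connect()
        return self._conn

    async def close(self):
        # Call once at application shutdown.
        if self._conn is not None:
            conn, self._conn = self._conn, None
            await conn.close()
```

In the FastAPI app this could be wired as `Depends(shared.get)` in place of `Depends(nats_connect)`, with `await shared.close()` in a shutdown handler. Whether this removes the freeze at around 500 requests is untested here.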

Leem0sh avatar Dec 08 '21 13:12 Leem0sh

I put together a quick repo where you can reproduce the issue.

1. git clone https://github.com/Leem0sh/FKN
2. cd FKN
3. pip install -r requirements.txt
4. docker-compose -f docker-compose.yml up # starts Kafka and NATS; the Kafka cluster UI is at 127.0.0.1:8080
5. uvicorn api:app --reload # starts the API
6. python consumer.py # starts the processing service
7. python finisher.py # starts the finisher

Now all the services should be running.

8. Now go to 127.0.0.1:8000/docs
9. Send the example request I created so you can see the code is working
10. python test_run.py

The test run is just a few hundred requests, but at around the 500th request NATS freezes. The funny thing is that the issue is still there even after restarting docker-compose.

PS. sorry for the messy code, I was trying to get it running asap.

  • I found https://github.com/nats-io/nats.rs/issues/210, which might be similar but I'm not sure if I understood that correctly.

Leem0sh avatar Dec 09 '21 01:12 Leem0sh

thanks @Leem0sh for putting this together, will take a look today.

wallyqs avatar Dec 09 '21 18:12 wallyqs

Hi - I encountered this exact same issue today with nats-py==2.3.1. Is there any update on this?

matutter avatar Aug 08 '23 15:08 matutter