aio-pika icon indicating copy to clipboard operation
aio-pika copied to clipboard

Best Practice with Pytest ?

Open dgarros opened this issue 2 years ago • 2 comments

I was wondering if there are some resources available (doc or library) to help get started with aio-pika and pytest ?

I'm specifically looking for a clean solution to mock a RPC call, something similar to pytest-http which make it really easy to mock a response to a request ahead of time (example below)

To be precise, I would like to test a FastAPI endpoint that is making some RPC calls and I would like to mock the response to these calls as much as possible.

import httpx
from pytest_httpx import HTTPXMock


def test_json(httpx_mock: HTTPXMock):
    httpx_mock.add_response(json=[{"key1": "value1", "key2": "value2"}])

    with httpx.Client() as client:
        assert client.get("https://test_url").json() == [{"key1": "value1", "key2": "value2"}]

thanks

dgarros avatar Jan 13 '23 09:01 dgarros

our unit tests run through a locally running DB and rabbitMQ. Not sure if this is of any help to you, but also in Github we spin up a rabbitMQ Service run the tests and shut it down. In order to make this work there was no way around having to add a await asyncio.sleep(1) there:

async def test_subscribe():
    # that one subsripes to "test"
    rabbitmq_connection = await subscribe_topic()
    exchange = rabbitmq_connection._exchange

    message_payload = [{"uuid": "00000000-1337-0000-0000-000000000001", "data": "test"}]
    message = Message(body=json.dumps(message_payload).encode(), content_type="application/json")
    await exchange.publish(message, routing_key="test")
    await asyncio.sleep(1)

    # We check if a Db entry has been updated so sth like:
    assert get_from_db() == "DONE"

maciossek avatar Jan 19 '23 15:01 maciossek

@dgarros you can see an example RabbitMQ-based services testing with Propan framework here

Underground it uses a quote simple code: building IncomingMessage object directly from code and call handler function with builded object like a regular function. You can find sources here. This way you can test aio-pika services too.

Lancetnik avatar Jun 29 '23 20:06 Lancetnik