aio-pika
Best practice with pytest?
I was wondering if there are some resources available (docs or a library) to help get started with aio-pika and pytest?
I'm specifically looking for a clean solution to mock an RPC call, something similar to pytest-httpx, which makes it really easy to mock a response to a request ahead of time (example below).
To be precise, I would like to test a FastAPI endpoint that makes some RPC calls, and I would like to mock the responses to these calls as much as possible.
import httpx
from pytest_httpx import HTTPXMock


def test_json(httpx_mock: HTTPXMock):
    httpx_mock.add_response(json=[{"key1": "value1", "key2": "value2"}])

    with httpx.Client() as client:
        assert client.get("https://test_url").json() == [{"key1": "value1", "key2": "value2"}]
thanks
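One way to get a similar "register the response ahead of time" feel for RPC calls is to replace the RPC client with `unittest.mock.AsyncMock`. The sketch below is only an illustration under assumptions: `fetch_user_name` and the `"get_user"` method name are hypothetical, and the call shape `rpc.call(method_name, kwargs=...)` is assumed to match the `aio_pika.patterns.RPC` client used by the application. No broker or aio-pika import is needed for the test itself.

```python
import asyncio
from unittest.mock import AsyncMock


async def fetch_user_name(rpc, user_id: int) -> str:
    # Hypothetical application code: performs one RPC call and
    # extracts a field from the reply.
    result = await rpc.call("get_user", kwargs={"user_id": user_id})
    return result["name"]


def test_fetch_user_name():
    # Stand-in for the real RPC client; attributes of an AsyncMock
    # are themselves awaitable mocks.
    rpc = AsyncMock()
    # Register the canned reply ahead of time.
    rpc.call.return_value = {"name": "alice"}

    name = asyncio.run(fetch_user_name(rpc, 42))

    assert name == "alice"
    # Verify the request that the code under test actually sent.
    rpc.call.assert_awaited_once_with("get_user", kwargs={"user_id": 42})
```

In a FastAPI application, a mock like this could probably be injected through `app.dependency_overrides`, provided the endpoint receives its RPC client as a dependency.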
Our unit tests run against a locally running DB and RabbitMQ. Not sure if this is of any help to you, but in GitHub we also spin up a RabbitMQ service, run the tests, and shut it down. To make this work, there was no way around adding an await asyncio.sleep(1) there:
import asyncio
import json

from aio_pika import Message


async def test_subscribe():
    # subscribe_topic() (project helper, not shown) subscribes to "test"
    rabbitmq_connection = await subscribe_topic()
    exchange = rabbitmq_connection._exchange
    message_payload = [{"uuid": "00000000-1337-0000-0000-000000000001", "data": "test"}]
    message = Message(body=json.dumps(message_payload).encode(), content_type="application/json")
    await exchange.publish(message, routing_key="test")
    # Give the consumer time to handle the message
    await asyncio.sleep(1)
    # We check whether a DB entry has been updated, so something like:
    assert get_from_db() == "DONE"
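A possible alternative to the fixed one-second sleep is to poll for the expected state with a timeout, so the test returns as soon as the consumer has finished and only waits the full duration when something is wrong. This is a minimal sketch, not something from the thread: `wait_until` is a hypothetical helper, and the default timeout and interval are arbitrary.

```python
import asyncio
import time


async def wait_until(predicate, timeout: float = 5.0, interval: float = 0.05) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        # Yield to the event loop so the consumer can make progress.
        await asyncio.sleep(interval)
    # One last check at the deadline.
    return bool(predicate())
```

In the test above, the sleep and the final assert could then become something like `assert await wait_until(lambda: get_from_db() == "DONE")`.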
@dgarros you can see an example of testing RabbitMQ-based services with the Propan framework here
Under the hood it uses quite simple code: it builds an IncomingMessage object directly in code and calls the handler function with that object like a regular function. You can find the sources here. You can test aio-pika services the same way.
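The idea of calling the handler like a regular function can be sketched without a broker. Note that this sketch does not build a real aio-pika IncomingMessage, which needs low-level AMQP frame objects; it swaps in a hypothetical `FakeIncomingMessage` that exposes only the `body` attribute and the `process()` async context manager the handler relies on. The `handle` function and the `processed` list are also made up for illustration.

```python
import asyncio
import json
from contextlib import asynccontextmanager


class FakeIncomingMessage:
    """Hypothetical stand-in exposing only `body` and `process()`."""

    def __init__(self, body: bytes):
        self.body = body
        self.acked = False

    @asynccontextmanager
    async def process(self):
        yield self
        # Reached only if the handler body did not raise.
        self.acked = True


processed = []


async def handle(message) -> None:
    # Hypothetical consumer callback, written the way aio-pika
    # handlers usually are: process the message inside `process()`.
    async with message.process():
        payload = json.loads(message.body)
        processed.append(payload["data"])


def test_handle():
    message = FakeIncomingMessage(json.dumps({"data": "test"}).encode())
    asyncio.run(handle(message))
    assert processed[-1] == "test"
    assert message.acked
```

The trade-off is that a fake like this only checks the handler logic; routing, serialization by the broker, and acknowledgement semantics still need a test against a real RabbitMQ instance.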