Minimal setup for unit testing
I'm attempting to use kombu to communicate with a RabbitMQ server, and have done so successfully using the ConsumerMixin. However, I'm trying to setup simple unit tests using the kombu in-memory transport, and I cannot for the life of me get a simple working example of a message being sent and consumed on a connection using memory://. Maybe I just completely misunderstand the protocol, but I've been unable to find much about unit testing kombu.
Here's my simple example. I've tried a number of other things. I need to not use a SimpleQueue because the actual class I'm trying to test uses the Mixin which cannot read from a SimpleQueue.
conn = Connection("memory:///")
exchange = Exchange(settings.CONSUME_EXCHANGE, 'direct')
bound_exchange = exchange(conn)
queue = Queue(settings.CONSUME_QUEUE, exchange=exchange)
bound_queue = queue(conn)
producer = Producer(bound_queue.channel)
producer.publish({'hello': 'world'})
consumer = conn.Consumer(queues=[bound_queue])
test = False
def do_work(body, message):
test = True
consumer.register_callback(do_work)
try:
conn.drain_events(timeout=3)
except socket.timeout:
assert test
Ultimately I have a handler that performs some work on a received message, and then sends some messages to other queues. I'm trying to setup simple producers and consumers to mock upstream services sending messages, and also to verify that messages are leaving this particular application.
Any help would be greatly appreciated.
Edit:
I've managed to setup an example that at least allows me to publish and receive one message, but I'm unable to setup further consumers of the given topics. Using Pycharm's debugger, I can verify that the queues are publishing to and receiving from the same channel, the routing keys match, etc. etc...
@pytest.fixture(scope="function")
def connection():
return Connection('memory://localhost/', transport_options={'polling_interval': 0})
@pytest.fixture(scope="function")
def consume_exchange(connection):
return Exchange(settings.CONSUME_EXCHANGE, type='topic', channel=connection)
@pytest.fixture(scope="function")
def consume_queue(consume_exchange):
return Queue(exchange=consume_exchange)
@pytest.fixture(scope="function")
def producer_ingest(connection, consume_queue):
return Producer(connection, exchange=consume_queue.exchange)
# Used to override and provide should_stop so that it stops after first message
@pytest.fixture(scope="function")
def ingest_consumer(connection, consume_queue):
def test_work(body, message):
consumer.handle_message(body, message)
consumer.should_stop = True
consumer = ValidationHandler(connection=connection, queues=[consume_queue])
consumer.do_work = test_work
return consumer
class Worker(ConsumerMixin):
def __init__(self, connection, queues: list):
print('Init test worker')
self.connection = connection
self.queues = queues
def get_consumers(self, Consumer, channel):
return [Consumer(queues=self.queues,
callbacks=[self.on_task])]
def on_task(self, body, message):
print('Got task: {0!r}'.format(body))
message.ack()
self.should_stop = True
Then test:
def test(connection, ingest_consumer, producer_ingest, status_queue,
publish_valid_queue,
publish_invalid_queue):
# Works, ingest_consumer receives message, processes, and publishes to valid queue and status queue
producer_ingest.publish(json.loads(VALID_PAYLOAD), exchange=consume_queue.exchange, declare=[consume_queue])
ingest_consumer.run()
# Doesn't work, even though messages are published to the below queues
Worker(connection, queues=[status_queue, publish_valid_queue, publish_invalid_queue])
connection.drain_events(timeout=5)
At the moment, I've decided to use Mock to test function calls as one offs, and use fixtures to setup the necessary queues. It would still be nice to be able to test the actual production and consumption of messages to and from queues within unittests though.
@mock.patch('<module>.ValidationHandler.validate')
def test_validate(validate, ingest_consumer, published_valid_payload):
ingest_consumer.run() # defined to stop after first message
validate.assert_called_once()
And so on for each function.