mockafka-py
mockafka-py copied to clipboard
Basic use case examples not working: `fixture 'message' not found`
Describe the bug
Following the basic use cases in the README does not work.
pytest returns fixture 'message' not found.
Tested with poetry and pip.
To Reproduce
poetry init
poetry add mockafka-py
poetry shell
### test.py ###
from mockafka import produce, consume
@produce(topic='test', key='test_key', value='test_value', partition=4)
@consume(topics=['test'])
def test_produce_and_consume_decorator(message):
"""
This test showcases the usage of both @produce and @consume decorators in a single test case.
It produces a message to the 'test' topic and then consumes it to perform further logic.
# Notice you may get message None
"""
# Your test logic for processing the consumed message here
if not message:
return
pass
$ pytest test.py
=================================================================================================================================================== test session starts ====================================================================================================================================================
platform linux -- Python 3.11.6, pytest-8.3.3, pluggy-1.5.0
rootdir:
configfile: pyproject.toml
plugins: asyncio-0.23.8, cov-5.0.0
asyncio: mode=Mode.STRICT
collected 1 item
test.py E [100%]
========================================================================================================================================================== ERRORS ==========================================================================================================================================================
___________________________________________________________________________________________________________________________________ ERROR at setup of test_produce_and_consume_decorator ___________________________________________________________________________________________________________________________________
file test.py, line 4
@produce(topic='test', key='test_key', value='test_value', partition=4)
@consume(topics=['test'])
def test_produce_and_consume_decorator(message):
E fixture 'message' not found
> available fixtures: _session_event_loop, cache, capfd, capfdbinary, caplog, capsys, capsysbinary, cov, doctest_namespace, event_loop, event_loop_policy, monkeypatch, no_cover, pytestconfig, record_property, record_testsuite_property, record_xml_attribute, recwarn, test.py::<event_loop>, tmp_path, tmp_path_factory, tmpdir, tmpdir_factory, unused_tcp_port, unused_tcp_port_factory, unused_udp_port, unused_udp_port_factory
> use 'pytest --fixtures [testpath]' for help on them.
Expected behavior Test passes successfully.
Desktop (please complete the following information):
- OS: Ubuntu 23.10
- python version 3.11.6
- mockafka-py: 0.1.61
Same here with:
- Ubuntu 20.04.6
- python v3.12.7
- pytest v8.3.3
- mockafka-py v0.1.61
You can work arround it by using the FakeConsumer and FakeProducer classes.
In my case, I needed to pass a Kafka Message to the function I'm testing, so I added this function in my tests file:
import pytest
import json
from confluent_kafka import Message
from mockafka import FakeConsumer, FakeProducer, setup_kafka
def get_fake_kafka_message(payload: dict) -> Message:
consumer = FakeConsumer()
producer = FakeProducer()
consumer.subscribe(['test.topic'])
producer.produce(
key='test_key',
value=json.dumps(payload),
topic='test.topic',
partition=0
)
message = consumer.poll()
return message
Then, in my test:
@setup_kafka(topics=[{"topic": "test.topic", "partition": 1}])
def test_run_creation_message():
my_payload = {...}
message = get_fake_kafka_message(my_payload)
# Rest of my test
It's a bit annoying to have to jump through so many hoops, but at least we can work arround it while waiting for a fix to this bug :/
is it possible to use a FakeProducer and a real Consumer in a test with this approach?
@asetup_kafka(topics=[{'topic': 'test_topic', 'partition': 16}], clean=True)
@aproduce(topic='test_topic', value='test_value', key='test_key', partition=0)
async def test_produce_with_decorator(self):
consumer = AIOKafkaConsumer('test_topic', bootstrap_servers='localhost:9092')
await consumer.start()
consumer.subscribe(['test_topic'])
message = await consumer.getone()
assert message.key == b'test_key'
assert message.value == b'test_value'
This doesn't seem to work as
> raise KafkaConnectionError(f"No connection to node with id {node_id}")
E aiokafka.errors.KafkaConnectionError: KafkaConnectionError: No connection to node with id 1
Could someone help me understand the value of the @consume decorator? I can see the value of @produce and @aproduce for test setup, but I feel I'm missing the value @consume provides. Naively it feels like it could be replaced by manual construction of a Message (or for async ConsumerRecord) instance rather than mocking a full produce/consume cycle just in the test setup.
Dear @PeterJCLaw,
You're correct; if they're looking to test their logic and code, they can do so independently. However, if their Kafka layer interacts with their logic—such as consuming a message, performing certain actions based on specific conditions, and then producing another message—this could be a relevant example.