mockafka-py
mockafka-py copied to clipboard
FakeAIOKafkaConsumer has issues retrieving headers and bytes value/key
I'd like to start by thanking you, for resolving my last issue so fast.
After checking out the lastest version, I have however run into two problems. Given following simple pytest example (which has imputs according to AIOKafka Documentation here: https://aiokafka.readthedocs.io/en/stable/api.html#producer-class):
@pytest.mark.asyncio
@asetup_kafka(topics=[{"topic": "test_topic1", "partition": 2}], clean=True)
async def test_headers():
producer = FakeAIOKafkaProducer()
consumer = FakeAIOKafkaConsumer()
await producer.start()
await consumer.start()
consumer.subscribe({"test_topic1"})
await producer.send(topic="test_topic1", headers=[('header_name', b"test"), ('header_name2', b"test")], key=b"test")
await producer.stop()
record = await consumer.getone()
await consumer.stop()
You run into encoding issue:
test_kafka.py:95:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:223: in getone
for _, record in self._fetch(partitions):
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:210: in _fetch
record = self._fetch_one(tp.topic, tp.partition)
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:190: in _fetch_one
return message_to_record(message, offset=consumer_amount)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
message = <mockafka.message.Message object at 0x7fbd38dc7ad0>, offset = 0
def message_to_record(message: Message, offset: int) -> ConsumerRecord[bytes, bytes]:
topic: Optional[str] = message.topic()
partition: Optional[int] = message.partition()
timestamp: Optional[int] = message.timestamp()
if topic is None or partition is None or timestamp is None:
fields = [
("topic", topic),
("partition", partition),
("timestamp", timestamp),
]
missing = ", ".join(x for x, y in fields if y is None)
raise ValueError(f"Message is missing key components: {missing}")
key_str: Optional[str] = message.key()
value_str: Optional[str] = message.value()
> key = key_str.encode() if key_str is not None else None
E AttributeError: 'bytes' object has no attribute 'encode'
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:39: AttributeError
(This issue also applies to value.)
Secondly, if we change the type of the key into string (which is not supported by the original method):
await producer.send(topic="test_topic1", headers=[('header_name', b'test'), ('header_name2', b'test')], key="test")
We run into another issue with headers:
test_kafka.py:80:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:223: in getone
for _, record in self._fetch(partitions):
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:210: in _fetch
record = self._fetch_one(tp.topic, tp.partition)
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:190: in _fetch_one
return message_to_record(message, offset=consumer_amount)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
message = <mockafka.message.Message object at 0x7eff37066850>, offset = 0
def message_to_record(message: Message, offset: int) -> ConsumerRecord[bytes, bytes]:
topic: Optional[str] = message.topic()
partition: Optional[int] = message.partition()
timestamp: Optional[int] = message.timestamp()
if topic is None or partition is None or timestamp is None:
fields = [
("topic", topic),
("partition", partition),
("timestamp", timestamp),
]
missing = ", ".join(x for x, y in fields if y is None)
raise ValueError(f"Message is missing key components: {missing}")
key_str: Optional[str] = message.key()
value_str: Optional[str] = message.value()
key = key_str.encode() if key_str is not None else None
value = value_str.encode() if value_str is not None else None
return ConsumerRecord(
topic=topic,
partition=partition,
offset=offset,
timestamp=timestamp,
# https://github.com/apache/kafka/blob/932759bd70ce646ced5298a2ad8db02c0cea3643/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java#L25
timestamp_type=0, # CreateTime
key=key,
value=value,
checksum=None, # Deprecated, we won't support it
serialized_key_size=len(key) if key else 0,
serialized_value_size=len(value) if value else 0,
> headers=tuple((message.headers() or {}).items()),
)
E AttributeError: 'list' object has no attribute 'items'
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:54: AttributeError