mockafka-py icon indicating copy to clipboard operation
mockafka-py copied to clipboard

FakeAIOKafkaConsumer has issues retrieving headers and bytes value/key

Open bezruc opened this issue 1 year ago • 1 comments

I'd like to start by thanking you, for resolving my last issue so fast.

After checking out the lastest version, I have however run into two problems. Given following simple pytest example (which has imputs according to AIOKafka Documentation here: https://aiokafka.readthedocs.io/en/stable/api.html#producer-class):

@pytest.mark.asyncio
@asetup_kafka(topics=[{"topic": "test_topic1", "partition": 2}], clean=True)
async def test_headers():
    producer = FakeAIOKafkaProducer()
    consumer = FakeAIOKafkaConsumer()
    await producer.start()
    await consumer.start()
    consumer.subscribe({"test_topic1"})
    await producer.send(topic="test_topic1", headers=[('header_name', b"test"), ('header_name2', b"test")], key=b"test")
    await producer.stop()
    record = await consumer.getone()
    await consumer.stop()

You run into encoding issue:

test_kafka.py:95: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:223: in getone
    for _, record in self._fetch(partitions):
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:210: in _fetch
    record = self._fetch_one(tp.topic, tp.partition)
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:190: in _fetch_one
    return message_to_record(message, offset=consumer_amount)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

message = <mockafka.message.Message object at 0x7fbd38dc7ad0>, offset = 0

    def message_to_record(message: Message, offset: int) -> ConsumerRecord[bytes, bytes]:
        topic: Optional[str] = message.topic()
        partition: Optional[int] = message.partition()
        timestamp: Optional[int] = message.timestamp()
    
        if topic is None or partition is None or timestamp is None:
            fields = [
                ("topic", topic),
                ("partition", partition),
                ("timestamp", timestamp),
            ]
            missing = ", ".join(x for x, y in fields if y is None)
            raise ValueError(f"Message is missing key components: {missing}")
    
        key_str: Optional[str] = message.key()
        value_str: Optional[str] = message.value()
    
>       key = key_str.encode() if key_str is not None else None
E       AttributeError: 'bytes' object has no attribute 'encode'

../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:39: AttributeError

(This issue also applies to value.)

Secondly, if we change the type of the key into string (which is not supported by the original method): await producer.send(topic="test_topic1", headers=[('header_name', b'test'), ('header_name2', b'test')], key="test")

We run into another issue with headers:

test_kafka.py:80: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:223: in getone
    for _, record in self._fetch(partitions):
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:210: in _fetch
    record = self._fetch_one(tp.topic, tp.partition)
../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:190: in _fetch_one
    return message_to_record(message, offset=consumer_amount)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

message = <mockafka.message.Message object at 0x7eff37066850>, offset = 0

    def message_to_record(message: Message, offset: int) -> ConsumerRecord[bytes, bytes]:
        topic: Optional[str] = message.topic()
        partition: Optional[int] = message.partition()
        timestamp: Optional[int] = message.timestamp()
    
        if topic is None or partition is None or timestamp is None:
            fields = [
                ("topic", topic),
                ("partition", partition),
                ("timestamp", timestamp),
            ]
            missing = ", ".join(x for x, y in fields if y is None)
            raise ValueError(f"Message is missing key components: {missing}")
    
        key_str: Optional[str] = message.key()
        value_str: Optional[str] = message.value()
    
        key = key_str.encode() if key_str is not None else None
        value = value_str.encode() if value_str is not None else None
    
        return ConsumerRecord(
            topic=topic,
            partition=partition,
            offset=offset,
            timestamp=timestamp,
            # https://github.com/apache/kafka/blob/932759bd70ce646ced5298a2ad8db02c0cea3643/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java#L25
            timestamp_type=0,  # CreateTime
            key=key,
            value=value,
            checksum=None,  # Deprecated, we won't support it
            serialized_key_size=len(key) if key else 0,
            serialized_value_size=len(value) if value else 0,
>           headers=tuple((message.headers() or {}).items()),
        )
E       AttributeError: 'list' object has no attribute 'items'

../../.venv/lib/python3.11/site-packages/mockafka/aiokafka/aiokafka_consumer.py:54: AttributeError

bezruc avatar Aug 22 '24 09:08 bezruc