UnknownError while consuming from a zstd-compressed topic - broker returns error code 76 (UNSUPPORTED_COMPRESSION_TYPE)

Open mesteruh opened this issue 5 months ago • 0 comments

Describe the bug

When a topic is configured with compression.type=zstd, aiokafka consumers crash on the first fetch with:

Unexpected error while fetching data: UnknownError

Debug logging reveals that the broker returns error code 76 (UNSUPPORTED_COMPRESSION_TYPE).
The same consumer works fine if the topic is switched back to compression.type=producer, even though the producer continues to publish zstd-compressed batches.
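For anyone trying to reproduce the debug output, a minimal sketch of turning on debug logging before starting the consumer is shown here. It assumes aiokafka's loggers live under the "aiokafka" namespace (the usual `__name__` convention); the constant name is only a label for the code reported above.

```python
import logging

# Route log records to stderr and let DEBUG-level records through.
logging.basicConfig(level=logging.DEBUG)

# Assumption: aiokafka modules log under the "aiokafka" logger namespace.
logging.getLogger("aiokafka").setLevel(logging.DEBUG)

# Kafka protocol error code seen in the debug output of this issue.
UNSUPPORTED_COMPRESSION_TYPE = 76
```

With this in place, the fetcher's debug records should show the raw error code instead of only the generic UnknownError.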

Root cause: aiokafka still sends FetchRequest v4, which does not advertise zstd support.
The broker therefore rejects the fetch whenever it knows it must serve zstd (topic-level compression.type=zstd).
If the broker is not forced to recompress (compression.type=producer), it happily streams the original zstd batches back, so the error does not appear.
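The behaviour described above can be summarised as a small model. This is only a sketch of the rule as I understand it, not broker code: `broker_fetch_error` and `ZSTD_MIN_FETCH_VERSION` are hypothetical names, and the threshold of 10 is the first FetchRequest version that signals zstd support.

```python
from typing import Optional

# First FetchRequest version that tells the broker the client understands zstd.
ZSTD_MIN_FETCH_VERSION = 10
UNSUPPORTED_COMPRESSION_TYPE = 76


def broker_fetch_error(topic_compression_type: str, fetch_version: int) -> Optional[int]:
    """Hypothetical model: return the error code for a fetch, or None on success."""
    # The broker only refuses when the topic itself is declared as zstd
    # and the request version is too old to signal zstd support.
    if topic_compression_type == "zstd" and fetch_version < ZSTD_MIN_FETCH_VERSION:
        return UNSUPPORTED_COMPRESSION_TYPE
    # With compression.type=producer the broker does not inspect the codec,
    # so the original batches are returned as-is.
    return None


print(broker_fetch_error("zstd", 4))       # 76
print(broker_fetch_error("producer", 4))   # None
print(broker_fetch_error("zstd", 10))      # None
```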

Expected behaviour

If zstandard support is available (via pip install aiokafka[zstd]) and the broker supports zstd (Kafka ≥ 2.1.0), the consumer should:

  1. Automatically use FetchRequest v10+ when it detects it might receive zstd
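One possible shape for the version selection is sketched below. It is not aiokafka's actual code: `choose_fetch_version` and its parameters are hypothetical, and capping the version below 10 when no zstd decoder is installed is my own assumption about sensible behaviour.

```python
# First FetchRequest version that signals zstd support to the broker.
ZSTD_MIN_FETCH_VERSION = 10


def choose_fetch_version(
    broker_max_version: int,
    client_max_version: int,
    zstd_available: bool,
) -> int:
    """Hypothetical helper: pick the FetchRequest version to send."""
    # Never exceed what either side implements.
    version = min(broker_max_version, client_max_version)
    # Assumption: without a zstd decoder, stay below v10 so the client
    # does not claim support for batches it cannot decompress.
    if not zstd_available:
        version = min(version, ZSTD_MIN_FETCH_VERSION - 1)
    return version


print(choose_fetch_version(13, 11, True))    # 11
print(choose_fetch_version(13, 11, False))   # 9
print(choose_fetch_version(4, 11, True))     # 4
```

A real change would also need the request and response schemas for the newer Fetch versions, since the version number alone does not change what is put on the wire.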

Environment (please complete the following information):

  • aiokafka version 0.12.0
  • Kafka Broker version (kafka-topics.sh --version):
  • Other information (Confluent Cloud version, etc.):

Reproducible example

import asyncio
from aiokafka import AIOKafkaProducer

async def send_messages():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:29092',
        compression_type='zstd',
    )
    await producer.start()
    try:
        # Send 10 messages
        for i in range(10):
            message = f"Message {i}".encode('utf-8')
            await producer.send_and_wait("axixa2", message)
            print(f"Sent: {message.decode('utf-8')}")
            await asyncio.sleep(1)  # Pause so the output is easy to follow
    finally:
        await producer.stop()

if __name__ == "__main__":
    asyncio.run(send_messages())

import asyncio
from aiokafka import AIOKafkaConsumer

async def consume_messages():
    consumer = AIOKafkaConsumer(
        'axixa2',
        bootstrap_servers='localhost:29092',
        group_id='my-group',
        auto_offset_reset='earliest'
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(f"Received: {msg.value.decode('utf-8')} (partition: {msg.partition})")
    finally:
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(consume_messages())

version: '3.7'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    networks:
      kafka-net:
        ipv4_address: 172.25.0.2

  kafka:
    image: confluentinc/cp-kafka:7.4.2
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092  # Changed to localhost
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_COMPRESSION_TYPE: zstd
    networks:
      kafka-net:
        ipv4_address: 172.25.0.3
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "9092"]
      interval: 5s
      timeout: 10s
      retries: 10

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    networks:
      kafka-net:
        ipv4_address: 172.25.0.4

  producer:
    image: confluentinc/cp-kafka:7.4.2
    container_name: producer
    depends_on:
      - kafka
    command: >
      bash -c "
        while ! nc -z kafka 9092; do sleep 1; done;
        kafka-topics --bootstrap-server kafka:9092 --create --topic zstd-test --partitions 3 --replication-factor 1 --if-not-exists;
        seq 1000 | kafka-console-producer --bootstrap-server kafka:9092 --topic zstd-test --compression-codec zstd;
        sleep infinity
      "
    networks:
      kafka-net:
        ipv4_address: 172.25.0.5

networks:
  kafka-net:
    driver: bridge
    ipam:
      config:
        - subnet: 172.25.0.0/24

I opened an MR.

mesteruh, May 31 '25 22:05