aiokafka icon indicating copy to clipboard operation
aiokafka copied to clipboard

Consumer fails when consuming messages compressed with ZSTD at the broker level

Open DeLaboreMercurio opened this issue 2 years ago • 12 comments

Describe the bug When consuming messages from a broker where compression has been set at the broker level with ZSTD compression, the Consumer fails with an UnknownError. This does not happen when ZSTD is set at a producer level instead. This behaviour is observed both in the latest pypi release, as well as in the latest version from the master branch in this repository

Expected behaviour Consumer should be able to consume messages with ZSTD compression, when its set at a broker level, just the same as when its set at a producer level.

Environment (please complete the following information):

  • aiokafka version: 0.7.2
  • kafka-python version: 2.0.2

Reproducible example When setting up the Broker, add to the server.properties file

compression.type=zstd

On the contrary, when if compression.type is omitted in the server.properties file and instead you create a producer with zstd compression, it works fine.

DeLaboreMercurio avatar Jul 04 '22 14:07 DeLaboreMercurio

Have you tried with code from master branch? It should have been fixed in https://github.com/aio-libs/aiokafka/pull/801

ods avatar Jul 05 '22 07:07 ods

Yes, its happening with the latest version from the master branch. Just reconfirmed it. Here's the full log output.

2022-07-05 10:32:21,989 my_fancy_service.aio_kafka INFO     Starting Kafka consumer for topic test.
2022-07-05 10:32:21,989 aiokafka     DEBUG    Attempting to bootstrap via node at localhost:9092
2022-07-05 10:32:21,991 aiokafka.conn DEBUG    <AIOKafkaConnection host=localhost port=9092> Request 1: MetadataRequest_v0(topics=[])
2022-07-05 10:32:21,995 aiokafka.conn DEBUG    <AIOKafkaConnection host=localhost port=9092> Response 1: MetadataResponse_v0(brokers=[(node_id=0, host='192.168.0.126', port=9092)], topics=[(error_code=0, topic='test', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=10, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=20, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=40, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=30, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=9, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=11, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=31, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=39, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=13, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=18, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=22, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=8, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=32, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=43, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=29, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=34, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=1, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=6, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=41, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=27, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=48, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=5, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=15, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=35, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=25, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=46, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=26, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=36, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=44, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=16, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=37, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=17, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=45, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=3, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=24, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=38, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=33, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=23, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=28, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=2, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=12, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=19, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=14, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=4, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=47, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=49, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=42, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=7, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=21, leader=0, replicas=[0], isr=[0])])])
2022-07-05 10:32:21,996 aiokafka.cluster DEBUG    Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 2, groups: 0)
2022-07-05 10:32:21,996 aiokafka.conn DEBUG    Closing connection at localhost:9092
2022-07-05 10:32:21,996 aiokafka     DEBUG    Received cluster metadata: ClusterMetadata(brokers: 1, topics: 2, groups: 0)
2022-07-05 10:32:21,996 aiokafka     DEBUG    Initiating connection to node 0 at 192.168.0.126:9092
2022-07-05 10:32:21,997 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 1: ApiVersionRequest_v0()
2022-07-05 10:32:21,998 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=9), (api_key=1, min_version=0, max_version=13), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=12), (api_key=4, min_version=0, max_version=6), (api_key=5, min_version=0, max_version=3), (api_key=6, min_version=0, max_version=7), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=9), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=5), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=7), (api_key=20, min_version=0, max_version=6), (api_key=21, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=4), (api_key=23, min_version=0, max_version=4), (api_key=24, min_version=0, max_version=3), (api_key=25, min_version=0, max_version=3), (api_key=26, min_version=0, max_version=3), (api_key=27, min_version=0, max_version=1), (api_key=28, min_version=0, max_version=3), (api_key=29, min_version=0, max_version=2), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=4), (api_key=33, min_version=0, max_version=2), (api_key=34, min_version=0, max_version=2), (api_key=35, min_version=0, max_version=3), (api_key=36, min_version=0, max_version=2), (api_key=37, min_version=0, max_version=3), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=2), (api_key=40, min_version=0, max_version=2), (api_key=41, min_version=0, max_version=2), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=48, min_version=0, max_version=1), (api_key=49, min_version=0, max_version=1), (api_key=50, min_version=0, max_version=0), (api_key=51, min_version=0, max_version=0), (api_key=56, min_version=0, max_version=1), (api_key=57, min_version=0, max_version=0), (api_key=60, min_version=0, max_version=0), (api_key=61, min_version=0, max_version=0), (api_key=65, min_version=0, max_version=0), (api_key=66, min_version=0, max_version=0), (api_key=67, min_version=0, max_version=0)])
2022-07-05 10:32:21,999 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 2: MetadataRequest_v0(topics=[])
2022-07-05 10:32:22,001 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 2: MetadataResponse_v0(brokers=[(node_id=0, host='192.168.0.126', port=9092)], topics=[(error_code=0, topic='test', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=10, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=20, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=40, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=30, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=9, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=11, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=31, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=39, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=13, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=18, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=22, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=8, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=32, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=43, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=29, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=34, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=1, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=6, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=41, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=27, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=48, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=5, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=15, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=35, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=25, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=46, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=26, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=36, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=44, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=16, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=37, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=17, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=45, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=3, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=24, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=38, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=33, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=23, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=28, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=2, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=12, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=19, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=14, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=4, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=47, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=49, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=42, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=7, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=21, leader=0, replicas=[0], isr=[0])])])
2022-07-05 10:32:22,002 aiokafka.conn DEBUG    Closing connection at 192.168.0.126:9092
2022-07-05 10:32:22,002 aiokafka.consumer.group_coordinator INFO     Metadata for topic has changed from {} to {'test': 1}. 
2022-07-05 10:32:22,003 aiokafka.consumer.fetcher DEBUG    Updating fetch positions for partitions [TopicPartition(topic='test', partition=0)]
2022-07-05 10:32:22,003 aiokafka     DEBUG    Initiating connection to node 0 at 192.168.0.126:9092
2022-07-05 10:32:22,004 aiokafka.consumer.fetcher DEBUG    No committed offset found for TopicPartition(topic='test', partition=0)
2022-07-05 10:32:22,004 aiokafka.consumer.fetcher DEBUG    Resetting offset for partition TopicPartition(topic='test', partition=0) using latest strategy.
2022-07-05 10:32:22,004 aiokafka     DEBUG    Initiating connection to node 0 at 192.168.0.126:9092
2022-07-05 10:32:22,005 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 1: ApiVersionRequest_v0()
2022-07-05 10:32:22,006 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=9), (api_key=1, min_version=0, max_version=13), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=12), (api_key=4, min_version=0, max_version=6), (api_key=5, min_version=0, max_version=3), (api_key=6, min_version=0, max_version=7), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=9), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=5), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=7), (api_key=20, min_version=0, max_version=6), (api_key=21, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=4), (api_key=23, min_version=0, max_version=4), (api_key=24, min_version=0, max_version=3), (api_key=25, min_version=0, max_version=3), (api_key=26, min_version=0, max_version=3), (api_key=27, min_version=0, max_version=1), (api_key=28, min_version=0, max_version=3), (api_key=29, min_version=0, max_version=2), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=4), (api_key=33, min_version=0, max_version=2), (api_key=34, min_version=0, max_version=2), (api_key=35, min_version=0, max_version=3), (api_key=36, min_version=0, max_version=2), (api_key=37, min_version=0, max_version=3), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=2), (api_key=40, min_version=0, max_version=2), (api_key=41, min_version=0, max_version=2), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=48, min_version=0, max_version=1), (api_key=49, min_version=0, max_version=1), (api_key=50, min_version=0, max_version=0), (api_key=51, min_version=0, max_version=0), (api_key=56, min_version=0, max_version=1), (api_key=57, min_version=0, max_version=0), (api_key=60, min_version=0, max_version=0), (api_key=61, min_version=0, max_version=0), (api_key=65, min_version=0, max_version=0), (api_key=66, min_version=0, max_version=0), (api_key=67, min_version=0, max_version=0)])
2022-07-05 10:32:22,006 aiokafka     DEBUG    Sending metadata request MetadataRequest_v1(topics=['test']) to node 0
2022-07-05 10:32:22,006 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 2: MetadataRequest_v1(topics=['test'])
2022-07-05 10:32:22,007 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 3: OffsetRequest_v1(replica_id=-1, topics=[(topic='test', partitions=[(partition=0, timestamp=-1)])])
2022-07-05 10:32:22,007 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 2: MetadataResponse_v1(brokers=[(node_id=0, host='192.168.0.126', port=9092, rack=None)], controller_id=0, topics=[(error_code=0, topic='test', is_internal=False, partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])])])
2022-07-05 10:32:22,007 aiokafka.cluster DEBUG    Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-05 10:32:22,013 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 3: OffsetResponse_v1(topics=[(topic='test', partitions=[(partition=0, error_code=0, timestamp=-1, offset=323)])])
2022-07-05 10:32:22,013 aiokafka.consumer.fetcher DEBUG    Handling ListOffsetResponse response for TopicPartition(topic='test', partition=0). Fetched offset 323, timestamp -1
2022-07-05 10:32:22,013 aiokafka.consumer.fetcher DEBUG    Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:22,013 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 4: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:32:32,018 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 4: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:32:32,018 aiokafka.consumer.fetcher WARNING  Unexpected error while fetching data: UnknownError
2022-07-05 10:32:32,019 aiokafka.consumer.fetcher DEBUG    Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:32,020 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 5: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:32:42,023 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 5: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:32:42,023 aiokafka.consumer.fetcher WARNING  Unexpected error while fetching data: UnknownError
2022-07-05 10:32:42,024 aiokafka.consumer.fetcher DEBUG    Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:42,024 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 6: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:32:52,027 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 6: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:32:52,027 aiokafka.consumer.fetcher WARNING  Unexpected error while fetching data: UnknownError
2022-07-05 10:32:52,028 aiokafka.consumer.fetcher DEBUG    Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:52,029 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 7: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:33:02,033 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Response 7: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:33:02,033 aiokafka.consumer.fetcher WARNING  Unexpected error while fetching data: UnknownError
2022-07-05 10:33:02,034 aiokafka.consumer.fetcher DEBUG    Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:33:02,034 aiokafka.conn DEBUG    <AIOKafkaConnection host=192.168.0.126 port=9092> Request 8: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])

SebasYanik avatar Jul 05 '22 13:07 SebasYanik

Do we have an update on this?

DeLaboreMercurio avatar Jul 27 '22 16:07 DeLaboreMercurio

Hey guys! I'm just joining the club with the same issue. Any updates?

bigEvilBanana avatar Sep 01 '22 15:09 bigEvilBanana

Is anyone else working on this because I am trying to integrate this into faust but if it's not added here. I will work on first adding support here and then integrating it into faust later

theultimate1 avatar Oct 21 '22 07:10 theultimate1

Hey, so the zstd error comes because of a fetch request versioning error. Kafka python and by extension this package sends fetch request version 4 and ZSTD support requires version 10 at least. this requires the need for fetch sessions,removed_topics and log_start_offset,current_leader_epoch to be added to the fetch request. I already tested to see if this issue becomes resolved when upgrading the version with default values for those parameters. It does work. I plan to add these features to kafka-python to put into a pull request asap. However, in aiokafka's case I just wanted to check in to see if fetch sessions will interfere the asynchronous nature of this module. this KIP has information on fetch sessions. I plan to see for myself if it will interfere anyway but it would be much more efficient if someone can do the check for me.

theultimate1 avatar Dec 07 '22 04:12 theultimate1

getting below error when trying to consume from a ZSTD compressed record from kafka.

raise UnsupportedCodecError( kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

error log:

Traceback (most recent call last): File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 25, in consume_from_topic(topic_to_consume) File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 14, in consume_from_topic for message in consumer: File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1193, in next return self.next_v2() File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1201, in next_v2 return next(self._iterator) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2 record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll records = self._poll_once(remaining, max_records, update_offsets=update_offsets) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 708, in _poll_once records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records self._next_partition_records = self._parse_fetched_data(completion) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 818, in _parse_fetched_data unpacked = list(self._unpack_message_set(tp, records)) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set for record in batch: File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 276, in iter self._maybe_uncompress() File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 183, in _maybe_uncompress self._assert_has_codec(compression_type) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 118, in _assert_has_codec raise UnsupportedCodecError( kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Process finished with exit code 1

My Code for consuming a kafka topic:

from kafka import KafkaConsumer def consume_from_topic(topic): consumer = KafkaConsumer( topic, bootstrap_servers='localhost:9092', group_id='zstd-11-consumer-group', auto_offset_reset='earliest', enable_auto_commit=True ) try: for message in consumer: v = message.value k = message.key.decode("utf-8") log = "key={}, offset={}, partition={}, value={}".format(k, message.offset, message.partition, v) print(log)

except KeyboardInterrupt: consumer.close() if name == "main": topic_to_consume = "Integrate-Package-Zstd-ESP.info" consume_from_topic(topic_to_consume)

Rohit-Singh3 avatar Jul 29 '23 13:07 Rohit-Singh3

getting below error when trying to consume from a ZSTD compressed record from kafka.

raise UnsupportedCodecError( kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Did you specify extra when installing aiokafka? Either aiokafka[zstd] of aiokafka[all] should be used to support ZStandard codec.

ods avatar Aug 06 '23 11:08 ods

@theultimate1 I hit the same problem. Your approach to update Fetch to v10 is spot on. Wondering if you could find time to make a PR soon? Many thanks!

bcwalrus avatar Oct 09 '23 03:10 bcwalrus

I will try to get some actual work done for this. been busy lol. This is definitely on my todo.

theultimate1 avatar Oct 09 '23 03:10 theultimate1

@theultimate1 - if I can lend a hand let me know (but you'll need to brief me on what needs doing/what you've discovered). This is currently a slight blocker for ideal operation on my side.

davidvanwyk avatar Nov 29 '23 12:11 davidvanwyk

yup I am cleaning up right now. will put in a PR by this week

theultimate1 avatar Mar 19 '24 03:03 theultimate1