aiokafka
aiokafka copied to clipboard
Consumer fails when consuming messages compressed with ZSTD at the broker level
Describe the bug When consuming messages from a broker where compression has been set at the broker level with ZSTD compression, the Consumer fails with an UnknownError. This does not happen when ZSTD is set at a producer level instead. This behaviour is observed both in the latest pypi release, as well as in the latest version from the master branch in this repository
Expected behaviour Consumer should be able to consume messages with ZSTD compression, when its set at a broker level, just the same as when its set at a producer level.
Environment (please complete the following information):
- aiokafka version: 0.7.2
- kafka-python version: 2.0.2
Reproducible example When setting up the Broker, add to the server.properties file
compression.type=zstd
On the contrary, when if compression.type is omitted in the server.properties file and instead you create a producer with zstd compression, it works fine.
Have you tried with code from master branch? It should have been fixed in https://github.com/aio-libs/aiokafka/pull/801
Yes, its happening with the latest version from the master branch. Just reconfirmed it. Here's the full log output.
2022-07-05 10:32:21,989 my_fancy_service.aio_kafka INFO Starting Kafka consumer for topic test.
2022-07-05 10:32:21,989 aiokafka DEBUG Attempting to bootstrap via node at localhost:9092
2022-07-05 10:32:21,991 aiokafka.conn DEBUG <AIOKafkaConnection host=localhost port=9092> Request 1: MetadataRequest_v0(topics=[])
2022-07-05 10:32:21,995 aiokafka.conn DEBUG <AIOKafkaConnection host=localhost port=9092> Response 1: MetadataResponse_v0(brokers=[(node_id=0, host='192.168.0.126', port=9092)], topics=[(error_code=0, topic='test', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=10, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=20, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=40, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=30, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=9, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=11, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=31, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=39, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=13, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=18, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=22, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=8, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=32, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=43, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=29, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=34, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=1, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=6, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=41, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=27, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=48, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=5, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=15, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=35, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=25, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=46, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=26, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=36, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=44, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=16, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=37, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=17, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=45, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=3, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=24, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=38, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=33, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=23, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=28, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=2, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=12, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=19, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=14, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=4, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=47, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=49, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=42, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=7, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=21, leader=0, replicas=[0], isr=[0])])])
2022-07-05 10:32:21,996 aiokafka.cluster DEBUG Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 2, groups: 0)
2022-07-05 10:32:21,996 aiokafka.conn DEBUG Closing connection at localhost:9092
2022-07-05 10:32:21,996 aiokafka DEBUG Received cluster metadata: ClusterMetadata(brokers: 1, topics: 2, groups: 0)
2022-07-05 10:32:21,996 aiokafka DEBUG Initiating connection to node 0 at 192.168.0.126:9092
2022-07-05 10:32:21,997 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Request 1: ApiVersionRequest_v0()
2022-07-05 10:32:21,998 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=9), (api_key=1, min_version=0, max_version=13), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=12), (api_key=4, min_version=0, max_version=6), (api_key=5, min_version=0, max_version=3), (api_key=6, min_version=0, max_version=7), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=9), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=5), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=7), (api_key=20, min_version=0, max_version=6), (api_key=21, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=4), (api_key=23, min_version=0, max_version=4), (api_key=24, min_version=0, max_version=3), (api_key=25, min_version=0, max_version=3), (api_key=26, min_version=0, max_version=3), (api_key=27, min_version=0, max_version=1), (api_key=28, min_version=0, max_version=3), (api_key=29, min_version=0, max_version=2), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=4), (api_key=33, min_version=0, max_version=2), (api_key=34, min_version=0, max_version=2), (api_key=35, min_version=0, max_version=3), (api_key=36, min_version=0, max_version=2), (api_key=37, min_version=0, max_version=3), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=2), (api_key=40, min_version=0, max_version=2), (api_key=41, min_version=0, max_version=2), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=48, min_version=0, max_version=1), (api_key=49, min_version=0, max_version=1), (api_key=50, min_version=0, max_version=0), (api_key=51, min_version=0, max_version=0), (api_key=56, min_version=0, max_version=1), (api_key=57, min_version=0, max_version=0), (api_key=60, min_version=0, max_version=0), (api_key=61, min_version=0, max_version=0), (api_key=65, min_version=0, max_version=0), (api_key=66, min_version=0, max_version=0), (api_key=67, min_version=0, max_version=0)])
2022-07-05 10:32:21,999 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Request 2: MetadataRequest_v0(topics=[])
2022-07-05 10:32:22,001 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Response 2: MetadataResponse_v0(brokers=[(node_id=0, host='192.168.0.126', port=9092)], topics=[(error_code=0, topic='test', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=10, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=20, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=40, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=30, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=9, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=11, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=31, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=39, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=13, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=18, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=22, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=8, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=32, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=43, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=29, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=34, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=1, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=6, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=41, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=27, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=48, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=5, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=15, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=35, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=25, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=46, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=26, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=36, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=44, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=16, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=37, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=17, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=45, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=3, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=24, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=38, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=33, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=23, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=28, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=2, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=12, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=19, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=14, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=4, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=47, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=49, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=42, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=7, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=21, leader=0, replicas=[0], isr=[0])])])
2022-07-05 10:32:22,002 aiokafka.conn DEBUG Closing connection at 192.168.0.126:9092
2022-07-05 10:32:22,002 aiokafka.consumer.group_coordinator INFO Metadata for topic has changed from {} to {'test': 1}.
2022-07-05 10:32:22,003 aiokafka.consumer.fetcher DEBUG Updating fetch positions for partitions [TopicPartition(topic='test', partition=0)]
2022-07-05 10:32:22,003 aiokafka DEBUG Initiating connection to node 0 at 192.168.0.126:9092
2022-07-05 10:32:22,004 aiokafka.consumer.fetcher DEBUG No committed offset found for TopicPartition(topic='test', partition=0)
2022-07-05 10:32:22,004 aiokafka.consumer.fetcher DEBUG Resetting offset for partition TopicPartition(topic='test', partition=0) using latest strategy.
2022-07-05 10:32:22,004 aiokafka DEBUG Initiating connection to node 0 at 192.168.0.126:9092
2022-07-05 10:32:22,005 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Request 1: ApiVersionRequest_v0()
2022-07-05 10:32:22,006 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=9), (api_key=1, min_version=0, max_version=13), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=12), (api_key=4, min_version=0, max_version=6), (api_key=5, min_version=0, max_version=3), (api_key=6, min_version=0, max_version=7), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=9), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=5), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=7), (api_key=20, min_version=0, max_version=6), (api_key=21, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=4), (api_key=23, min_version=0, max_version=4), (api_key=24, min_version=0, max_version=3), (api_key=25, min_version=0, max_version=3), (api_key=26, min_version=0, max_version=3), (api_key=27, min_version=0, max_version=1), (api_key=28, min_version=0, max_version=3), (api_key=29, min_version=0, max_version=2), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=4), (api_key=33, min_version=0, max_version=2), (api_key=34, min_version=0, max_version=2), (api_key=35, min_version=0, max_version=3), (api_key=36, min_version=0, max_version=2), (api_key=37, min_version=0, max_version=3), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=2), (api_key=40, min_version=0, max_version=2), (api_key=41, min_version=0, max_version=2), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=48, min_version=0, max_version=1), (api_key=49, min_version=0, max_version=1), (api_key=50, min_version=0, max_version=0), (api_key=51, min_version=0, max_version=0), (api_key=56, min_version=0, max_version=1), (api_key=57, min_version=0, max_version=0), (api_key=60, min_version=0, max_version=0), (api_key=61, min_version=0, max_version=0), (api_key=65, min_version=0, max_version=0), (api_key=66, min_version=0, max_version=0), (api_key=67, min_version=0, max_version=0)])
2022-07-05 10:32:22,006 aiokafka DEBUG Sending metadata request MetadataRequest_v1(topics=['test']) to node 0
2022-07-05 10:32:22,006 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Request 2: MetadataRequest_v1(topics=['test'])
2022-07-05 10:32:22,007 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Request 3: OffsetRequest_v1(replica_id=-1, topics=[(topic='test', partitions=[(partition=0, timestamp=-1)])])
2022-07-05 10:32:22,007 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Response 2: MetadataResponse_v1(brokers=[(node_id=0, host='192.168.0.126', port=9092, rack=None)], controller_id=0, topics=[(error_code=0, topic='test', is_internal=False, partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])])])
2022-07-05 10:32:22,007 aiokafka.cluster DEBUG Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2022-07-05 10:32:22,013 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Response 3: OffsetResponse_v1(topics=[(topic='test', partitions=[(partition=0, error_code=0, timestamp=-1, offset=323)])])
2022-07-05 10:32:22,013 aiokafka.consumer.fetcher DEBUG Handling ListOffsetResponse response for TopicPartition(topic='test', partition=0). Fetched offset 323, timestamp -1
2022-07-05 10:32:22,013 aiokafka.consumer.fetcher DEBUG Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:22,013 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Request 4: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:32:32,018 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Response 4: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:32:32,018 aiokafka.consumer.fetcher WARNING Unexpected error while fetching data: UnknownError
2022-07-05 10:32:32,019 aiokafka.consumer.fetcher DEBUG Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:32,020 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Request 5: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:32:42,023 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Response 5: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:32:42,023 aiokafka.consumer.fetcher WARNING Unexpected error while fetching data: UnknownError
2022-07-05 10:32:42,024 aiokafka.consumer.fetcher DEBUG Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:42,024 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Request 6: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:32:52,027 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Response 6: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:32:52,027 aiokafka.consumer.fetcher WARNING Unexpected error while fetching data: UnknownError
2022-07-05 10:32:52,028 aiokafka.consumer.fetcher DEBUG Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:32:52,029 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Request 7: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
2022-07-05 10:33:02,033 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Response 7: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='test', partitions=[(partition=0, error_code=76, highwater_offset=-1, last_stable_offset=-1, aborted_transactions=[], message_set=None)])])
2022-07-05 10:33:02,033 aiokafka.consumer.fetcher WARNING Unexpected error while fetching data: UnknownError
2022-07-05 10:33:02,034 aiokafka.consumer.fetcher DEBUG Adding fetch request for partition TopicPartition(topic='test', partition=0) at offset 323
2022-07-05 10:33:02,034 aiokafka.conn DEBUG <AIOKafkaConnection host=192.168.0.126 port=9092> Request 8: FetchRequest_v4(replica_id=-1, max_wait_time=10000, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='test', partitions=[(partition=0, offset=323, max_bytes=1048576)])])
Do we have an update on this?
Hey guys! I'm just joining the club with the same issue. Any updates?
Is anyone else working on this because I am trying to integrate this into faust but if it's not added here. I will work on first adding support here and then integrating it into faust later
Hey, so the zstd error comes because of a fetch request versioning error. Kafka python and by extension this package sends fetch request version 4 and ZSTD support requires version 10 at least. this requires the need for fetch sessions,removed_topics and log_start_offset,current_leader_epoch to be added to the fetch request. I already tested to see if this issue becomes resolved when upgrading the version with default values for those parameters. It does work. I plan to add these features to kafka-python to put into a pull request asap. However, in aiokafka's case I just wanted to check in to see if fetch sessions will interfere the asynchronous nature of this module. this KIP has information on fetch sessions. I plan to see for myself if it will interfere anyway but it would be much more efficient if someone can do the check for me.
getting below error when trying to consume from a ZSTD compressed record from kafka.
raise UnsupportedCodecError( kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found
error log:
Traceback (most recent call last): File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 25, in consume_from_topic(topic_to_consume) File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 14, in consume_from_topic for message in consumer: File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1193, in next return self.next_v2() File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1201, in next_v2 return next(self._iterator) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2 record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll records = self._poll_once(remaining, max_records, update_offsets=update_offsets) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 708, in _poll_once records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records self._next_partition_records = self._parse_fetched_data(completion) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 818, in _parse_fetched_data unpacked = list(self._unpack_message_set(tp, records)) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set for record in batch: File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 276, in iter self._maybe_uncompress() File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 183, in _maybe_uncompress self._assert_has_codec(compression_type) File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 118, in _assert_has_codec raise UnsupportedCodecError( kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found
Process finished with exit code 1
My Code for consuming a kafka topic:
from kafka import KafkaConsumer def consume_from_topic(topic): consumer = KafkaConsumer( topic, bootstrap_servers='localhost:9092', group_id='zstd-11-consumer-group', auto_offset_reset='earliest', enable_auto_commit=True ) try: for message in consumer: v = message.value k = message.key.decode("utf-8") log = "key={}, offset={}, partition={}, value={}".format(k, message.offset, message.partition, v) print(log)
except KeyboardInterrupt: consumer.close() if name == "main": topic_to_consume = "Integrate-Package-Zstd-ESP.info" consume_from_topic(topic_to_consume)
getting below error when trying to consume from a ZSTD compressed record from kafka.
raise UnsupportedCodecError( kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found
Did you specify extra when installing aiokafka? Either aiokafka[zstd]
of aiokafka[all]
should be used to support ZStandard codec.
@theultimate1 I hit the same problem. Your approach to update Fetch to v10 is spot on. Wondering if you could find time to make a PR soon? Many thanks!
I will try to get some actual work done for this. been busy lol. This is definitely on my todo.
@theultimate1 - if I can lend a hand let me know (but you'll need to brief me on what needs doing/what you've discovered). This is currently a slight blocker for ideal operation on my side.
yup I am cleaning up right now. will put in a PR by this week