Add support for `zstd` compression
It'd be really nice to add support for zstd compression now that this is an officially supported codec (KIP-110).
Looks like there are two available python libraries for zstd compression. Would be really nice to refactor the codec compression interfaces into something more generic, allowing users to wrap their favorite library -- rather than us having to pick a favorite to integrate with directly.
Agreed. However, there are still protocol-level considerations, since the producer has to signal to the broker what compression format was used (which it, in turn, passes to downstream consumers).
What would be a good interface?
I'm thinking the consumer/producer can pass in a custom class. And that class has four pieces:
- `compress()`
- `decompress()`
- A class-level var indicating which protocol flag to set... allowed values are None/gzip/snappy/other broker-supported values
- Any custom configs that would be passed to the underlying library... such as the level of compression, multi-core vs single (this may not be relevant for our use case), etc. This seems to me to be the trickiest piece: how to do this somewhat generically/thinly so it provides flexibility rather than straight-jacketing the user.
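For illustration only, a rough sketch of what such a user-supplied wrapper could look like. This assumes the third-party `zstandard` package as the user's library of choice, and the names `ZstdCodec` and `protocol_flag` are made up for the example, not an agreed interface:

```python
import zstandard  # third-party package; the user's chosen zstd library (assumption)


class ZstdCodec(object):
    """Illustrative sketch of a user-supplied codec wrapper -- not kafka-python's actual API."""

    # Class-level var: which protocol flag the producer sets on the record batch
    # so the broker and downstream consumers know how the payload was compressed.
    protocol_flag = 'zstd'

    def __init__(self, level=3):
        # Custom config passed straight through to the underlying library.
        self._compressor = zstandard.ZstdCompressor(level=level)
        self._decompressor = zstandard.ZstdDecompressor()

    def compress(self, data):
        return self._compressor.compress(data)

    def decompress(self, data):
        return self._decompressor.decompress(data)
```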
I have created a very basic implementation of zstd support, which basically mimics the other supported compression algorithms and plugs the new codec type into the required classes / docstrings. I also made an assumption about the underlying requirement for the zstd library.
Since this PR in itself touches 6 files, I was wondering what the generic interface for the compressor class would look like. Backward compatibility dictates that the compression_type now be either a string or an object with the interface mentioned by @jeffwidman, and the rest of the logic would probably flow from that point on. All existing unit tests should still pass, and new ones will have to be added to cover the new functionality.
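As a sketch of what that backward compatibility could mean in practice, the string-or-object dispatch might look roughly like this. The helper name `resolve_codec` and the `protocol_flag` attribute are hypothetical, not the merged implementation:

```python
# Hypothetical sketch of backward-compatible handling of compression_type.
BUILTIN_CODECS = {'gzip', 'snappy', 'lz4', 'zstd'}


def resolve_codec(compression_type):
    """Accept either a legacy string name or a codec object exposing
    compress()/decompress()/protocol_flag, per the interface discussed above."""
    if compression_type is None or isinstance(compression_type, str):
        if compression_type is not None and compression_type not in BUILTIN_CODECS:
            raise ValueError('Unknown compression_type: %r' % compression_type)
        return compression_type  # handled by the existing built-in code paths
    # Duck-type the custom codec object.
    for attr in ('compress', 'decompress', 'protocol_flag'):
        if not hasattr(compression_type, attr):
            raise TypeError('codec object is missing %r' % attr)
    return compression_type
```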
https://github.com/dpkp/kafka-python/pull/2015
Please provide some feedback, thank you.
https://github.com/dpkp/kafka-python/pull/2021
Can close, merged in https://github.com/dpkp/kafka-python/pull/2021
Getting the below error when trying to consume a zstd-compressed record from Kafka:

`kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found`
Error log:

```
Traceback (most recent call last):
  File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 25, in <module>
    consume_from_topic(topic_to_consume)
  File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 14, in consume_from_topic
    for message in consumer:
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 708, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 818, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set
    for record in batch:
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 276, in __iter__
    self._maybe_uncompress()
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 183, in _maybe_uncompress
    self._assert_has_codec(compression_type)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 118, in _assert_has_codec
    raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Process finished with exit code 1
```
My code for consuming the Kafka topic:
```python
from kafka import KafkaConsumer


def consume_from_topic(topic):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        group_id='zstd-11-consumer-group',
        auto_offset_reset='earliest',
        enable_auto_commit=True
    )
    try:
        for message in consumer:
            v = message.value
            k = message.key.decode("utf-8")
            log = "key={}, offset={}, partition={}, value={}".format(k, message.offset, message.partition, v)
            print(log)
    except KeyboardInterrupt:
        consumer.close()


if __name__ == "__main__":
    topic_to_consume = "Integrate-Package-Zstd-ESP.info"
    consume_from_topic(topic_to_consume)
```
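The error is raised when kafka-python cannot import a zstd library at decompression time. A minimal environment check, assuming (per the merged PR) that the third-party `zstandard` package is the dependency it looks for:

```python
# Sanity check (assumption: kafka-python's zstd codec path needs the
# third-party "zstandard" package importable in this virtualenv).
try:
    import zstandard  # noqa: F401
    print("zstandard is importable; zstd decompression should be available")
except ImportError:
    print("zstandard is missing in this environment; install it, e.g. `pip install zstandard`")
```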