pykafka
Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
How can I get the last message in a topic when the topic receives 180,000 messages per second?
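One possible approach, sketched here under assumptions rather than taken from the issue thread: ask the topic for its latest available offsets, then seek each partition to just before its final message. The helper below only does the offset arithmetic. It assumes that `consumer.reset_offsets` treats the offset it is given as the last consumed one (so the next `consume()` returns the following message) and that `-2` matches `OffsetType.EARLIEST`. The name `seek_targets_for_last_message` is hypothetical, and the pykafka calls in the trailing comments are unverified usage notes.

```python
# Assumed to match pykafka.common.OffsetType.EARLIEST.
EARLIEST = -2


def seek_targets_for_last_message(next_offsets):
    """Map {partition_id: next offset to be written} to seek targets.

    The last message in a partition sits at next_offset - 1. Assuming the
    consumer resumes at target + 1, the target is next_offset - 2.
    Negative targets fall back to EARLIEST so they are not mistaken for
    the special values -1 (latest) and -2 (earliest).
    """
    targets = {}
    for partition_id, next_offset in next_offsets.items():
        target = next_offset - 2
        targets[partition_id] = target if target > -1 else EARLIEST
    return targets


# Hypothetical usage against a live cluster (not exercised here):
#   latest = topic.latest_available_offsets()
#   next_offsets = {pid: resp.offset[0] for pid, resp in latest.items()}
#   targets = seek_targets_for_last_message(next_offsets)
#   consumer.reset_offsets(
#       [(topic.partitions[pid], off) for pid, off in targets.items()])
#   message = consumer.consume()
```

At this message rate the "last" message is a moving target: by the time `consume()` returns, newer messages will usually have arrived, so the result is best read as a recent message rather than the definitive last one.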
Based on the documentation (https://pykafka.readthedocs.io/en/latest/api/simpleconsumer.html), we are supposed to be able to pass a `datetime.datetime` to `consumer.reset_offsets` to seek to an offset. But it doesn't behave as expected. See the following example:...
Is there any way to access the Kafka record headers? I'm talking about [this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers).
I created two consumers through the code. When I tried to use the publisher to publish the data, I saw that the data obtained by the two consumers is the...
Installing PyKafka via `pip install pykafka` results in the compilation of a `tests` module inside of `site-packages`. I confirmed this on both Windows and Linux with Python 3.6.5. > ls...
Hey, I'm trying to consume messages from a topic using pykafka with no duplicates. I'm interested in consuming messages every few minutes, so every time I will get only new...
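A minimal sketch of one way to approach this, assuming the consumer was created with a `consumer_group` and a `consumer_timeout_ms`, so that `consume()` returns `None` once no new message arrives within the timeout. The function name `drain_new_messages` is hypothetical. It relies only on `consume()` and `commit_offsets()` being available on the consumer object.

```python
def drain_new_messages(consumer):
    """Read messages until the consumer times out, then commit offsets.

    Committing after the batch is read means a later run with the same
    consumer group should resume after the committed position. This is
    at-least-once delivery: if the process dies before the commit, the
    same messages are read again on the next run.
    """
    values = []
    while True:
        message = consumer.consume()
        if message is None:
            # Timeout reached with no new message: the batch is complete.
            break
        values.append(message.value)
    consumer.commit_offsets()
    return values
```

Calling this every few minutes from a scheduler returns only the messages that arrived since the last committed offset, provided each run uses the same consumer group. Strict exactly-once behaviour would additionally need deduplication on the processing side.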
`SimpleConsumer.consume` raises `'NoneType' object has no attribute 'value'` when a deserializer is used.
When `ret` is `None`, the `consume` method should not run `self._deserializer` on it. https://github.com/Parsely/pykafka/blob/2.8.0/pykafka/simpleconsumer.py#L510 **PyKafka version**: 2.8.0
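The guard being asked for can be illustrated with a standalone sketch. This is not the pykafka source: `deserialize_if_present` is a hypothetical helper, and the deserializer signature `(value, partition_key) -> (value, partition_key)` is assumed from the pykafka documentation.

```python
def deserialize_if_present(message, deserializer):
    """Apply `deserializer` to a message, passing `None` through untouched.

    `message` is any object with `value` and `partition_key` attributes,
    or `None` when the consumer timed out without receiving anything.
    """
    if message is None or deserializer is None:
        # Nothing to deserialize: avoid touching attributes of None.
        return message
    message.value, message.partition_key = deserializer(
        message.value, message.partition_key
    )
    return message
```

With this check in place, a timed-out `consume()` would return `None` to the caller instead of raising `AttributeError`.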
**PyKafka version**: 2.8.0 (using SimpleConsumer with rdkafka support) **Kafka version**: 1.0.1 (not reproducible on 0.8.2.2) There's a rare case in our production environment when one of the brokers went down,...
What is the proper handling of a ProduceFailureError? It is a very general, non-specific error. My producer is set up:
```
self.kafka.topics['mytopic'].get_producer(
    delivery_reports=False,
    linger_ms=0,
    queue_empty_timeout_ms=1,
    sync=True
)
```
In...
1. When I try to create a topic with a name that already exists on the Kafka server, the Kafka log shows the error **org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-1' already exists.** However, pykafka ends up...