pykafka
Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
This pull request addresses https://github.com/Parsely/pykafka/issues/831 by adding support for `RecordBatch` message formatting to pykafka. This includes supporting the encoding and decoding of [`varints`](https://developers.google.com/protocol-buffers/docs/encoding#varints) via the `struct_helpers` interface.
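The varint scheme linked above is the standard base-128 encoding: seven payload bits per byte, with the high bit set on every byte except the last, and zigzag mapping for signed values so small negatives stay short. A minimal sketch (function names here are illustrative, not pykafka's actual `struct_helpers` API):

```python
def encode_varint(value):
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(buf, offset=0):
    """Decode a varint from buf; return (value, bytes_consumed)."""
    value = shift = 0
    pos = offset
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, pos - offset
        shift += 7


# Kafka (like protobuf) zigzag-encodes signed values before varint-encoding
# them, so -1 becomes 1, 1 becomes 2, -2 becomes 3, and so on.
def zigzag_encode(n):
    return (n << 1) ^ (n >> 63)


def zigzag_decode(z):
    return (z >> 1) ^ -(z & 1)
```

For example, `encode_varint(300)` yields the two bytes `b"\xac\x02"`, since 300 needs nine bits and therefore two seven-bit groups.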
This pull request fixes a version comparison that caused the SSL tests not to run under Kafka 0.10.
Rather than iterating over a delivery report, an optional per-message callback is used that is invoked when the message is marked as delivered. This makes it easier to attach...
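The callback pattern can be sketched with a toy async producer (the class, the `on_delivery` parameter name, and the internals below are illustrative assumptions, not pykafka's actual interface):

```python
import queue
import threading


class MiniProducer:
    """Toy async producer illustrating per-message delivery callbacks
    instead of a pull-style delivery report queue."""

    def __init__(self):
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def produce(self, message, on_delivery=None):
        # on_delivery(message, exc) fires once the send succeeds (exc is
        # None) or fails (exc is the exception raised while sending).
        self._queue.put((message, on_delivery))

    def _run(self):
        while True:
            message, on_delivery = self._queue.get()
            if message is None:  # shutdown sentinel
                break
            try:
                self._send(message)
                exc = None
            except Exception as e:
                exc = e
            if on_delivery is not None:
                on_delivery(message, exc)

    def _send(self, message):
        pass  # stand-in for the real wire protocol round trip

    def stop(self):
        self._queue.put((None, None))
        self._worker.join()
```

Usage: `producer.produce(b"msg", on_delivery=lambda msg, exc: ...)` ties the delivery outcome to the message that produced it, with no report queue to iterate.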
```
Traceback (most recent call last):
  File "pyconsume.py", line 30, in <module>
    use_rdkafka = True
  File "/usr/local/lib/python2.7/dist-packages/pykafka/topic.py", line 214, in get_balanced_consumer
    return cls(self, self._cluster, consumer_group, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pykafka/managedbalancedconsumer.py", line 188, in...
```
Just encountered a scenario where I was trying to delete a topic like this:

```
cl = pykafka.KafkaClient(...)
test_topic = cl.topics['test'.encode()]
cl.cluster.controller_broker.delete_topics(['test'.encode()], timeout=1000)
```

and got a `SocketDisconnectedError`. Eventually tracked...
Since Kafka 0.11, the `MessageSet` format has had an [updated version](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets) called `RecordBatch`. We should investigate what it would take to support this format, and what the benefits would be....
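For orientation, the v2 `RecordBatch` begins with a fixed 61-byte header (the records that follow it use varint-encoded lengths and deltas). A sketch of unpacking that header with the stdlib, with the field layout taken from the Kafka protocol documentation rather than from pykafka code:

```python
import struct
from collections import namedtuple

# Fixed-size prefix of a v2 RecordBatch (magic byte == 2).
RECORD_BATCH_HEADER = struct.Struct(
    ">q"   # base_offset
    "i"    # batch_length (bytes remaining after this field)
    "i"    # partition_leader_epoch
    "b"    # magic (2 for RecordBatch)
    "I"    # crc (CRC-32C over everything after this field)
    "h"    # attributes (codec, timestamp type, transactional/control bits)
    "i"    # last_offset_delta
    "q"    # first_timestamp
    "q"    # max_timestamp
    "q"    # producer_id (-1 if idempotence is unused)
    "h"    # producer_epoch
    "i"    # base_sequence
    "i"    # record_count
)

RecordBatchHeader = namedtuple("RecordBatchHeader", [
    "base_offset", "batch_length", "partition_leader_epoch", "magic", "crc",
    "attributes", "last_offset_delta", "first_timestamp", "max_timestamp",
    "producer_id", "producer_epoch", "base_sequence", "record_count",
])


def parse_record_batch_header(buf):
    """Unpack the 61-byte RecordBatch prefix from the start of buf."""
    return RecordBatchHeader(*RECORD_BATCH_HEADER.unpack_from(buf))
```

The varint-based record encoding after this header is the main departure from the old `MessageSet` format, which is why varint support is a prerequisite for decoding it.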
`OffsetRequest` has had a few [changes](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) in its v1 release, which pykafka should support.
I'm trying to create several consumers with the same consumer_id so that several clients can read from the same Kafka topic. But for some reason, each new connection establishment takes more...
Already-queued messages are lost when a `SocketDisconnectedError` is detected [here](https://github.com/Parsely/pykafka/blob/56efe3975ee707f4ec48aa8da81fde9fbbf14dda/pykafka/producer.py#L518). This occurs because the `Producer._update` method first closes all OwnedBrokers [here](https://github.com/Parsely/pykafka/blob/56efe3975ee707f4ec48aa8da81fde9fbbf14dda/pykafka/producer.py#L274), which results in the OwnedBroker setting `self.running`...
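One general way to avoid dropping the queue is for the per-broker worker to hand its unsent messages back on shutdown so the producer can requeue them on the replacement connection. A sketch of that pattern (a toy stand-in, not pykafka's actual `Producer`/`OwnedBroker` code):

```python
import queue


class OwnedBrokerSketch:
    """Toy per-broker send queue that returns its unsent messages on
    shutdown instead of discarding them."""

    def __init__(self):
        self.running = True
        self._queue = queue.Queue()

    def enqueue(self, message):
        if not self.running:
            raise RuntimeError("broker connection is shutting down")
        self._queue.put(message)

    def stop(self):
        """Stop accepting new messages and drain everything still queued,
        in order, so the caller can requeue it elsewhere."""
        self.running = False
        pending = []
        while True:
            try:
                pending.append(self._queue.get_nowait())
            except queue.Empty:
                return pending
```

On a disconnect, the producer would call `stop()`, rebuild the broker connection, and re-enqueue the returned list rather than losing it.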