kafka-python
Support for transactional messages
The client doesn't support transactional messages. When consuming messages created by a transactional producer (Scala/Java-based), the Python client fails to return correct values for the key and value.
We've been having issues with one of our consumers that uses this library, and realized that it's because of transactional messages. In particular, we're seeing that the control messages are being exposed to the client, which should not happen. See the Kafka protocol documentation, which says:
Clients should not return control batches (ie. those with this bit set) to applications. (since 0.11.0.0)
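For context on the bad keys and values, one plausible explanation: in the Kafka protocol, a transaction marker is a control record whose key is two big-endian int16s (version, then type, where 0 is ABORT and 1 is COMMIT) and whose value, in version 0, is an int16 version followed by an int32 coordinator epoch. A consumer that does not skip control batches hands these protocol bytes to the application as if they were an ordinary key and value. This minimal sketch decodes them with struct; describe_control_record is a hypothetical helper and the epoch of 5 is made up for illustration.

```python
import struct

# Control record types from the ControlRecordKey schema in the Kafka protocol.
CONTROL_TYPES = {0: "ABORT", 1: "COMMIT"}


def describe_control_record(key: bytes, value: bytes) -> dict:
    """Decode the key/value of a transaction marker (version 0 layout assumed).

    Key:   version int16, type int16               (4 bytes, big-endian)
    Value: version int16, coordinator_epoch int32  (6 bytes, big-endian)
    """
    key_version, control_type = struct.unpack(">hh", key[:4])
    value_version, coordinator_epoch = struct.unpack(">hi", value[:6])
    return {
        "key_version": key_version,
        "type": CONTROL_TYPES.get(control_type, "UNKNOWN"),
        "value_version": value_version,
        "coordinator_epoch": coordinator_epoch,
    }


# What a COMMIT marker looks like if it leaks through to the application:
# raw protocol bytes rather than application data (epoch 5 is illustrative).
raw_key = struct.pack(">hh", 0, 1)
raw_value = struct.pack(">hi", 0, 5)
print(describe_control_record(raw_key, raw_value))
# {'key_version': 0, 'type': 'COMMIT', 'value_version': 0, 'coordinator_epoch': 5}
```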
A PR would be most welcome!
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
The sixth lowest bit indicates whether the RecordBatch includes a control message. 1 indicates that the RecordBatch contains a control message, 0 indicates that it doesn't. Control messages are used to enable transactions in Kafka and are generated by the broker. Clients should not return control batches (i.e. those with this bit set) to applications. (since 0.11.0.0)
in https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L458, the batch should be skipped if it's a control batch:
while batch is not None:
    if getattr(batch, 'is_control_batch', False):
        continue
What do you think?
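The "sixth lowest bit" from the protocol guide quoted above is bit 5 (mask 0x20) of the int16 attributes field of a v2 RecordBatch; bit 4 (mask 0x10) is the transactional flag. A minimal sketch of the check on a raw batch header, assuming the magic v2 layout, in which attributes starts 21 bytes into the batch. The helper names are hypothetical, not kafka-python API, and the two test headers use zero placeholders for every field the checks don't read.

```python
import struct

# RecordBatch (magic v2) header layout, big-endian:
#   baseOffset int64 | batchLength int32 | partitionLeaderEpoch int32 |
#   magic int8 | crc uint32 | attributes int16 | lastOffsetDelta int32 | ...
ATTRIBUTES_OFFSET = 8 + 4 + 4 + 1 + 4  # 21 bytes into the batch

TRANSACTIONAL_FLAG = 1 << 4  # fifth lowest bit: isTransactional
CONTROL_FLAG = 1 << 5        # sixth lowest bit: isControlBatch


def batch_attributes(batch: bytes) -> int:
    """Read the int16 attributes field from a raw v2 batch header."""
    (attributes,) = struct.unpack_from(">h", batch, ATTRIBUTES_OFFSET)
    return attributes


def is_control_batch(batch: bytes) -> bool:
    return bool(batch_attributes(batch) & CONTROL_FLAG)


def is_transactional(batch: bytes) -> bool:
    return bool(batch_attributes(batch) & TRANSACTIONAL_FLAG)


# Fake headers: baseOffset, batchLength, leaderEpoch, magic=2, crc, attributes, lastOffsetDelta.
control_header = struct.pack(">qiibIhi", 0, 0, 0, 2, 0, CONTROL_FLAG | TRANSACTIONAL_FLAG, 0)
data_header = struct.pack(">qiibIhi", 0, 0, 0, 2, 0, TRANSACTIONAL_FLAG, 0)
print(is_control_batch(control_header), is_transactional(control_header))  # True True
print(is_control_batch(data_header), is_transactional(data_header))        # False True
```

Note that a transactional producer sets bit 4 on its ordinary data batches too, so only bit 5 distinguishes the broker-written markers that should be hidden from applications.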
We would be interested in helping add support, at least for filtering out control messages. This issue is a bit old, so I thought I'd check where it stands before jumping in.
I don't think anyone has put together a PR, but if you did we'd be happy to review it.
This seems pretty old, but is there any update?
Does the control-batch check proposed above work?
@dotnwat @jutley I have the same problem, but I can't replicate it locally using a Kafka/ZooKeeper Docker setup. I can try making a PR if I manage to replicate it locally.
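The snippet proposed above probably won't work as written: the continue statement jumps back to the while test without reassigning batch, so the same control batch would be checked forever. If you add
batch = records.next_batch()
before continue, then it might work. It probably also makes sense to put the condition here, https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L467, so that the offsets are correct?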
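A minimal sketch of the loop with both suggestions applied: fetch the next batch before continue, and still advance the offset past a skipped control batch (in the v2 format a batch's last offset is base_offset + last_offset_delta, so the next offset is one past that). FakeBatch, FakeRecords, and unpack are hypothetical stand-ins, not kafka-python's actual classes; only the next_batch() and is_control_batch names come from this thread, and this is not claimed to match the code in the eventual PR.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class FakeBatch:
    """Hypothetical stand-in for a v2 record batch."""
    base_offset: int
    last_offset_delta: int
    is_control_batch: bool = False
    records: List[str] = field(default_factory=list)


class FakeRecords:
    """Hypothetical stand-in whose next_batch() returns None when exhausted."""

    def __init__(self, batches: List[FakeBatch]) -> None:
        self._batches = iter(batches)

    def next_batch(self) -> Optional[FakeBatch]:
        return next(self._batches, None)


def unpack(records: FakeRecords) -> Tuple[List[str], int]:
    """Return (values for the application, next offset to fetch)."""
    values: List[str] = []
    next_offset = 0
    batch = records.next_batch()
    while batch is not None:
        # Advance the position for every batch, control batches included,
        # so the consumer does not keep re-fetching the transaction marker.
        next_offset = batch.base_offset + batch.last_offset_delta + 1
        # getattr mirrors the thread's snippet, tolerating batch types
        # (e.g. older message formats) that lack the attribute.
        if getattr(batch, "is_control_batch", False):
            # Move on *before* continuing; otherwise the while test sees
            # the same batch again and the loop never terminates.
            batch = records.next_batch()
            continue
        values.extend(batch.records)
        batch = records.next_batch()
    return values, next_offset


batches = [
    FakeBatch(0, 1, records=["a", "b"]),     # offsets 0-1: application data
    FakeBatch(2, 0, is_control_batch=True),  # offset 2: COMMIT marker
]
print(unpack(FakeRecords(batches)))  # (['a', 'b'], 3)
```

Updating next_offset before the skip is the point of the offsets question: if a fetch ends with a control batch and the position only tracks yielded records, the consumer would request the marker again.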
We recently hit the above issue. I've tested the fix proposed by @jouve / @nilansaha / @smalyshev, and it works as intended. I've created a pull request: #2361.