Support for transactional messages

Open imduffy15 opened this issue 5 years ago • 10 comments

The client doesn't support transactional messages. When consuming messages created by a transactional producer (Scala/Java based), the Python client fails to return the correct key and value.

imduffy15 avatar Jun 29 '19 12:06 imduffy15

We've been having issues with one of our consumers that uses this library, and realized that it's because of transactional messages. In particular, we're seeing that the control messages are being exposed to the client, which should not happen. See the documentation here, which says:

Clients should not return control batches (ie. those with this bit set) to applications. (since 0.11.0.0)

jutley avatar Aug 29 '19 18:08 jutley

A PR would be most welcome!

jeffwidman avatar Oct 11 '19 18:10 jeffwidman

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

The sixth lowest bit indicates whether the RecordBatch includes a control message. 1 indicates that the RecordBatch contains a control message, 0 indicates that it doesn't. Control messages are used to enable transactions in Kafka and are generated by the broker. Clients should not return control batches (ie. those with this bit set) to applications. (since 0.11.0.0)
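To make the bit position concrete, here is a minimal sketch of the check described in the quoted passage. It assumes the batch attributes are available as a plain integer; the names `CONTROL_MASK` and `is_control_batch` are used here for illustration only and are not taken from the thread.

```python
# The sixth lowest bit of the RecordBatch attributes field (bit index 5).
CONTROL_MASK = 0x20


def is_control_batch(attributes: int) -> bool:
    """Return True if the control bit is set in the given attributes value."""
    return bool(attributes & CONTROL_MASK)
```

A consumer following the quoted guidance would drop any batch for which this returns True instead of handing it to the application.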

in https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L458, the batch should be skipped if it's a control batch:

            while batch is not None:
                if getattr(batch, 'is_control_batch', False):
                    continue

wdyt ?

jouve avatar Aug 25 '20 13:08 jouve

We would be interested in helping add support, at least the ability to filter out control messages. This issue looks a bit old, so I thought I'd check where things stand before jumping in.

dotnwat avatar Mar 17 '21 21:03 dotnwat

I don't think anyone has put together a PR, but if you did we'd be happy to review it.

jeffwidman avatar Mar 18 '21 06:03 jeffwidman

This seems pretty old, but is there any update?

nilansaha avatar May 22 '21 03:05 nilansaha

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

The sixth lowest bit indicates whether the RecordBatch includes a control message. 1 indicates that the RecordBatch contains a control message, 0 indicates that it doesn't. Control messages are used to enable transactions in Kafka and are generated by the broker. Clients should not return control batches (ie. those with this bit set) to applications. (since 0.11.0.0)

in https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L458, the batch should be skipped if it's a control batch:

            while batch is not None:
                if getattr(batch, 'is_control_batch', False):
                    continue

wdyt ?

Does this work ?

nilansaha avatar May 22 '21 03:05 nilansaha

@dotnwat @jutley I have the same problem, but I can't reproduce it locally using a Kafka/ZooKeeper Docker setup. I can try making a PR if I manage to reproduce it locally.

nilansaha avatar May 22 '21 05:05 nilansaha

in https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L458, the batch should be skipped if it's a control batch:

            while batch is not None:
                if getattr(batch, 'is_control_batch', False):
                    continue

wdyt ?

This probably won't work, but if you add

batch = records.next_batch()

before continue, then it might work. Also probably makes sense to put the condition here: https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L467 so the offsets are correct?
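A minimal sketch of that adjustment, using hypothetical stand-ins (`FakeBatch`, `FakeRecords`, `unpack`) rather than the real fetcher code. It only shows why the batch has to be advanced before `continue`; it does not model offset tracking.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeBatch:
    """Hypothetical stand-in for a record batch."""
    records: List[str]
    is_control_batch: bool = False


class FakeRecords:
    """Hypothetical stand-in for an object that yields batches via next_batch()."""

    def __init__(self, batches: List[FakeBatch]) -> None:
        self._batches = iter(batches)

    def next_batch(self) -> Optional[FakeBatch]:
        # Returns None once all batches have been consumed.
        return next(self._batches, None)


def unpack(records: FakeRecords) -> List[str]:
    """Collect records from every batch, skipping control batches."""
    out: List[str] = []
    batch = records.next_batch()
    while batch is not None:
        if getattr(batch, 'is_control_batch', False):
            # Advance first; a bare `continue` would re-check the same batch forever.
            batch = records.next_batch()
            continue
        out.extend(batch.records)
        batch = records.next_batch()
    return out
```

For example, feeding three batches where the middle one is a control batch returns only the records of the first and third.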

smalyshev avatar Mar 25 '22 07:03 smalyshev

We recently hit the issue described above. I've tested the proposed fix from @jouve / @nilansaha / @smalyshev and it works as intended. I've created a pull request: #2361

bradenneal1 avatar Apr 05 '23 23:04 bradenneal1