confluent-kafka-python icon indicating copy to clipboard operation
confluent-kafka-python copied to clipboard

Cannot "reject" pulled message without commit

Open almaz1213 opened this issue 3 years ago • 2 comments

Description

How to pass an offset/message to the next member of a consumer group? i.e. if the current consumer process has been failed. consumer.pause() instead of consumer.commit(msg) does not solve it

How to reproduce

....
while self._running:
      msg = self._consumer.poll(timeout=1.0)
      if msg is None: 
        continue
      if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
          warnprn('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset()))
        elif msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
          raise IolConnectExeption(msg.error())
        elif msg.error():
          warnprn(msg.error())
      else:
        if (self._horseSharedParams['maxAsyncInMsgHandlers'] > self._horseSharedParams['asyncProcCount']):
          self._loop.run_until_complete(self.__runAsync(msg))
        else:
          //HOW TO REJECT msg to pass the offset to another consumer of the consumer group???
....

Checklist

Please provide the following information:

  • [ X] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): version = ('1.7.0', 17235968) libversion = ('1.7.0', 17236223)

almaz1213 avatar Dec 10 '21 05:12 almaz1213

Hi @almaz1213 , thanks for asking.

For your question HOW TO REJECT msg to pass the offset to another consumer of the consumer group

So the rule in Kafka is only one consumer in a consumer group can be assigned to consume messages from a partition in a topic and hence multiple Kafka consumers from a consumer group can not read the same message from a partition.

So I don't think you can pass the offset to another consumer of the consumer group.

jliunyu avatar Feb 23 '22 23:02 jliunyu

Hi jlinyu, thanks for reply. I know that. But I need stop consuming and reject the last polled offset without commiting by current group member if the consumer to busy. In https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#consumer there are some methods like pause(), unassign(), unsuabscribe(). May some of them help to leave consuming group if the current group memeber too busy and join to the group when current tasks completed?

almaz1213 avatar Feb 24 '22 02:02 almaz1213

to leave the group, call close.

it seems like you are wanting to use kafka in an unusual way.

mhowlett avatar Oct 25 '22 15:10 mhowlett