confluent-kafka-python
Cannot "reject" pulled message without commit
Description
How can I pass an offset/message to the next member of a consumer group, e.g. when the current consumer process has failed or is too busy? Calling consumer.pause() instead of consumer.commit(msg) does not solve it.
How to reproduce
....
while self._running:
    msg = self._consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            warnprn('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset()))
        elif msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
            raise IolConnectExeption(msg.error())
        else:
            warnprn(msg.error())
    else:
        if self._horseSharedParams['maxAsyncInMsgHandlers'] > self._horseSharedParams['asyncProcCount']:
            self._loop.run_until_complete(self.__runAsync(msg))
        else:
            # HOW TO REJECT msg to pass the offset to another consumer of the consumer group???
            pass
....
Checklist
Please provide the following information:
- [X] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): version = ('1.7.0', 17235968), libversion = ('1.7.0', 17236223)
Hi @almaz1213 , thanks for asking.
Regarding your question, "HOW TO REJECT msg to pass the offset to another consumer of the consumer group":
The rule in Kafka is that each partition of a topic is assigned to only one consumer within a consumer group, so multiple consumers in the same group cannot read the same message from a partition.
So I don't think you can pass the offset to another consumer of the consumer group.
Hi jlinyu, thanks for the reply. I know that. But I need to stop consuming and reject the last polled offset, without committing it, from the current group member if the consumer is too busy. In https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#consumer there are some methods like pause(), unassign(), unsubscribe(). Could some of them help to leave the consumer group when the current group member is too busy, and rejoin the group when the current tasks are completed?
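For illustration, here is a minimal sketch of what pause() could look like when the consumer is too busy. It keeps the partition assigned to the same group member and rewinds to the polled message so it is delivered again after resume(); it does not hand the message to another member. The helper names `defer_message` and `resume_deferred` are hypothetical, while pause(), resume(), and seek() are existing Consumer methods. How seek() and pause() interact has varied between librdkafka versions, so treat the call order as an assumption to verify on your version. The loop must also keep calling poll() while paused, otherwise the member may be removed from the group once max.poll.interval.ms is exceeded.

```python
try:
    from confluent_kafka import TopicPartition
except ImportError:
    # Fallback so the sketch can be read and exercised without the client installed.
    from collections import namedtuple
    TopicPartition = namedtuple("TopicPartition", "topic partition offset")


def defer_message(consumer, msg):
    """Hypothetical helper: stop fetching msg's partition and rewind to msg."""
    tp = TopicPartition(msg.topic(), msg.partition(), msg.offset())
    consumer.pause([tp])  # stop fetching; the partition stays assigned to us
    consumer.seek(tp)     # rewind so msg is delivered again after resume()
    return tp


def resume_deferred(consumer, tp):
    """Hypothetical helper: start fetching the deferred partition again."""
    consumer.resume([tp])
```

In the loop from the issue, `defer_message(self._consumer, msg)` would go in the final else branch, and `resume_deferred` would be called once a handler slot frees up.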
To leave the group, call close().
It seems like you want to use Kafka in an unusual way.
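A rough sketch of the close() suggestion, under stated assumptions: with enable.auto.commit set to False, close() leaves the group without committing, so after the rebalance another member resumes each of this consumer's partitions from its last committed offset and receives the uncommitted message again. `make_config` and `handle_or_leave` are hypothetical names, and `can_handle` and `handle` are placeholder callables standing in for the capacity check and the message handler. With auto commit left enabled, close() commits the stored offsets, which would defeat the purpose.

```python
def make_config(bootstrap_servers, group_id):
    """Hypothetical helper: consumer settings with automatic commits disabled."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "enable.auto.commit": False,  # close() then leaves without committing
    }


def handle_or_leave(consumer, msg, can_handle, handle):
    """Process and commit msg, or leave the group without committing it.

    Returns True if msg was processed, False if the consumer was closed.
    """
    if can_handle():
        handle(msg)
        # Commit synchronously only after the message was handled.
        consumer.commit(message=msg, asynchronous=False)
        return True
    # Too busy: leave the group. The partitions are reassigned and the
    # uncommitted message is redelivered to whichever member takes over.
    consumer.close()
    return False
```

A closed consumer cannot be reused, so rejoining later means creating a new Consumer and subscribing again. Leaving also hands over every partition this member held, not only the one message, and each leave and rejoin triggers a group rebalance.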