
on_revoke callback passed partition list with partition not already in assignment after consumer group session timeout


Description

We are using a single confluent_kafka.Consumer in a consumer group whose group id is always unique (we generate a fresh uuid4 for it). We pass on_assign and on_revoke callbacks to the consumer.subscribe call.
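
For context, here is a minimal sketch of that setup; the bootstrap server, group id prefix, and topic names are placeholders, not our real values:

    import uuid

    from confluent_kafka import Consumer

    def on_assign(consumer, partitions):
        ...  # the real handler is _set_and_assign_offsets, shown below

    def on_revoke(consumer, partitions):
        ...  # the real handler is _on_revoke, shown below

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',      # placeholder
        'group.id': f'my-consumer-{uuid.uuid4()}',  # fresh uuid4 -> group id is always unique
        'partition.assignment.strategy': 'cooperative-sticky',
    })

    consumer.subscribe(['TOPIC_1', 'TOPIC_2', 'TOPIC_3'],  # placeholder topics
                       on_assign=on_assign, on_revoke=on_revoke)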

Our on_assign callback looks like this:

    def _set_and_assign_offsets(self, consumer: ConfluentConsumer, partitions: List[TopicPartition]):
        # on_assign callback: adjust offsets, apply the incremental assignment,
        # then wake the asyncio side to signal that the assignment is in place.
        self._set_offsets_by_script_state(consumer, partitions)

        self._event_loop.call_soon_threadsafe(self._assignment_sync_event.set)

    def _set_offsets_by_script_state(self, consumer: ConfluentConsumer, partitions: List[TopicPartition]):
        # On first initialisation, begin/snapshot topics start from the beginning
        # and end topics from the end; otherwise we fall back to stored offsets.
        for p in partitions:
            if p.topic in self._begin_topics or p.topic in self._snapshot_topics:
                p.offset = OFFSET_BEGINNING if self._initialising else OFFSET_STORED
            elif p.topic in self._end_topics:
                p.offset = OFFSET_END if self._initialising else OFFSET_STORED

        consumer.incremental_assign(partitions)

Our on_revoke callback looks like this:

    def _on_revoke(self, consumer: ConfluentConsumer, partitions: List[TopicPartition]):
        _logger.info(f'These partitions have been revoked from the consumer {partitions}')
        consumer.incremental_unassign(partitions)

At some point we also use consumer.incremental_unassign to pause consumption for self._snapshot_topics. We use batch consume (num_messages > 1) to consume messages from this consumer, as sketched below.
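
A minimal sketch of that consume loop, assuming a hypothetical running flag and handle function; the batch size and timeout here are illustrative, not our actual values:

    while running:
        msgs = consumer.consume(num_messages=500, timeout=1.0)  # batch consume
        for msg in msgs:
            if msg.error():
                # with enable.partition.eof=True, EOF events also arrive here
                continue
            handle(msg)  # hypothetical processing function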

I think that when our broker is doing some daily cleanup we get kicked from the cluster leader, but it seems the on_revoke callback is then passed topic partitions to which the consumer no longer has an assignment; a defensive filter like the sketch below would work around the resulting *assign() failure.
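
A possible defensive workaround (we do not currently do this; we just restart the application): filter the revoked list against Consumer.assignment() before calling incremental_unassign, assuming assignment() still reflects librdkafka's view after the session timeout:

    def _on_revoke(self, consumer: ConfluentConsumer, partitions: List[TopicPartition]):
        # After a session timeout librdkafka may already have dropped the
        # assignment, so only unassign partitions it still holds.
        current = {(tp.topic, tp.partition) for tp in consumer.assignment()}
        still_assigned = [p for p in partitions if (p.topic, p.partition) in current]
        if still_assigned:
            consumer.incremental_unassign(still_assigned)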

There is only one consumer in that consumer group, so we would expect all of TOPIC_2 to be assigned to this consumer.

This also seems to happen without the on_revoke callback; initially we were happy with the defaults.

"2023-04-10T13:05:12.080Z","<HOSTNAME>","%4|1681131912.080|SESSTMOUT|<CONSUMER_NAME>| [thrd:main]: Consumer group session timed out (in join-state steady) after 10498 ms without a successful response from the group coordinator (broker 3, last error was Success): revoking assignment and rejoining group"
"2023-04-10T13:05:12.081Z","<HOSTNAME>","%4|1681131912.081|ASSIGN|<CONSUMER_NAME>| [thrd:main]: Group ""<GROUP_NAME>"": application *assign() call failed: <TOPIC_2> [0] can't be unassigned since it is not in the current assignment"
"2023-04-10T13:05:12.081Z","<HOSTNAME>","event='These partitions have been revoked from the consumer [TopicPartition{topic=<TOPIC_1>,partition=0,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_1>,partition=1,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_1>,partition=2,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_1>,partition=3,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_1>,partition=4,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_2>,partition=0,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_2>,partition=1,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_2>,partition=2,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_2>,partition=3,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_2>,partition=4,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_3>,partition=0,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_3>,partition=1,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_3>,partition=2,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_3>,partition=3,offset=-1001,error=None}, TopicPartition{topic=<TOPIC_3>,partition=4,offset=-1001,error=None}]' logger='<package>.kafka.Consumer' level='info' timestamp='2023-04-10T13:05:12.080859Z'"

How to reproduce

Using confluent_kafka 2.0.2 and librdkafka 2.0.2

Broker version is 2.6-IV0

Use the above setup and then, perhaps, shut down the leader to guarantee a repro. We hit this nightly, so it is pretty consistent.

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.0.2 and 2.0.2
  • [x] Apache Kafka broker version: 2.6-IV0
  • [x] Client configuration: {'auto.offset.reset': 'smallest', 'partition.assignment.strategy': 'cooperative-sticky', 'check.crcs': True, 'connections.max.idle.ms': 540000, 'enable.auto.commit': True, 'enable.partition.eof': True, 'fetch.wait.max.ms': 20, 'max.in.flight': 5, 'metadata.max.age.ms': 300000, 'reconnect.backoff.max.ms': 1000, 'reconnect.backoff.ms': 50, 'session.timeout.ms': 10000, 'socket.receive.buffer.bytes': 16777216, 'socket.send.buffer.bytes': 16777216, 'socket.timeout.ms': 30000}
  • [x] Operating system: CentOS 8 on Docker
  • [x] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [x] Critical issue: No - working around by restarting the application

keithks · Apr 12 '23 06:04