
LeaveGroup but failed to rejoin

Open bobzsj87 opened this issue 5 years ago • 14 comments

Describe the bug Randomly, the Kafka consumer stops fetching new messages, is kicked out of the consumer group after the max_poll timeout, and then fails to rejoin.

Expected behaviour The consumer should not fail to fetch new messages

Environment (please complete the following information):

  • aiokafka version 0.6.0
  • Kafka 2.2.1 and 2.3.1, same error happened in both versions

Reproducible example Difficult to reproduce; it happens occasionally in our live environment

What we observed

  • All of a sudden, one consumer stops fetching messages for its assigned partitions. The lag increases, although we observe that the consumer is still properly registered with the Kafka broker
  • After the max_poll timeout, the broker forces a rebalance and kicks the consumer out
  • The consumer still thinks it owns the assigned partitions, even though the LeaveGroup succeeded and the partitions were revoked. It keeps sending packets to the broker (we observe very high RX/TX caused by this consumer)
  • Our only remedy is to detect the abnormality and manually stop the consumer (a minimal watchdog sketch follows below).
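
As a stopgap (not something provided by aiokafka itself), a watchdog along these lines could automate that "detect and stop" step; the handle() placeholder, the broker address and the 300-second threshold are illustrative assumptions:

```python
# Watchdog sketch: if nothing has been processed for roughly
# max_poll_interval_ms, assume the consumer is stuck and shut it down
# instead of letting it linger after the group rebalanced without it.
import asyncio
import time

from aiokafka import AIOKafkaConsumer


async def handle(msg) -> None:
    ...  # application-specific processing (placeholder)


async def main() -> None:
    consumer = AIOKafkaConsumer(
        "pron-job",
        bootstrap_servers="localhost:9092",   # assumed broker address
        group_id="score_worker",
    )
    await consumer.start()
    last_progress = time.monotonic()

    async def consume() -> None:
        nonlocal last_progress
        async for msg in consumer:
            await handle(msg)
            last_progress = time.monotonic()

    consume_task = asyncio.create_task(consume())
    try:
        while not consume_task.done():
            await asyncio.sleep(10)
            if time.monotonic() - last_progress > 300:   # assumed stall threshold
                consume_task.cancel()                    # treat as a stall
                break
        await asyncio.gather(consume_task, return_exceptions=True)
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(main())
```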

Logs

  • After the max_poll timeout, we saw the following messages:

    [INFO] aiokafka.consumer.group_coordinator => LeaveGroup request succeeded
    2020-06-06 12:11:45,510 [ERROR] aiokafka.consumer.group_coordinator => OffsetCommit failed for group score_worker due to group error ([Error 25] UnknownMemberIdError: score_worker), will rejoin
    2020-06-06 12:11:45,517 [ERROR] aiokafka.consumer.group_coordinator => OffsetCommit failed for group score_worker due to group error ([Error 25] UnknownMemberIdError: score_worker), will rejoin
    2020-06-06 12:11:45,523 [ERROR] aiokafka.consumer.group_coordinator => OffsetCommit failed for group score_worker due to group error ([Error 25] UnknownMemberIdError: score_worker), will rejoin
  • ... the UnknownMemberIdError message repeats a number of times

2020-06-06 12:11:45,551 [ERROR] aiokafka.consumer.group_coordinator => OffsetCommit failed before join, ignoring: CommitFailedError: ('Commit cannot be completed since the group has already\n rebalanced and assigned the partitions to another member.\n This means that the time between subsequent calls to poll()\n was longer than the configured max_poll_interval_ms,

2020-06-06 12:11:45,556 [INFO] aiokafka.consumer.group_coordinator => Revoking previously assigned partitions frozenset({TopicPartition(topic='pron-job', partition=39), TopicPartition(topic='pron-job', partition=75), TopicPartition(topic='pron-job', partition=111), TopicPartition(topic='pron-job', partition=3)}) for group score_worker

After that, there were no INFO-level logs at all, but the length of consumer.assignment was still the same as before the revocation.

bobzsj87 avatar Jun 06 '20 13:06 bobzsj87

May be related to the fix for https://github.com/aio-libs/aiokafka/issues/575.

tvoinarovskyi avatar Jun 07 '20 16:06 tvoinarovskyi

@bobzsj87 Do you happen to have any on_partitions_revoked hooks configured? Maybe they run forever for some reason, or deadlock so that the consumer is unable to, say, poll messages?
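
For context, a sketch of what such a hook looks like when attached (the class name is hypothetical, and the Event().wait() stands in for a hook that deadlocks or never returns):

```python
# Illustration of the kind of hook being asked about: a rebalance listener
# whose on_partitions_revoked never completes blocks the rebalance, since
# the coroutine is awaited before the consumer can rejoin the group.
import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.abc import ConsumerRebalanceListener


class SlowRebalanceListener(ConsumerRebalanceListener):
    async def on_partitions_revoked(self, revoked):
        # Stand-in for a hook that never returns (e.g. waiting on a
        # deadlocked resource); the rebalance cannot finish while pending.
        await asyncio.Event().wait()

    async def on_partitions_assigned(self, assigned):
        pass


async def main():
    consumer = AIOKafkaConsumer(
        bootstrap_servers="localhost:9092",   # assumed broker address
        group_id="score_worker",
    )
    consumer.subscribe(["pron-job"], listener=SlowRebalanceListener())
    await consumer.start()
    try:
        async for msg in consumer:
            ...
    finally:
        await consumer.stop()
```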

tvoinarovskyi avatar Jun 07 '20 16:06 tvoinarovskyi

@tvoinarovskyi hi, nope, there was no listener configured.

bobzsj87 avatar Jun 07 '20 19:06 bobzsj87

@tvoinarovskyi Could this issue be related to #628, since the first abnormality is that the consumer simply stops consuming? Or could it have anything to do with the fetcher idling? Any insights would be helpful. Many thanks.

bobzsj87 avatar Jun 10 '20 19:06 bobzsj87

@bobzsj87 It may have something to do with it; could you try a build from the latest master? It should be pretty much the same as 0.6.0, plus the fix for #628.

tvoinarovskyi avatar Jun 11 '20 01:06 tvoinarovskyi

@bobzsj87 I'm looking at several places where the behaviour may be triggered and found one strange spot that may result in a bottleneck. The Coordinator is configured not to join the group if the user never asks for data using getone or getmany: https://github.com/aio-libs/aiokafka/blob/master/aiokafka/consumer/group_coordinator.py#L687

But if we look at the code for, say, consumer.position, it waits for an actual position to be retrieved: https://github.com/aio-libs/aiokafka/blob/master/aiokafka/consumer/consumer.py#L603 So if a user requests a message, processes it long enough to trigger max_poll_interval, and then calls await consumer.position() before polling for the next message, it may well deadlock the system. It's an actual bug, but I have yet to confirm a reproducible example.
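
A minimal sketch of that suspected call sequence (not a confirmed reproducer; the topic, broker address and sleep duration are illustrative assumptions):

```python
# Suspected pattern: a long pause between getone() calls lets the group
# rebalance, and the subsequent position() call then waits on a rejoin
# that the coordinator will not start until the next getone()/getmany().
import asyncio

from aiokafka import AIOKafkaConsumer


async def main():
    consumer = AIOKafkaConsumer(
        "pron-job",
        bootstrap_servers="localhost:9092",     # assumed broker address
        group_id="score_worker",
        max_poll_interval_ms=300_000,
    )
    await consumer.start()
    try:
        msg = await consumer.getone()            # 1. fetch a message
        await asyncio.sleep(600)                 # 2. "process" longer than max_poll_interval_ms
        tp = next(iter(consumer.assignment()))   # assignment is still populated here
        await consumer.position(tp)              # 3. may wait on a rebalance that only
                                                 #    starts on the next get* call
        await consumer.getone()                  # 4. never reached if position() hangs
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(main())
```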

Are you by chance using position after fetching the data?

tvoinarovskyi avatar Jun 12 '20 14:06 tvoinarovskyi

No, it seems like it was a false lead, sorry about that. I was not able to reproduce the scenario, as the assignment only changes when a new join succeeds, not when the group is left. Thus position will still return valid values until the rebalance is finished. Btw, you did ask about a similar thing:

After that, there were no INFO-level logs at all, but the length of consumer.assignment was still the same as before the revocation.

The assignment is not changed until a JoinGroup is finished successfully. Mostly this is done to avoid having an empty assignment at any point in time during consumption (as some functions still rely on it, like position).

tvoinarovskyi avatar Jun 14 '20 14:06 tvoinarovskyi

I am quite positive that the behaviour you see is related to the waiter that prevents group rejoining if no getone or getmany calls are made (including iteration, which calls getone): https://github.com/aio-libs/aiokafka/blob/master/aiokafka/consumer/group_coordinator.py#L687. It means that some application code between get* calls is waiting forever, probably because the rebalance never succeeds. Could you share some information on what operations are performed in the processing loop?
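
For illustration, the problematic shape would look something like this (the handler is hypothetical; the point is that nothing awaits getone/getmany while the handler is stuck, so the rejoin waiter is never released):

```python
# Suspected failure shape: while handle() is stuck, the loop never reaches
# the next getone() (async-for calls getone under the hood), so the
# coordinator's waiter never fires and the consumer never rejoins the
# group after being kicked out.
import asyncio

from aiokafka import AIOKafkaConsumer


async def handle(msg):
    # Hypothetical handler that never returns, e.g. a deadlocked lock or
    # an executor job that hangs.
    await asyncio.Event().wait()


async def main():
    consumer = AIOKafkaConsumer(
        "pron-job",
        bootstrap_servers="localhost:9092",   # assumed broker address
        group_id="score_worker",
    )
    await consumer.start()
    try:
        async for msg in consumer:
            await handle(msg)                 # stuck here => no further get* calls
    finally:
        await consumer.stop()
```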

tvoinarovskyi avatar Jun 14 '20 16:06 tvoinarovskyi

Hi @tvoinarovskyi, thanks so much for looking at it. Yes, I reached a similar conclusion: it might be our code inside the consumer's async for loop that blocks it. We have heavy-duty non-async code wrapped in asyncio.get_running_loop().run_in_executor. That function further spawns multiprocessing workers and uses some C-binding libraries.
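
For reference, a rough sketch of that pattern (heavy_cpu_work, the broker address and the asyncio-level timeout guard are placeholders and assumptions, not the actual service code):

```python
# Sketch of the described pattern: heavy synchronous work offloaded to an
# executor from inside the consume loop. The event loop stays responsive,
# but if the executor job hangs, no further getone()/getmany() call is
# made and the consumer never rejoins after a rebalance.
import asyncio
from concurrent.futures import ProcessPoolExecutor

from aiokafka import AIOKafkaConsumer


def heavy_cpu_work(payload: bytes) -> bytes:
    # Stand-in for the real C-binding / multiprocessing code.
    return payload


async def main():
    executor = ProcessPoolExecutor()
    consumer = AIOKafkaConsumer(
        "pron-job",
        bootstrap_servers="localhost:9092",   # assumed broker address
        group_id="score_worker",
    )
    await consumer.start()
    try:
        async for msg in consumer:
            loop = asyncio.get_running_loop()
            # Optional guard: a timeout shorter than max_poll_interval_ms
            # keeps a hung job from stalling the consume loop forever
            # (note the worker process itself is not cancelled).
            await asyncio.wait_for(
                loop.run_in_executor(executor, heavy_cpu_work, msg.value),
                timeout=240,
            )
    finally:
        await consumer.stop()
        executor.shutdown(wait=False)


if __name__ == "__main__":
    asyncio.run(main())
```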

I've added more logging and a timeout to the multiprocessing code, and will gather more info when such a problem happens again.

Do you suggest passing a new event loop to the consumer/producer? Currently we use the default get_event_loop, which is shared with the loop used by the function in question.

bobzsj87 avatar Jun 14 '20 18:06 bobzsj87

Use the default loop; the current direction in Python is to use an implicit loop, and they try to hide it where possible. Even in aiokafka we don't require the loop argument any more and will make it less visible from now on.
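
A minimal sketch of that recommended setup (topic, group and broker address are illustrative; the point is simply that no loop= argument is passed):

```python
# Rely on the implicit running loop instead of passing loop= explicitly
# (older aiokafka examples used AIOKafkaConsumer(loop=loop, ...); that
# argument is no longer required).
import asyncio

from aiokafka import AIOKafkaConsumer


async def main():
    consumer = AIOKafkaConsumer(
        "pron-job",
        bootstrap_servers="localhost:9092",   # assumed broker address
        group_id="score_worker",
    )
    await consumer.start()                    # uses the running loop, no loop= needed
    try:
        async for msg in consumer:
            ...
    finally:
        await consumer.stop()


asyncio.run(main())                           # one implicit loop shared by everything
```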

tvoinarovskyi avatar Jun 14 '20 21:06 tvoinarovskyi

And no worries, I have discovered a lot of places that require enhancements while working on this issue. Keep me posted on any updates; I will try to help out where I can.

tvoinarovskyi avatar Jun 14 '20 21:06 tvoinarovskyi

@tvoinarovskyi do you have any new insights here? Sorry to bump a kinda old issue, but it still seems relevant to date.

dada-engineer avatar Aug 14 '23 07:08 dada-engineer

@tvoinarovskyi @dabdada It is still relevant with the latest version of aiokafka==0.8.1 and very hard to debug as there are no exceptions thrown.

sivaNbalusu5 avatar Aug 30 '23 19:08 sivaNbalusu5

I am quite positive that the behaviour you see is related to the waiter that prevents group rejoining if no getone or getmany calls are made (including iteration, which calls getone): https://github.com/aio-libs/aiokafka/blob/master/aiokafka/consumer/group_coordinator.py#L687. It means that some application code between get* calls is waiting forever, probably because the rebalance never succeeds. Could you share some information on what operations are performed in the processing loop?

We are also seeing this, and there are no other calls to aiokafka in between calls to getone() ...

gtedesco-r7 avatar Feb 05 '24 08:02 gtedesco-r7