
coop sticky algo on large partition number

Open · ericwuseattle opened this issue 1 year ago · 14 comments

Description

There are two issues I noticed with Kafka cooperative-sticky mode:

  1. The hard-coded partition_cnt inside rd_kafka_sticky_assignor_assign_cb: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_sticky_assignor.c#L1834

  2. With 3K partitions it works without issue, but if I increase to 6K partitions on a fresh topic (i.e. recreate the topic as a new one), I have to increase session.timeout.ms and max.poll.interval.ms from 3000 to 10000 (3s to 10s) to make it work. Otherwise the consumer gets kicked out of the group. Broker logs: Member XXX-6F958DDF5F-CDXRQ~-0793c679-d5ef-4753-9056-7da314e1415b in group XXX-TOPIC-NAME-XXX has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator).

I'm not sure what's causing the timeout, but I'm sure we keep calling Kafka poll on a timer continuously. After increasing to 10s it works without any issue.

Trying further with 15K partitions and a 10s timeout: no luck, it does not work and gets kicked out of the group.

Overall:

| Partitions | Session timeout | Result |
| --- | --- | --- |
| 3K | 3s | works |
| 6K | 3s | fails |
| 6K | 10s | works |
| 15K | 10s | fails |

How to reproduce

Use a large partition count: 6K partitions with a 3s session timeout, or 15K partitions with a 10s session timeout.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • [x] librdkafka version (release number or git tag): <2.3.0>
  • [x] Apache Kafka version: <3.0>
  • [x] librdkafka client configuration: <fetch.min.bytes=1, fetch.wait.max.ms=500, fetch.error.backoff.ms=0, heartbeat.interval.ms=1000, enable.auto.commit=false, enable.partition.eof=false, enable.auto.offset.store=false, max.poll.interval.ms=3000, session.timeout.ms=3000, partition.assignment.strategy=cooperative-sticky>
  • [x] Operating system: <Ubuntu (x64)>
  • [ ] Provide logs (with debug=.. as necessary) from librdkafka
  • [ ] Provide broker log excerpts
  • [x] Critical issue

ericwuseattle avatar Feb 29 '24 18:02 ericwuseattle

Any thoughts on this problem? Are more details needed?

ericwuseattle avatar Apr 12 '24 18:04 ericwuseattle

@ericwuseattle could you send some logs with debug=all? It's possible that those values need to be increased for a rebalance with that many partitions, but from the logs we can see where most of the time goes.

emasab avatar May 09 '24 09:05 emasab

Unfortunately I don't have the test environment set up at hand right now. Have you checked the hard-coded partition count in the code? If we could fix that part first, I'll find time to retry it.

    /* FIXME: Let the cgrp pass the actual eligible partition count */
    size_t partition_cnt = member_cnt * 10; /* FIXME */

https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_sticky_assignor.c#L1834

ericwuseattle avatar May 16 '24 21:05 ericwuseattle

Given your configuration you're not using the sticky assignor, as fetch.min.bytes=1, fetch.wait.max.ms=500, fetch.error.backoff.ms=0, heartbeat.interval.ms=1000, enable.auto.commit=false, enable.partition.eof=false, enable.auto.offset.store=false, max.poll.interval.ms=3000, session.timeout.ms=3000

and the default partition.assignment.strategy doesn't include cooperative-sticky. Could you update your configuration if you're setting it?

emasab avatar May 17 '24 08:05 emasab

@ericwuseattle

    /* FIXME: Let the cgrp pass the actual eligible partition count */
    size_t partition_cnt = member_cnt * 10; /* FIXME */

That is just the estimated partition count used for the initial size of maps and lists. You can try increasing the multiplier to see if it changes anything, and send some logs from the leader and 2-3 random members.

emasab avatar May 17 '24 08:05 emasab

> Given your configuration you're not using the sticky assignor as fetch.min.bytes=1, fetch.wait.max.ms=500, fetch.error.backoff.ms=0, heartbeat.interval.ms=1000, enable.auto.commit=false, enable.partition.eof=false, enable.auto.offset.store=false, max.poll.interval.ms=3000, session.timeout.ms=3000
>
> as the default partition.assignment.strategy doesn't include cooperative-sticky could you update your configuration if you're setting it?

Sorry, I did not give you the full config, but we do have partition.assignment.strategy=cooperative-sticky set. I'll update the config in the checklist.

ericwuseattle avatar May 17 '24 16:05 ericwuseattle
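For reference, a consumer configuration along the lines discussed in this thread might look like the following properties fragment (bootstrap.servers and group.id are placeholders; the other values come from the reporter's checklist, with the 10s timeouts that worked at 6K partitions and the debug setting emasab requested):

```ini
bootstrap.servers=broker:9092
group.id=example-group
partition.assignment.strategy=cooperative-sticky
session.timeout.ms=10000
max.poll.interval.ms=10000
heartbeat.interval.ms=1000
enable.auto.commit=false
enable.auto.offset.store=false
debug=all
```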

@ericwuseattle thanks, other helpful info would be:

  • whether you have a rebalance callback set; in that case please test without it
  • how many members are in the group, and whether they're all subscribed to the topic with 3/6/15K partitions or to other topics too

emasab avatar May 21 '24 18:05 emasab

> I'm not sure what's causing the timeout, but I'm sure we keep calling kafka poll in a timer infinitely. So increase into 10s it's working without any issue.

You have to set the callback in order to call the incremental partition assign/revoke client API. Beside that we have some internal logic of our own, but it only posts tasks asynchronously, so it would not block or cost much CPU in the Kafka callback worker thread.

> Trying further on 15K partitions with 10s, no lucky, would not work, gets kicked out from grp.

30 members in total, only 1 topic.

ericwuseattle avatar May 22 '24 20:05 ericwuseattle