
[Question] Parallelism in consumer group

Open shubham-dogra-s1 opened this issue 7 months ago • 11 comments

Hi @dnwe @puellanivis, I can see in the sarama source code that a new goroutine is created for each partition. I want to understand whether there is any difference in performance between the following two setups.

No of partitions: 32

  • case 1: 1 consumer
  • case 2: 32 consumers

For the first case: even though I have only one consumer, is sarama still creating 32 goroutines behind the scenes?

shubham-dogra-s1 avatar Apr 25 '25 12:04 shubham-dogra-s1

I'm interested in understanding the differences on this as well.

dharmjit avatar Apr 28 '25 08:04 dharmjit

Each topic and partition pair runs its own Consumer in a separate goroutine: https://github.com/IBM/sarama/blob/d6ca80a198c4fbc467c67f7430182eda6881b12c/consumer_group.go#L868-L892
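You can also see this from the handler side: your ConsumeClaim implementation is invoked once per claimed topic/partition pair, each invocation in its own goroutine. A minimal sketch (broker address, topic, and group names below are placeholders):

```go
package main

import (
	"context"
	"log"

	"github.com/IBM/sarama"
)

type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim is called once per claimed topic/partition pair,
// and sarama runs each call in its own goroutine.
func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("topic %s partition %d offset %d", msg.Topic, msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0 // adjust to your broker version

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		// A single Consume call claims a set of partitions for this group member;
		// sarama then starts one ConsumeClaim goroutine per claimed partition.
		if err := group.Consume(context.Background(), []string{"example-topic"}, handler{}); err != nil {
			log.Fatal(err)
		}
	}
}
```

So a single consumer in the group already gives you one goroutine per partition it owns.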

There is no reason to create multiple consumers unless you need a genuinely separate set of connection and configuration parameters. Also, starting up multiple consumers can interfere with the partitioning logic. That is, with two computers each running a consumer, you want the partitioning to assign half of the partitions to one and the rest to the other. But if one computer is running two consumers while the other is running only one, then each consumer will be allocated roughly a third of the partitions, and thus one computer is imbalanced and takes more partitions than it generally should.
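As a back-of-the-envelope example (assuming the balance strategy hands out roughly equal shares per group member): with 32 partitions, 2 consumers on computer A and 1 consumer on computer B, the three members get about 11, 11, and 10 partitions each, so computer A ends up consuming 21–22 partitions while computer B consumes only 10–11, roughly twice the load on A instead of an even 16/16 split.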

This also causes issues once you get beyond the number of partitions. Specifically, in case 1, if you spin up a new consumer due to increased load, a partition rebalance will occur and the traffic will be split between the two consumers. In case 2, if you spin up a new consumer due to increased load, a rebalance will also occur, but now you have 32 partitions to spread among 33 consumers, so it’s roughly a coin flip whether the relief service receives even a single partition: either all 32 partitions stay on the overloaded machine running 32 consumers, or 31 stay there and just 1 moves to the relief service.

Even if you start up a second service with 32 consumers, you only get “best odds” that half the partitions will land on one machine and the rest on the other; it’s still entirely possible that all 32 partitions end up on the old machine.

TL;DR: Do not create extra consumers. Partitions already run on separate goroutines, so there’s no improvement in parallelism or concurrency. The only thing extra consumers will do is confuse the partition balancing algorithm and result in at-best stochastic balancing rather than proper balancing.

puellanivis avatar Apr 28 '25 10:04 puellanivis

Thanks @puellanivis for the detailed response.

Apart from having to deal with unfair balancing when running multiple consumers in a consumer group, I guess sometimes we can't avoid adding consumers to a consumer group, e.g. for a multi-instance (auto-scaled) application, unless we choose to design the app differently. But yes, for a single-instance app it makes sense to have a single consumer, and the underlying partition consumers take care of the parallelism.

dharmjit avatar Apr 30 '25 05:04 dharmjit

Yeah, kind of obviously, if you have multiple instances, they each need to join a consumer to the consumer group. But everything will take care of parallelizing the partitions in this case as well.

puellanivis avatar May 04 '25 10:05 puellanivis

@puellanivis @dharmjit Thanks for the response but fair partitioning is not my concern.

Use Case: I want to auto-scale based on lag in the Kafka topic. If the lag is greater than 100k, spin up a new consumer for the group. But again, my question is: will it increase throughput?

Note: We can assume that the max number of consumers will not exceed the number of partitions.

  • Before auto-scaling, we have one thread for each partition.
  • After auto-scaling, we still have a separate thread consuming from each partition; the only difference is that now we have 2 instances?
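For reference, the lag that drives the scaling decision can be computed per partition with sarama's ClusterAdmin, roughly like this (a minimal sketch; broker address, topic, and group names are placeholders):

```go
package main

import (
	"fmt"
	"log"

	"github.com/IBM/sarama"
)

func main() {
	brokers := []string{"localhost:9092"} // placeholder
	topic := "example-topic"              // placeholder
	group := "example-group"              // placeholder

	cfg := sarama.NewConfig()

	client, err := sarama.NewClient(brokers, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	admin, err := sarama.NewClusterAdmin(brokers, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer admin.Close()

	partitions, err := client.Partitions(topic)
	if err != nil {
		log.Fatal(err)
	}

	// Offsets committed by the consumer group, per partition.
	committedOffsets, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{topic: partitions})
	if err != nil {
		log.Fatal(err)
	}

	var totalLag int64
	for _, p := range partitions {
		newest, err := client.GetOffset(topic, p, sarama.OffsetNewest) // log-end offset
		if err != nil {
			log.Fatal(err)
		}
		var committed int64
		if block := committedOffsets.Blocks[topic][p]; block != nil && block.Offset >= 0 {
			committed = block.Offset // -1 would mean "no commit yet"
		}
		lag := newest - committed
		totalLag += lag
		fmt.Printf("partition %d: lag %d\n", p, lag)
	}
	fmt.Printf("total lag: %d (scale up if > 100000)\n", totalLag)
}
```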

shubham-dogra-s1 avatar May 06 '25 17:05 shubham-dogra-s1

Use Case: I want to auto-scale based on lag in the Kafka topic. If the lag is greater than 100k, spin up a new consumer for the group. But again, my question is: will it increase throughput?

@shubham-dogra-s1 I would suggest benchmarking both setups, one with a single consumer in a consumer group and another with two or more consumers in a consumer group.
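For such a benchmark, the per-instance throughput can be measured with nothing more than an atomic counter inside the handler; a rough sketch (broker, topic, and group names are placeholders):

```go
package main

import (
	"context"
	"log"
	"sync/atomic"
	"time"

	"github.com/IBM/sarama"
)

// consumed counts messages handled by this instance across all partition goroutines.
var consumed atomic.Int64

type benchHandler struct{}

func (benchHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (benchHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (benchHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		consumed.Add(1)
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	// Log this instance's consumption rate once per second.
	go func() {
		var last int64
		for range time.Tick(time.Second) {
			now := consumed.Load()
			log.Printf("msgs/sec: %d", now-last)
			last = now
		}
	}()

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0 // adjust to your broker version

	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "bench-group", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		if err := group.Consume(context.Background(), []string{"example-topic"}, benchHandler{}); err != nil {
			log.Fatal(err)
		}
	}
}
```

Comparing the summed msgs/sec across instances for the single-consumer and multi-consumer runs should answer the throughput question for your workload.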

dharmjit avatar May 06 '25 17:05 dharmjit

@dharmjit have you used autoscaling with sarama in production? What is your experience? Spinning up a new pod for each consumer should, in theory, increase throughput, as each consumer now gets more CPU time.

shubham-dogra-s1 avatar May 06 '25 17:05 shubham-dogra-s1

Spinning up a new pod for each consumer should, in theory, increase throughput, as each consumer now gets more CPU time.

Yes, if the CPU is being throttled, pod CPU metrics should be able to highlight that.

have you used autoscaling with sarama in production

We are using it a bit differently, as we are building a platform on top of Kafka and use the sarama library for producing and consuming. But since we provide other capabilities through the platform APIs, we do see an increase in consumption throughput with a higher number of consumers, both with a single pod instance and with multiple pods.

dharmjit avatar May 06 '25 18:05 dharmjit

@puellanivis @dharmjit Thanks for the response but fair partitioning is not my concern.

Fair partitioning was your concern.

Use Case: I want to auto-scale based on lag in the Kafka topic. If the lag is greater than 100k, spin up a new consumer for the group. But again, my question is: will it increase throughput?

Note: We can assume that the max number of consumers will not exceed the number of partitions.

  • Before auto-scaling, we have one thread for each partition.
  • After auto-scaling, we still have a separate thread consuming from each partition; the only difference is that now we have 2 instances?

If you have fair partitioning between the two instances, then throughput can potentially increase, as you now have two instances, hopefully on different computers, processing messages rather than everything running on the same computer.

Namely, if there are no more partitions to hand out, then no amount of autoscaling will ever reduce the load on the system. Spinning up n+1 instances when you have n partitions can never run faster than having n instances, as one of the consumers would never get a partition and so would never consume any messages.

If one computer (one resource pool) holds n-1 of the partitions and another has only 1 partition, then the amount of pressure relief the second computer provides to the first is minimal. Ideally, when you spin up a second instance, you want it to receive approximately half of the partitions, so that the work can be split between the two instances.

PS: There’s zero ability for auto-scaling to do anything at all useful if there is not fair partitioning. The only case where auto-scaling can be useful is where there is fair partitioning.

puellanivis avatar May 08 '25 09:05 puellanivis

I did some benchmarking, running one consumer per pod. Since I am running one consumer per pod, partitioning is fair.
Autoscaling: based on Kafka lag (spawn a new pod)
Partitions: 16

There is almost no difference in consumer throughput whether I run 1 pod or 16 pods (i.e. 16 consumers in the group). In theory, number of consumers = number of partitions gives the maximum throughput for Kafka. Given that we are already creating one thread per partition, we have already achieved maximum throughput with one consumer with sarama.

I guess only if the CPU is being throttled can spawning a new pod increase throughput; otherwise there is no difference :(

@dharmjit in what cases have you seen an increase in throughput due to scaling? What metric do you use for autoscaling?

shubham-dogra-s1 avatar May 22 '25 14:05 shubham-dogra-s1

I guess only if the CPU is being throttled can spawning a new pod increase throughput; otherwise there is no difference :(

This makes sense, and is the correct assessment. If the goroutines aren’t CPU throttled, then the one pod should already be handling messages at the maximum rate Kafka can shovel them out. So one would still want to trigger autoscaling on pod load (CPU and memory); using the lag alone is unlikely to provide beneficial autoscaling.

If someone is seeing such a benefit, it’s possible that they’re running into CPU issues.

puellanivis avatar May 23 '25 08:05 puellanivis

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot] avatar Aug 21 '25 10:08 github-actions[bot]