Add support for RoundRobinAssignor

Open alexakra opened this issue 2 years ago • 9 comments

Checklist

  • [X] I have included information about relevant versions
  • [X] I have verified that the issue persists when using the master branch of Faust.

Expected behavior

I would like to use RoundRobinAssignor to distribute available partitions evenly across all consumers.

Actual behavior

Today Faust groups the same partition numbers onto the same consumers for all topics with the same number of partitions.
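To make the difference concrete, here is a minimal stand-alone sketch. It is a simplified model, not Faust's or aiokafka's actual assignment code: the topic names, consumer names, and both helper functions are hypothetical, and the "co-partitioned" variant simply assumes partition number p of every topic lands on consumer p modulo the number of consumers.

```python
from collections import defaultdict
from itertools import cycle


def copartitioned_assignment(topics, consumers):
    """Simplified model: partition p of every topic goes to the same consumer."""
    assignment = defaultdict(list)
    for topic, num_partitions in sorted(topics.items()):
        for p in range(num_partitions):
            assignment[consumers[p % len(consumers)]].append((topic, p))
    return dict(assignment)


def round_robin_assignment(topics, consumers):
    """Deal every (topic, partition) pair out one by one across consumers."""
    assignment = defaultdict(list)
    members = cycle(consumers)
    for topic, num_partitions in sorted(topics.items()):
        for p in range(num_partitions):
            assignment[next(members)].append((topic, p))
    return dict(assignment)


# Hypothetical setup: two topics with three partitions each, two consumers.
topics = {"orders": 3, "payments": 3}
consumers = ["c0", "c1"]

copart = copartitioned_assignment(topics, consumers)
rr = round_robin_assignment(topics, consumers)

# Number of partitions each consumer ends up with under each strategy.
copart_load = {c: len(parts) for c, parts in copart.items()}
rr_load = {c: len(parts) for c, parts in rr.items()}
print("co-partitioned:", copart_load)
print("round robin:   ", rr_load)
```

In this toy setup the co-partitioned model leaves one consumer with four partitions and the other with two, while round robin gives each consumer three.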

Versions

  • Python version: 3.10
  • Faust version: 0.10.1
  • Operating system: Debian GNU/Linux 11 (bullseye)
  • Kafka version: 3.4

alexakra • May 27 '23 12:05

Faust can support using aiokafka's RoundRobinAssignor! It already uses that assignor by default in some cases, such as https://github.com/faust-streaming/faust/pull/402.

Try setting app.assignor=RoundRobinPartitionAssignor on startup.

wbarnha • May 27 '23 20:05

It is failing:

    [ERROR] [^-App]: Crashed reason=AttributeError("type object 'RoundRobinPartitionAssignor' has no attribute 'assigned_standbys'")

I think that there is no assignor field on App.

alexakra • May 29 '23 12:05

same issue here

tynianovddi • Jun 03 '23 17:06

Have you tried setting PartitionAssignor when initializing the app? https://faust-streaming.github.io/faust/userguide/settings.html#partitionassignor

dada-engineer • Oct 27 '23 18:10

Still failing: app = App(..., PartitionAssignor=RoundRobinPartitionAssignor)

    File "../lib/python3.11/site-packages/mode/services.py", line 807, in _default_start
      await self._actually_start()
    File "../lib/python3.11/site-packages/mode/services.py", line 824, in _actually_start
      await self.on_start()
    File "../lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 478, in on_start
      self._consumer = self._create_consumer(loop=self.thread_loop)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File "../lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 497, in _create_consumer
      return self._create_worker_consumer(transport)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File "../lib/python3.11/site-packages/faust/transport/drivers/aiokafka.py", line 507, in _create_worker_consumer
      self.app.assignor
    File "../lib/python3.11/site-packages/mode/utils/objects.py", line 659, in __get__
      value = obj.__dict__[self.__name__] = self.__get(obj)
                                            ^^^^^^^^^^^^^^^
    File "../lib/python3.11/site-packages/faust/app/base.py", line 2069, in assignor
      assignor = self.conf.PartitionAssignor(  # type: ignore
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    TypeError: RoundRobinPartitionAssignor() takes no arguments

alexakra • Nov 06 '23 07:11

This is weird, as Faust itself sets this assignor when table standby replicas is set to zero. If you do not need standby replicas, can you try setting TABLE_STANDBY_REPLICAS to zero?

If you do need them, you would have to build your own class that implements the interface of Faust's PartitionAssignorT type.

dada-engineer • Nov 06 '23 07:11

I did, and it works. But I opened this issue to get it solved properly. I am not looking for a workaround, because I cannot rely on one.

alexakra • Nov 06 '23 07:11

I do not see an issue. Faust says that if you want to supply your own PartitionAssignor, you need to provide a PartitionAssignorT-compatible class. RoundRobinPartitionAssignor clearly isn't one, as it has no __init__ accepting the arguments Faust passes.

What's the issue in summary, then? 🤔
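Both errors reported earlier in this thread can be reproduced without Faust or Kafka. The sketch below is only an illustration: StandInRoundRobinAssignor is a hypothetical stand-in for a class that exposes class-level methods only, and the constructor arguments ("app" and replicas) are an assumption about what Faust passes, not a confirmed signature.

```python
class StandInRoundRobinAssignor:
    """Hypothetical stand-in: class-level methods only, no __init__,
    and no assigned_standbys attribute."""

    name = "roundrobin"

    @classmethod
    def assign(cls, cluster, members):
        raise NotImplementedError("not needed for this illustration")


errors = []

# Case 1: the class is passed as the PartitionAssignor setting and then
# instantiated with arguments (assumed here: an app object and a replicas count).
try:
    StandInRoundRobinAssignor("app", replicas=1)
except TypeError as exc:
    errors.append(str(exc))

# Case 2: the class object itself is used where an assignor instance is expected,
# and an attribute that only a Faust-style assignor would have is looked up.
try:
    StandInRoundRobinAssignor.assigned_standbys
except AttributeError as exc:
    errors.append(str(exc))

for message in errors:
    print(message)
```

The first message matches the shape of the TypeError in the traceback above, and the second matches the AttributeError from the app.assignor attempt.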

dada-engineer • Nov 06 '23 09:11

My thoughts:

  1. Faust internally uses RoundRobinPartitionAssignor, which is not a PartitionAssignorT-compatible class. Why is that?
  2. This is a common use case: many users would like to use the standard implementation. I don't need a custom one, so there is no reason for each of us to implement a custom RoundRobinPartitionAssignor.

alexakra • Nov 06 '23 10:11