kafka-python
Unable to use StickyPartitionAssignor
Hello!
The problem
I want to minimize downtime during rebalances, so I am trying to use the "sticky" strategy, but every time I hit a different problem. I use KafkaConsumer with default settings and only set group_id and change partition_assignment_strategy to [StickyPartitionAssignor].
Kafka broker version: 2.5.0. kafka-python version: 2.0.2.
There is no problem if I set the strategy to [StickyPartitionAssignor, RangePartitionAssignor, RoundRobinPartitionAssignor], but sticky assignment does not actually happen that way. If I set only [StickyPartitionAssignor], errors always occur.
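For reference, the consumer setup is essentially the following sketch (topic name and broker address are placeholders; everything else is left at its defaults):

```python
from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

# Placeholder topic/broker; only group_id and the assignment
# strategy differ from the defaults.
consumer = KafkaConsumer(
    "some-topic",
    bootstrap_servers="localhost:9092",
    group_id="group_id",
    partition_assignment_strategy=[StickyPartitionAssignor],
)

for msg in consumer:
    print(msg.value)
```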
What I have already done
The first case - The topic is new and has only one partition.
When I start my consumer, I get TypeError: object of type 'dict_itemiterator' has no len(), because an iterator is passed where an object with a length is expected.
Okay, perhaps the problem is that the topic has only one partition.
Full traceback:
Traceback (most recent call last):
File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/home/user/projects/manage.py", line 24, in <module>
execute_from_command_line(sys.argv)
File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/__init__.py", line 381, in execute_from_command_line
utility.execute()
File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/__init__.py", line 375, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/base.py", line 323, in run_from_argv
self.execute(*args, **cmd_options)
File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/base.py", line 364, in execute
output = self.handle(*args, **options)
File "/home/user/projects/src/apps/kafka/management/commands/runmainworker.py", line 24, in handle
self.run(flow)
File "/home/user/projects/src/apps/kafka/management/commands/runmainworker.py", line 34, in run
Worker(flow.lower()).run()
File "/home/user/projects/src/apps/kafka/worker.py", line 44, in run
for msg in self.consumer:
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
return self.next_v2()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
return next(self._iterator)
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 675, in _poll_once
self._coordinator.poll()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/consumer.py", line 289, in poll
self.ensure_active_group()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/base.py", line 390, in ensure_active_group
future = self._send_join_group_request()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/base.py", line 453, in _send_join_group_request
for protocol, metadata in self.group_protocols()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/consumer.py", line 154, in group_protocols
metadata = assignor.metadata(self._joined_subscription)
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/assignors/sticky/sticky_assignor.py", line 660, in metadata
user_data = data.encode()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/util.py", line 50, in __call__
return self.method()(self.target(), *args, **kwargs)
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/struct.py", line 42, in _encode_self
return self.SCHEMA.encode(
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 146, in encode
return b''.join([
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 147, in <listcomp>
field.encode(item[i])
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 185, in encode
[Int32.encode(len(items))] +
TypeError: object of type 'dict_itemiterator' has no len()
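The failing call boils down to taking len() of an iterator over a dict's items rather than of the items view itself; a minimal illustration in plain Python:

```python
# A dict's items *view* supports len(), but an *iterator* over it does not --
# and the latter is what ends up inside the schema encoder here.
partitions_by_topic = {"my-topic": [0]}

items_view = partitions_by_topic.items()
assert len(items_view) == 1  # dict_items supports len()

items_iter = iter(partitions_by_topic.items())
try:
    len(items_iter)
except TypeError as exc:
    print(exc)  # object of type 'dict_itemiterator' has no len()
```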
The second case - The topic is new and has three partitions.
I created one more topic and started the first consumer. Everything was fine: it started and consumed messages, and I got the desired log line:
2022-02-25 13:14:34,879 | INFO | Elected group leader -- performing partition assignments using sticky
But right after I started the second consumer, the first one immediately shut down with the same error as described above. The server log showed:
[2022-02-25 10:14:24,282] INFO [GroupCoordinator 1002]: Preparing to rebalance group group_id in state PreparingRebalance with old generation 181 (__consumer_offsets-37) (reason: Adding new member consumer_id_2 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-02-25 10:14:34,874] INFO [GroupCoordinator 1002]: Member consumer_id_1 in group group_id has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
The third case - The topic is new and has three partitions + a small fix
I overrode one line in StickyPartitionAssignor.metadata() to pass partitions_by_topic as a list instead of an iterator.
But when a new consumer joins the group, all previous consumers fail with the following error:
Traceback (most recent call last):
File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/home/user/projects/manage.py", line 24, in <module>
execute_from_command_line(sys.argv)
File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/__init__.py", line 381, in execute_from_command_line
utility.execute()
File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/__init__.py", line 375, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/base.py", line 323, in run_from_argv
self.execute(*args, **cmd_options)
File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/base.py", line 364, in execute
output = self.handle(*args, **options)
File "/home/user/projects/src/apps/kafka/management/commands/runmainworker.py", line 24, in handle
self.run(flow)
File "/home/user/projects/src/apps/kafka/management/commands/runmainworker.py", line 34, in run
Worker(flow.lower()).run()
File "/home/user/projects/src/apps/kafka/worker.py", line 44, in run
for msg in self.consumer:
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
return self.next_v2()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
return next(self._iterator)
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 675, in _poll_once
self._coordinator.poll()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/consumer.py", line 289, in poll
self.ensure_active_group()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/base.py", line 390, in ensure_active_group
future = self._send_join_group_request()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/base.py", line 453, in _send_join_group_request
for protocol, metadata in self.group_protocols()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/consumer.py", line 154, in group_protocols
metadata = assignor.metadata(self._joined_subscription)
File "/home/user/projects/src/libs/kafka.py", line 49, in metadata
user_data = data.encode()
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/util.py", line 50, in __call__
return self.method()(self.target(), *args, **kwargs)
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/struct.py", line 42, in _encode_self
return self.SCHEMA.encode(
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 146, in encode
return b''.join([
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 147, in <listcomp>
field.encode(item[i])
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 186, in encode
[self.array_of.encode(item) for item in items]
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 186, in <listcomp>
[self.array_of.encode(item) for item in items]
File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 145, in encode
raise ValueError('Item field count does not match Schema')
ValueError: Item field count does not match Schema
So this fix does not work.
The fourth case - Connect to an existing topic with no active consumers with an existing consumer group id
I have a topic that is permanently in use with the default partition assignment strategies. I shut down all consumers, changed the strategy to [StickyPartitionAssignor], and started one consumer.
This is what I got:
Traceback (most recent call last):
File "/app/manage.py", line 24, in <module>
execute_from_command_line(sys.argv)
File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 381, in execute_from_command_line
utility.execute()
File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 375, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 323, in run_from_argv
self.execute(*args, **cmd_options)
File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 364, in execute
output = self.handle(*args, **options)
File "/app/src/apps/kafka/management/commands/runmainworker.py", line 24, in handle
self.run(flow)
File "/app/src/apps/kafka/management/commands/runmainworker.py", line 34, in run
Worker(flow.lower()).run()
File "/app/src/apps/kafka/worker.py", line 44, in run
for msg in self.consumer:
File "/usr/local/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
return self.next_v2()
File "/usr/local/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
return next(self._iterator)
File "/usr/local/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/usr/local/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/usr/local/lib/python3.9/site-packages/kafka/consumer/group.py", line 675, in _poll_once
self._coordinator.poll()
File "/usr/local/lib/python3.9/site-packages/kafka/coordinator/consumer.py", line 289, in poll
self.ensure_active_group()
File "/usr/local/lib/python3.9/site-packages/kafka/coordinator/base.py", line 425, in ensure_active_group
raise exception # pylint: disable-msg=raising-bad-type
kafka.errors.InconsistentGroupProtocolError: [Error 23] InconsistentGroupProtocolError: JoinGroupResponse_v2(throttle_time_ms=0, error_code=23, generation_id=-1, group_protocol='', leader_id='', member_id='', members=[])
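For what it's worth, the group coordinator only accepts a JoinGroup whose protocol list overlaps with what the rest of the group supports, so assignment-strategy changes are normally done as a two-step rolling migration where an intermediate deployment advertises both the old and the new assignor. A sketch of that intermediate config (placeholder names as above; note that, as observed earlier, kafka-python may still not actually select sticky from such a mixed list):

```python
from kafka import KafkaConsumer
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

# Intermediate rolling-migration step: every member (old and new code)
# advertises at least one common protocol, so joins are not rejected.
consumer = KafkaConsumer(
    "some-topic",
    bootstrap_servers="localhost:9092",
    group_id="group_id",
    partition_assignment_strategy=[StickyPartitionAssignor, RangePartitionAssignor],
)
```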
The fifth case - Connect to an existing topic with no active consumers with a new consumer group id
The same errors as described in cases 1 and 2.
Addition
As far as I can see, ConsumerProtocol does not list the sticky strategy among its supported strategies, but I don't see where ConsumerProtocol.ASSIGNMENT_STRATEGIES is actually used.
Any ideas?
Does this strategy work at all? What am I doing wrong?
I am also getting this error; could a maintainer take a look at the issue?
kafka.errors.InconsistentGroupProtocolError: [Error 23] InconsistentGroupProtocolError: