kafka-python

Unable to use StickyPartitionAssignor

Open KazakovDenis opened this issue 3 years ago • 3 comments

Hello!

The problem

I want to minimize downtime during rebalances, so I am trying to use the "sticky" strategy, but I run into a different problem every time. I use KafkaConsumer with default settings, except that I set group_id and change partition_assignment_strategy to [StickyPartitionAssignor].

Kafka broker version: 2.5.0. kafka-python version: 2.0.2.

There is no problem if I set the strategy to [StickyPartitionAssignor, RangePartitionAssignor, RoundRobinPartitionAssignor], but then sticky assignment does not work. If I set only [StickyPartitionAssignor], errors always happen.

What I have already done

The first case - The topic is new and has only one partition.

When I start my consumer, I get TypeError: object of type 'dict_itemiterator' has no len(), because an iterator is not expected here.

Okay, perhaps the problem is that the topic has only one partition.

Full traceback:

Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/user/projects/manage.py", line 24, in <module>
    execute_from_command_line(sys.argv)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/__init__.py", line 381, in execute_from_command_line
    utility.execute()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/__init__.py", line 375, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/base.py", line 323, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/base.py", line 364, in execute
    output = self.handle(*args, **options)
  File "/home/user/projects/src/apps/kafka/management/commands/runmainworker.py", line 24, in handle
    self.run(flow)
  File "/home/user/projects/src/apps/kafka/management/commands/runmainworker.py", line 34, in run
    Worker(flow.lower()).run()
  File "/home/user/projects/src/apps/kafka/worker.py", line 44, in run
    for msg in self.consumer:
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 675, in _poll_once
    self._coordinator.poll()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/consumer.py", line 289, in poll
    self.ensure_active_group()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/base.py", line 390, in ensure_active_group
    future = self._send_join_group_request()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/base.py", line 453, in _send_join_group_request
    for protocol, metadata in self.group_protocols()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/consumer.py", line 154, in group_protocols
    metadata = assignor.metadata(self._joined_subscription)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/assignors/sticky/sticky_assignor.py", line 660, in metadata
    user_data = data.encode()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/util.py", line 50, in __call__
    return self.method()(self.target(), *args, **kwargs)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/struct.py", line 42, in _encode_self
    return self.SCHEMA.encode(
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 146, in encode
    return b''.join([
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 147, in <listcomp>
    field.encode(item[i])
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 185, in encode
    [Int32.encode(len(items))] +
TypeError: object of type 'dict_itemiterator' has no len()
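The TypeError comes from the array encoder in kafka/protocol/types.py, which writes the element count first (Int32.encode(len(items)) in the traceback) and therefore needs len(). Assuming, as the error message suggests, that the sticky assignor hands it six.iteritems(partitions_by_topic), which on Python 3 is iter(d.items()), the plain-Python sketch below reproduces the failure without a broker. The use of six.iteritems is an inference from the error, not confirmed here, and the topic name is hypothetical. Nothing in this mechanism depends on the partition count.

```python
from collections import defaultdict

# topic -> list of partition ids, the shape the sticky assignor builds before encoding
partitions_by_topic = defaultdict(list)
partitions_by_topic["my-topic"].append(0)  # hypothetical single-partition topic

# On Python 3, six.iteritems(d) returns iter(d.items()): a one-shot iterator.
items_iter = iter(partitions_by_topic.items())
print(type(items_iter).__name__)  # dict_itemiterator

# The array encoder writes the element count first, which requires len().
error_message = None
try:
    len(items_iter)
except TypeError as exc:
    error_message = str(exc)
print(error_message)  # object of type 'dict_itemiterator' has no len()

# A materialized list of (topic, partitions) pairs does have a length.
pairs = list(partitions_by_topic.items())
print(len(pairs), pairs)  # 1 [('my-topic', [0])]
```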

The second case - The topic is new and has three partitions.

I created another topic and started the first consumer. Everything was fine: it started and consumed messages, and I got the expected log line:

2022-02-25 13:14:34,879 | INFO | Elected group leader -- performing partition assignments using sticky

But as soon as I started the second consumer, the first one immediately shut down with the same error as above. The broker log was:

[2022-02-25 10:14:24,282] INFO [GroupCoordinator 1002]: Preparing to rebalance group group_id in state PreparingRebalance with old generation 181 (__consumer_offsets-37) (reason: Adding new member consumer_id_2 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-02-25 10:14:34,874] INFO [GroupCoordinator 1002]: Member consumer_id_1 in group group_id has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

The third case - The topic is new and has three partitions, plus a small fix

I overrode one line in StickyPartitionAssignor.metadata() to pass a list of partitions_by_topic instead of an iterator here.

But when a new consumer joins the group, all the previously running consumers fail with the following error:

Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/user/projects/manage.py", line 24, in <module>
    execute_from_command_line(sys.argv)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/__init__.py", line 381, in execute_from_command_line
    utility.execute()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/__init__.py", line 375, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/base.py", line 323, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/django/core/management/base.py", line 364, in execute
    output = self.handle(*args, **options)
  File "/home/user/projects/src/apps/kafka/management/commands/runmainworker.py", line 24, in handle
    self.run(flow)
  File "/home/user/projects/src/apps/kafka/management/commands/runmainworker.py", line 34, in run
    Worker(flow.lower()).run()
  File "/home/user/projects/src/apps/kafka/worker.py", line 44, in run
    for msg in self.consumer:
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/consumer/group.py", line 675, in _poll_once
    self._coordinator.poll()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/consumer.py", line 289, in poll
    self.ensure_active_group()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/base.py", line 390, in ensure_active_group
    future = self._send_join_group_request()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/base.py", line 453, in _send_join_group_request
    for protocol, metadata in self.group_protocols()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/coordinator/consumer.py", line 154, in group_protocols
    metadata = assignor.metadata(self._joined_subscription)
  File "/home/user/projects/src/libs/kafka.py", line 49, in metadata
    user_data = data.encode()
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/util.py", line 50, in __call__
    return self.method()(self.target(), *args, **kwargs)
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/struct.py", line 42, in _encode_self
    return self.SCHEMA.encode(
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 146, in encode
    return b''.join([
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 147, in <listcomp>
    field.encode(item[i])
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 186, in encode
    [self.array_of.encode(item) for item in items]
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 186, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "/home/user/projects/.venv/lib/python3.9/site-packages/kafka/protocol/types.py", line 145, in encode
    raise ValueError('Item field count does not match Schema')
ValueError: Item field count does not match Schema

So this fix does not work.
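This traceback fails one level deeper: the array now has a length, but each element is then checked against the entry schema, and a field-count mismatch raises ValueError. If the override was list(partitions_by_topic), that list holds only the topic names (iterating a dict yields its keys), whereas the entries apparently need to be (topic, partitions) pairs, as list(partitions_by_topic.items()) would produce. The sketch below models that check in plain Python; the two-field entry layout is an assumption inferred from the error, and the topic name is hypothetical.

```python
from collections import defaultdict

# Assumed field layout of one previous-assignment entry in the sticky user data.
ENTRY_FIELDS = ("topic", "partitions")


def check_entries(entries):
    """Simplified model of the per-entry check that raises in kafka/protocol/types.py:
    each entry must carry exactly one value per schema field."""
    for entry in entries:
        if len(entry) != len(ENTRY_FIELDS):
            raise ValueError("Item field count does not match Schema")
    return True


partitions_by_topic = defaultdict(list)
for partition in (0, 1, 2):
    partitions_by_topic["my-topic"].append(partition)  # hypothetical 3-partition topic

keys_only = list(partitions_by_topic)      # ['my-topic']: topic names only
pairs = list(partitions_by_topic.items())  # [('my-topic', [0, 1, 2])]

keys_only_error = None
try:
    check_entries(keys_only)  # len('my-topic') == 8, not 2
except ValueError as exc:
    keys_only_error = str(exc)
print(keys_only_error)        # Item field count does not match Schema
print(check_entries(pairs))   # True
```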

The fourth case - Connecting to an existing topic with no active consumers, using an existing consumer group id

I have a topic that is constantly in use with the default partition assignment strategies. I shut down all consumers, changed the strategy to [StickyPartitionAssignor], and started one consumer.

This is what I got:

Traceback (most recent call last):
  File "/app/manage.py", line 24, in <module>
    execute_from_command_line(sys.argv)
  File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 381, in execute_from_command_line
    utility.execute()
  File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 375, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 323, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 364, in execute
    output = self.handle(*args, **options)
  File "/app/src/apps/kafka/management/commands/runmainworker.py", line 24, in handle
    self.run(flow)
  File "/app/src/apps/kafka/management/commands/runmainworker.py", line 34, in run
    Worker(flow.lower()).run()
  File "/app/src/apps/kafka/worker.py", line 44, in run
    for msg in self.consumer:
  File "/usr/local/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/usr/local/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/usr/local/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/usr/local/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/usr/local/lib/python3.9/site-packages/kafka/consumer/group.py", line 675, in _poll_once
    self._coordinator.poll()
  File "/usr/local/lib/python3.9/site-packages/kafka/coordinator/consumer.py", line 289, in poll
    self.ensure_active_group()
  File "/usr/local/lib/python3.9/site-packages/kafka/coordinator/base.py", line 425, in ensure_active_group
    raise exception  # pylint: disable-msg=raising-bad-type
kafka.errors.InconsistentGroupProtocolError: [Error 23] InconsistentGroupProtocolError: JoinGroupResponse_v2(throttle_time_ms=0, error_code=23, generation_id=-1, group_protocol='', leader_id='', member_id='', members=[])
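Error 23 (INCONSISTENT_GROUP_PROTOCOL) is what the broker returns when a member tries to join a non-empty group but none of the assignment protocols it offers is supported by every current member. One possible explanation for this case, offered only as an assumption, is that the group still had registered members advertising the kafka-python defaults (range and roundrobin), for example members whose sessions had not yet expired. The sketch below is a simplified model of that compatibility check, not the broker's code; the member id is hypothetical, and "range", "roundrobin" and "sticky" are the protocol names of kafka-python's assignors.

```python
# Simplified model (an assumption, not the broker's implementation) of the
# compatibility check made before a member is admitted to a consumer group.
def can_join(current_members, joining_protocols):
    if not current_members:
        # An empty group accepts any member that offers at least one protocol.
        return len(joining_protocols) > 0
    # Otherwise at least one offered protocol must be supported by every member.
    return any(
        all(protocol in offered for offered in current_members.values())
        for protocol in joining_protocols
    )


# Hypothetical member still registered with kafka-python's default assignors.
old_members = {"consumer_id_1": ["range", "roundrobin"]}

print(can_join(old_members, ["sticky"]))                         # False (error 23)
print(can_join(old_members, ["sticky", "range", "roundrobin"]))  # True
print(can_join({}, ["sticky"]))                                  # True
```

Under this model, listing sticky alongside range and roundrobin overlaps with the old members, while sticky alone does not.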

The fifth case - Connecting to an existing topic with no active consumers, using a new consumer group id

The same errors as described in cases 1 and 2.

Addition

As far as I can see, ConsumerProtocol does not support the sticky strategy as defined here, but I don't see where ConsumerProtocol.ASSIGNMENT_STRATEGIES is used directly.

Any ideas?

Does this strategy work at all? What am I doing wrong?

KazakovDenis, Feb 25 '22 11:02

I am getting this error too. Could a maintainer check the issue?

kafka.errors.InconsistentGroupProtocolError: [Error 23] InconsistentGroupProtocolError:

prabhatkgupta, Jan 10 '24 22:01