confluent-kafka-python

Ambiguous error when consumer is fenced by group.instance.id

Open Traktormaster opened this issue 5 years ago • 2 comments

Description

When two consumers use the same group.instance.id and conflict, the fenced client receives an ambiguous error.

The .code() of the error is KafkaError._FATAL, which is generic, but the .str() of the error suggests that it is specifically a KafkaError.FENCED_INSTANCE_ID. Strangely, the .str() of the error also repeats the same message twice. Here is the exact error: KafkaError{code=_FATAL,val=-150,str="Fatal error: Broker: Static consumer fenced by other consumer with same group.instance.id: Fatal consumer error: Broker: Static consumer fenced by other consumer with same group.instance.id"}.

Another possible issue with the error is that the .code() suggests this is an unrecoverable error, but .fatal() returns False.
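Until the error code is reported consistently, a caller could detect fencing by checking both the code and the message text. The following is only a minimal sketch of that workaround, not an official API: is_fenced, StubError and the constant names are hypothetical, -150 is the _FATAL value shown in the error above, and 82 is assumed to be the Kafka protocol code for FENCED_INSTANCE_ID (with confluent-kafka installed, the KafkaError._FATAL and KafkaError.FENCED_INSTANCE_ID constants would be the safer choice). It relies only on the .code() and .str() methods mentioned above.

```python
# Assumed numeric codes; prefer the KafkaError constants when the library is available.
FATAL_CODE = -150               # value printed for _FATAL in the error above
FENCED_INSTANCE_ID_CODE = 82    # assumed Kafka protocol code for FENCED_INSTANCE_ID
FENCED_MARKER = "fenced by other consumer with same group.instance.id"


def is_fenced(err):
    """Return True if err looks like a static-membership fencing error.

    err is any object exposing .code() and .str(), such as a KafkaError.
    """
    if err is None:
        return False
    code = err.code()
    if code == FENCED_INSTANCE_ID_CODE:
        # The specific code, in case the client reports it directly.
        return True
    # Fallback for the ambiguous case: generic fatal code, specific message.
    return code == FATAL_CODE and FENCED_MARKER in err.str()


class StubError:
    """Hypothetical stand-in for KafkaError, used only to exercise is_fenced."""

    def __init__(self, code, text):
        self._code = code
        self._text = text

    def code(self):
        return self._code

    def str(self):
        return self._text
```

In a poll loop this would be called as is_fenced(msg.error()). Matching on the message text is fragile, so it is best treated as a stopgap until the code itself can be trusted.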

How to reproduce

Replace the kafka_servers with your development servers and set topic to any valid and empty topic. The last assertion fails because the error is ambiguous.

from confluent_kafka import Consumer, KafkaError

topic = "testingtopic"
kafka_servers = ["127.0.0.1:9092"]

def _new_consumer():
    return Consumer(
        {
            # bootstrap.servers expects a comma-separated list of host:port pairs
            "bootstrap.servers": ",".join(kafka_servers),
            "group.id": "my_fleet",
            "group.instance.id": "ship_0",
            "enable.auto.commit": False,
            "enable.auto.offset.store": False,
            "default.topic.config": {"auto.offset.reset": "earliest",},
        }
    )

# start first consumer instance
first_c = _new_consumer()
first_c.subscribe([topic])
assert first_c.poll(timeout=10) is None
assert len(first_c.assignment()) > 0

# start replacement consumer instance (first will be fenced off in favor of this)
second_c = _new_consumer()
second_c.subscribe([topic])
assert second_c.poll(timeout=10) is None
assert len(second_c.assignment()) > 0

# check if first consumer is fenced off as it should be
msg = first_c.poll(timeout=5)
assert msg is not None
assert msg.error() is not None
assert msg.error().code() == KafkaError.FENCED_INSTANCE_ID, "%s %s" % (msg.error().fatal(), msg.error(),)

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version: ('1.4.1', 17039616), ('1.4.0', 17039615)
  • [x] Apache Kafka broker version: 2.5.0 (bitnami docker image)
  • [x] Client configuration: included in the reproduction snippet
  • [x] Operating system: Arch Linux (Linux lgg 5.6.5-arch3-1 #1 SMP PREEMPT Sun, 19 Apr 2020 13:14:25 +0000 x86_64 GNU/Linux)
  • [ ] Provide client logs
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

I'm not sure what to provide with client/broker logs as everything works, except the error seems to be badly constructed. If more information is needed I'll provide it.

Traktormaster, May 16 '20 14:05

Looks like the librdkafka handling is the issue. The error code is being explicitly overwritten. https://github.com/edenhill/librdkafka/blob/1a722553638bba85dbda5050455f7b9a5ef302de/src/rdkafka_cgrp.c#L1856

chris-kimberley, May 27 '21 05:05

Any update on this so far? I'm having the same issue in Go.

dsha256, May 28 '24 22:05

I've added this to the queue of items to look at. It looks like the line overwriting the error code should be removed, but I need to discuss why it was done in the first place with someone more familiar with the history here.

MSeal, Jun 30 '25 23:06