confluent-kafka-python
Ambiguous error when consumer is fenced by group.instance.id
Description
When two consumers use the same group.instance.id and conflict, the fenced client receives an ambiguous error.
The .code() of the error is the generic KafkaError._FATAL, but the .str() of the error suggests it is specifically a KafkaError.FENCED_INSTANCE_ID.
Strangely, the .str() of the error is also duplicated/redundant for some reason.
Here is the ambiguous error verbatim: KafkaError{code=_FATAL,val=-150,str="Fatal error: Broker: Static consumer fenced by other consumer with same group.instance.id: Fatal consumer error: Broker: Static consumer fenced by other consumer with same group.instance.id"}.
Another possible issue with the error is that the .code() suggests it is unrecoverable, yet .fatal() returns False.
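To make the inconsistency concrete, here is a small, library-free sketch. The numeric constants are assumptions, not imports from confluent_kafka: -150 is the _FATAL value visible in the error repr above, and 82 is the FENCED_INSTANCE_ID error code in the Kafka broker protocol.

```python
# Self-contained sketch; constants are NOT imported from confluent_kafka.
# -150 is the _FATAL value seen in the error repr above; 82 is the Kafka
# protocol's FENCED_INSTANCE_ID broker error code (assumption).
KAFKA_ERR__FATAL = -150
KAFKA_ERR_FENCED_INSTANCE_ID = 82

# What the fenced consumer actually observes today, per this report:
observed_code, observed_fatal = KAFKA_ERR__FATAL, False

# The contradiction: .code() reports the generic fatal code, yet .fatal()
# simultaneously reports the error as non-fatal.
inconsistent = (observed_code == KAFKA_ERR__FATAL) and not observed_fatal
print(inconsistent)
```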
How to reproduce
Replace kafka_servers with your development servers and set topic to any valid, empty topic.
The last assertion fails because the error is ambiguous.
```python
from confluent_kafka import Consumer, KafkaError

topic = "testingtopic"
kafka_servers = ["127.0.0.1:9092"]

def _new_consumer():
    return Consumer(
        {
            # bootstrap.servers expects a comma-separated list
            "bootstrap.servers": ",".join(kafka_servers),
            "group.id": "my_fleet",
            "group.instance.id": "ship_0",
            "enable.auto.commit": False,
            "enable.auto.offset.store": False,
            "default.topic.config": {"auto.offset.reset": "earliest"},
        }
    )

# start first consumer instance
first_c = _new_consumer()
first_c.subscribe([topic])
assert first_c.poll(timeout=10) is None
assert len(first_c.assignment()) > 0

# start replacement consumer instance (first will be fenced off in favor of this)
second_c = _new_consumer()
second_c.subscribe([topic])
assert second_c.poll(timeout=10) is None
assert len(second_c.assignment()) > 0

# check that the first consumer is fenced off, as it should be
msg = first_c.poll(timeout=5)
assert msg is not None
assert msg.error() is not None
assert msg.error().code() == KafkaError.FENCED_INSTANCE_ID, "%s %s" % (
    msg.error().fatal(),
    msg.error(),
)
```
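Until the underlying code is fixed, the only signal that distinguishes a fencing event is the human-readable error string. The helper below is hypothetical (not part of confluent-kafka-python) and matching on the string is fragile, but it illustrates what callers are currently forced to do:

```python
# Hypothetical workaround helper: detect a static-membership fencing error
# from the generic _FATAL code plus the error string, since the specific
# FENCED_INSTANCE_ID code is overwritten. String matching is fragile.

FATAL_CODE = -150  # KafkaError._FATAL, the value seen in the report above

def looks_like_fenced(code, errstr):
    """Heuristically detect a static-membership fencing error."""
    return code == FATAL_CODE and "fenced by other consumer" in errstr

# The exact string from the bug report:
errstr = ("Fatal error: Broker: Static consumer fenced by other consumer "
          "with same group.instance.id")
print(looks_like_fenced(-150, errstr))              # True
print(looks_like_fenced(-150, "Broker: Unknown"))   # False
```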
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version: ('1.4.1', 17039616), ('1.4.0', 17039615)
- [x] Apache Kafka broker version: 2.5.0 (bitnami docker image)
- [x] Client configuration: included in the reproduction snippet
- [x] Operating system: Arch Linux (Linux lgg 5.6.5-arch3-1 #1 SMP PREEMPT Sun, 19 Apr 2020 13:14:25 +0000 x86_64 GNU/Linux)
- [ ] Provide client logs
- [ ] Provide broker log excerpts
- [ ] Critical issue
I'm not sure what to provide for client/broker logs since everything works; the error just seems to be badly constructed. If more information is needed, I'll provide it.
It looks like the librdkafka handling is the issue: the error code is being explicitly overwritten. https://github.com/edenhill/librdkafka/blob/1a722553638bba85dbda5050455f7b9a5ef302de/src/rdkafka_cgrp.c#L1856
Any update on this so far? Having the same issue on Go
I've added this to the queue of items to look at -- it looks like the line overwriting the error code should be removed, but I need to discuss why it was done in the first place with someone more familiar with the history here.