confluent-kafka-python
Specifying a Python list for config `bootstrap.servers` fails silently with no errors or indications
Description
The configuration key bootstrap.servers implies that multiple servers can be specified, but it is not explicitly defined what this key's value should be. It appears that only a string using comma separation is allowed for this config key. If a Python list is given, no explicit error or feedback is given to indicate this is not valid. Instead, calls to consumer.poll() or other broker-bound functions simply block and never succeed.
Ideally, a Python list (e.g. ["broker1", "broker2"]) should be valid; or at the very least, constructing the Consumer should throw a runtime/configuration exception.
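Until the client itself rejects such values, a caller-side check can provide the fail-fast behaviour described above. The following is only a minimal sketch: `check_bootstrap_servers` is a hypothetical helper, not part of confluent-kafka-python, and it assumes the value must be a plain string.

```python
def check_bootstrap_servers(conf):
    """Hypothetical pre-check: fail fast if 'bootstrap.servers' is not a string."""
    if 'bootstrap.servers' in conf:
        value = conf['bootstrap.servers']
        if not isinstance(value, str):
            raise TypeError(
                "'bootstrap.servers' must be a comma-separated string such as "
                "'host1:9092,host2:9092', got " + type(value).__name__
            )
    # Return the config unchanged so the call can wrap a constructor argument.
    return conf
```

It could be used as `Consumer(check_bootstrap_servers(conf))`, so that a list value raises immediately instead of blocking in `poll()`.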
How to reproduce
Simply specify a Python list as the value for bootstrap.servers in the config given to the Consumer.
conf = {
    'bootstrap.servers': ['kafkabroker1:9092', 'kafkabroker2:9092'],
    ...
}
c = Consumer(conf)
while True:
    c.poll(1)  # this will never produce a message due to the Python list specified above as the bootstrap.servers value
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`):
>>> confluent_kafka.version()
('1.2.0', 16908288)
>>> confluent_kafka.libversion()
('1.2.0', 16908543)
- [ ] Apache Kafka broker version:
- [ ] Client configuration: `{...}`
- [x] Operating system: Ubuntu 18.04 and CentOS 7
- [ ] Provide client logs (with `'debug': '..'` as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
This is a good idea, we should allow lists for config variables and automatically translate to CSV.
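As a rough illustration of what such a translation might look like on the caller's side today, here is a small sketch. `lists_to_csv` and the `group.id` value are hypothetical and not part of confluent-kafka-python; the sketch assumes every list or tuple value should simply be joined with commas.

```python
def lists_to_csv(conf):
    """Return a copy of conf with list/tuple values joined into CSV strings."""
    converted = {}
    for key, value in conf.items():
        if isinstance(value, (list, tuple)):
            # e.g. ['a:9092', 'b:9092'] becomes 'a:9092,b:9092'
            converted[key] = ','.join(str(item) for item in value)
        else:
            converted[key] = value
    return converted


conf = lists_to_csv({
    'bootstrap.servers': ['kafkabroker1:9092', 'kafkabroker2:9092'],
    'group.id': 'example-group',  # hypothetical value for illustration
})
print(conf['bootstrap.servers'])  # kafkabroker1:9092,kafkabroker2:9092
```

The resulting dict can then be passed to `Consumer(...)` as usual, since `bootstrap.servers` is now a plain comma-separated string.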
I ran into the same bug today. The problem is that the raised exception is misleading. In my case it was:
KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}
Test file: ./test_consumer_config.py
#!/usr/bin/env python3

from confluent_kafka import Consumer


def print_list_of_topics(hosts):
    print(f'\nTrying with hosts: {hosts}')
    consumer = Consumer({
        'bootstrap.servers': hosts,
        'group.id': 'kafka-topic-configurator',
        'auto.offset.reset': 'earliest'
    })
    metadata = consumer.list_topics(timeout=11)
    print(f'Topics: {list(metadata.topics.keys())}')


print_list_of_topics('localhost:9092')
print_list_of_topics('localhost:9092,kafka:9092')
print_list_of_topics(['localhost:9092'])
Run:
➜ python test_consumer_config.py
Trying with hosts: localhost:9092
Topics: ['test-stream-1', '__consumer_offsets', 'test-stream-2']
Trying with hosts: localhost:9092,kafka:9092
Topics: ['test-stream-1', '__consumer_offsets', 'test-stream-2']
Trying with hosts: ['localhost:9092']
Traceback (most recent call last):
  File "test_consumer_config.py", line 19, in <module>
    print_list_of_topics(['localhost:9092'])
  File "test_consumer_config.py", line 13, in print_list_of_topics
    metadata = consumer.list_topics(timeout=11)
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}
This is a good idea, we should allow lists for config variables and automatically translate to CSV.
Any progress on this?
According to the official documentation: https://docs.confluent.io/kafka-clients/python/current/overview.html#ak-consumer
multiple servers can be specified in the following format: host1:9092,host2:9092
Thanks @llirrikk. I searched here for how to do it, but there is no mention of it.