pykafka
Question about dealing with broker restart
When a broker restarts (e.g. when Kafka's host system restarts), pykafka's producer.produce fails forever with NoBrokersAvailableError: Unable to connect to a broker to fetch metadata. See logs. This happens for every new produce call, even after Kafka is back online.
Is automatic reconnection supported or do we have to handle it ourselves? Any recommended way of doing this? Is NoBrokersAvailableError the only error that will be thrown? Thanks.
Background info: in my code I create the client, get a topic, call topic.get_producer, and then call producer.produce to send messages.
The reason I ask is that I share the client and producer objects throughout my code, so adding reconnect logic may add complexity.
My experiments show that recreating the producer on exception works, but errors like this still show up occasionally after reconnecting:
Traceback (most recent call last):
  File "bin/test-kafka.py", line 13, in <module>
    producer.produce('hello %s' % time.time())
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/producer.py", line 414, in produce
    self._raise_worker_exceptions()
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/producer.py", line 240, in _raise_worker_exceptions
    reraise(*self._worker_exception)
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/producer.py", line 621, in queue_reader
    self.producer._send_request(batch, self)
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/producer.py", line 529, in _send_request
    self._update()
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/producer.py", line 275, in _update
    self._cluster.update()
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/cluster.py", line 518, in update
    self._update_brokers(metadata.brokers, metadata.controller_id)
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/cluster.py", line 416, in _update_brokers
    controller_id))
KeyError: 'Controller ID -1 not present in cluster'
My testing code:
from gevent import monkey
monkey.patch_all()
from pykafka import KafkaClient
import time
import traceback
_client = KafkaClient(hosts='', use_greenlets=True, broker_version='0.10.0')
topic = _client.topics['test']
producer = topic.get_producer(min_queued_messages=1)
while True:
    try:
        producer.produce('hello %s' % time.time())
    except Exception as e:
        print(traceback.format_exc())
        producer = topic.get_producer(min_queued_messages=1)
    else:
        print('ok')
    time.sleep(1)
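To keep that retry out of every call site that shares the producer, I'm thinking of wrapping it in something like the sketch below. SafeProducer and the retry-once policy are my own invention, not anything pykafka provides; it reuses the topic object from the snippet above.

class SafeProducer(object):
    """Hides producer recreation from the code paths that share this object."""

    def __init__(self, topic, **producer_kwargs):
        self._topic = topic
        self._kwargs = producer_kwargs
        self._producer = topic.get_producer(**producer_kwargs)

    def produce(self, message):
        try:
            self._producer.produce(message)
        except Exception:
            # Rebuild the producer and retry once; if the retry also fails,
            # the exception propagates to the caller.
            self._producer = self._topic.get_producer(**self._kwargs)
            self._producer.produce(message)

producer = SafeProducer(topic, min_queued_messages=1)
producer.produce('hello %s' % time.time())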
The producer should ideally be resilient to Kafka nodes dying, but I know there are currently some cases where it's not. One thing to watch out for is that your hosts list is long enough. I believe that if you only list a single node in hosts and it dies, pykafka will not be able to reconnect. This is because it uses the hosts string as the list of brokers to which to attempt connections, so if the only node in the hosts string is also down, it will have no other nodes to look for. A good strategy might be to list all of your kafka nodes in the hosts string, or try using the zookeeper_hosts kwarg.
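For example (the broker and ZooKeeper addresses below are placeholders for your own cluster):

from pykafka import KafkaClient

# Listing every broker gives pykafka fallbacks for the metadata request
# when one node is down:
client = KafkaClient(hosts='kafka1:9092,kafka2:9092,kafka3:9092',
                     broker_version='0.10.0')

# Alternatively, discover the current broker list from ZooKeeper:
client = KafkaClient(zookeeper_hosts='zk1:2181,zk2:2181,zk3:2181',
                     broker_version='0.10.0')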
I am using "monkey.patch_all()" too, when a broker restarts frequently,my main thread hangs up, I try to recreate a consumer,the error log is “simpleconsumer.py [line:279] INFO Updating cluster in response to NotCoordinatorForGroup”.