
Question about dealing with broker restart

Open carsonip opened this issue 6 years ago • 5 comments

When a broker restarts (e.g. when kafka's host system restarts), pykafka's producer.produce fails forever with NoBrokersAvailableError: "Unable to connect to a broker to fetch metadata. See logs." This happens for every subsequent produce call, even after kafka is back online.

Is automatic reconnection supported or do we have to handle it ourselves? Any recommended way of doing this? Is NoBrokersAvailableError the only error that will be thrown? Thanks.

Background info: in my code I create the client, get the topic, get a producer via topic.get_producer, and then call producer.produce to produce messages.

carsonip avatar Jan 16 '19 07:01 carsonip

The reason I ask is that I share the client object and producer object throughout the code, so adding reconnect logic may add complexity.

carsonip avatar Jan 16 '19 08:01 carsonip

My experiments show that recreating the producer on exception works. However, errors like the following occasionally occur after reconnecting (a controller ID of -1 in the metadata response indicates that the cluster has not yet elected a controller):

Traceback (most recent call last):
  File "bin/test-kafka.py", line 13, in <module>
    producer.produce('hello %s' % time.time())
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/producer.py", line 414, in produce
    self._raise_worker_exceptions()
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/producer.py", line 240, in _raise_worker_exceptions
    reraise(*self._worker_exception)
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/producer.py", line 621, in queue_reader
    self.producer._send_request(batch, self)
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/producer.py", line 529, in _send_request
    self._update()
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/producer.py", line 275, in _update
    self._cluster.update()
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/cluster.py", line 518, in update
    self._update_brokers(metadata.brokers, metadata.controller_id)
  File "/vagrant/venv/local/lib/python2.7/site-packages/pykafka/cluster.py", line 416, in _update_brokers
    controller_id))
KeyError: 'Controller ID -1 not present in cluster'

carsonip avatar Jan 16 '19 08:01 carsonip

My testing code:

from gevent import monkey
monkey.patch_all()  # must run before other imports so sockets are patched

import time
import traceback

from pykafka import KafkaClient

_client = KafkaClient(hosts='', use_greenlets=True, broker_version='0.10.0')
topic = _client.topics['test']
producer = topic.get_producer(min_queued_messages=1)

while True:
    try:
        producer.produce('hello %s' % time.time())
    except Exception:
        print(traceback.format_exc())
        # Workaround: discard the failed producer and create a new one.
        producer = topic.get_producer(min_queued_messages=1)
    else:
        print('ok')
    time.sleep(1)
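
For illustration, here is one way to centralize this recreate-on-exception pattern in a helper. This is only a sketch: produce_with_retry, make_producer, and RETRIES are made-up names rather than pykafka API, the backoff is arbitrary, and real code would probably catch specific pykafka exceptions instead of a bare Exception:

import time
import traceback

RETRIES = 3

def make_producer(topic):
    return topic.get_producer(min_queued_messages=1)

def produce_with_retry(topic, producer, message):
    for attempt in range(RETRIES):
        try:
            producer.produce(message)
            return producer  # hand back the (possibly recreated) producer
        except Exception:
            print(traceback.format_exc())
            time.sleep(2 ** attempt)  # crude exponential backoff
            producer = make_producer(topic)  # discard the broken producer
    raise RuntimeError('produce failed after %d retries' % RETRIES)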

carsonip avatar Jan 16 '19 08:01 carsonip

The producer should ideally be resilient to Kafka nodes dying, but I know there are currently some cases where it's not. One thing to watch out for is that your hosts list is long enough. I believe that if you only list a single node in hosts and it dies, pykafka will not be able to reconnect. This is because it uses the hosts string as the list of brokers to which to attempt connections, so if the only node in the hosts string is also down, it will have no other nodes to look for. A good strategy might be to list all of your kafka nodes in the hosts string, or try using the zookeeper_hosts kwarg.
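
For illustration, the two suggested configurations might look like this (kafka1-3 and zk1-3 are placeholder host names for a hypothetical cluster):

from pykafka import KafkaClient

# List every broker so pykafka still has candidates for fetching
# metadata when one node is down.
client = KafkaClient(hosts='kafka1:9092,kafka2:9092,kafka3:9092')

# Or discover brokers through ZooKeeper; when zookeeper_hosts is given,
# pykafka uses it in preference to the hosts string.
client = KafkaClient(zookeeper_hosts='zk1:2181,zk2:2181,zk3:2181')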

emmettbutler avatar Jan 16 '19 17:01 emmettbutler

I am using "monkey.patch_all()" too. When a broker restarts frequently, my main thread hangs. When I try to recreate a consumer, the error log is "simpleconsumer.py [line:279] INFO Updating cluster in response to NotCoordinatorForGroup".
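
For reference, a minimal sketch of applying the same recreate-on-exception idea to a balanced consumer (host names and the consumer group are placeholders; this is illustrative, not a confirmed fix for the hang described above):

from pykafka import KafkaClient

client = KafkaClient(hosts='kafka1:9092,kafka2:9092', use_greenlets=True)
topic = client.topics['test']

def make_consumer():
    return topic.get_balanced_consumer(consumer_group=b'test-group')

consumer = make_consumer()
while True:
    try:
        message = consumer.consume()
        if message is not None:
            print(message.value)
    except Exception:
        # Recreate the consumer after a failure, mirroring the
        # producer workaround earlier in the thread.
        consumer = make_consumer()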

meikuwu avatar Jan 21 '19 09:01 meikuwu