kazoo

Can't call commands such as `exists`, `create`, etc. in a listener function

Open · tekpig opened this issue 9 years ago · 9 comments

I implemented my listener as follows:

```python
def my_listener(state):
    global zk
    global child_path
    global child_value

    if state == KazooState.LOST:
        logger.warning("lost connection to zookeeper server")
    elif state == KazooState.SUSPENDED:
        logger.warning("connection has been lost but may be recovered")
    else:
        # CONNECTED: (re)create the ephemeral node if it is missing
        logger.info("connect/reconnect to zookeeper server")
        if zk is not None and child_path is not None:
            if not zk.exists(child_path):
                try:
                    zk.create(child_path, child_value.encode('utf-8'), ephemeral=True)
                except Exception as e:
                    logger.exception(e)
```

But when execution reaches the call to `exists`, the client hangs.

I added logging to track what happened and found that, inside the `_connect` function, the call `client._session_callback(KeeperState.CONNECTED)` never returns.

Digging further, I traced it to the line `remove = listener(state)` in client.py.

The documentation says that when "creating ephemeral nodes, it's highly recommended to add a state listener so that your program can properly deal with connection interruptions or a Zookeeper session loss."

So what should I do when the connection comes back?

tekpig, Sep 14 '16 08:09

Maybe you should move the `try` so that it also wraps the `if not zk.exists(child_path)` check, because every KazooClient method raises an exception when something goes wrong, like this:

```python
def my_listener(state):
    if state == KazooState.LOST:
        logger.warning("lost connection to zookeeper server")
    elif state == KazooState.SUSPENDED:
        logger.warning("connection has been lost but may be recovered")
    else:
        logger.info("connect/reconnect to zookeeper server")
        if zk is not None and child_path is not None:
            # exists() can raise too, so it belongs inside the try block
            try:
                if not zk.exists(child_path):
                    zk.create(child_path, child_value.encode('utf-8'), ephemeral=True)
            except Exception as e:
                logger.exception(e)
```

luofeilong, Sep 15 '16 01:09

@luofeilong That's not the cause; I have already tried it.

You can reproduce it easily.

  1. Take all ZooKeeper servers down.
  2. Customize the listener function and add it to the KazooClient. Inside the listener, put some ZooKeeper operations, such as `ensure_path`, `exists`, etc., in the `state == KazooState.CONNECTED` branch.
  3. A few minutes later, bring all ZooKeeper servers back up.
  4. You get no response from the ZooKeeper server. If you dig into it, you find that the requests made in the `state == KazooState.CONNECTED` branch were never sent, even though the client is in the connected state.

I guess the problem is in `zk_loop`.

tekpig, Sep 15 '16 15:09

Can you show your test code, and which kazoo version your project uses?

luofeilong, Sep 18 '16 05:09

@luofeilong The version is kazoo 2.2.1.

```python
# -*- coding: utf-8 -*-

import logging
import re
import signal
import subprocess
import sys
import time

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from kazoo.protocol.states import KazooState
from kazoo.retry import KazooRetry

logging.basicConfig(level=logging.NOTSET)
logger = logging.getLogger()


def is_running(process):
    # universal_newlines=True yields str lines, so re.search also works on Python 3
    s = subprocess.Popen(['ps', 'axw'], stdout=subprocess.PIPE,
                         universal_newlines=True)
    for x in s.stdout:
        if re.search(process, x):
            return True
    return False


class RedisNotUpException(Exception):
    pass


class NotConnectToZkServers(Exception):
    pass


zk = None
parent_path = None
child_path = None
child_value = None


def my_listener(state):
    global zk
    global child_path
    global child_value

    if state == KazooState.LOST:
        logger.warning("lost connection to zookeeper server")
    elif state == KazooState.SUSPENDED:
        logger.warning("connection has been lost but may be recovered")
    else:
        logger.info("connect/reconnect to zookeeper server")
        if zk is not None and child_path is not None:
            try:
                if not zk.exists(child_path):
                    zk.create(child_path, child_value.encode('utf-8'), ephemeral=True)
            except Exception as e:
                logger.exception(e)


def signal_handler(signum, frame):
    global zk
    logger.info("receive signal %s" % signum)
    zk.remove_listener(my_listener)
    zk.stop()
    zk.close()
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

if __name__ == '__main__':
    if len(sys.argv) != 4:
        logger.error("besides file name, 3 parameters are needed. "
                     "they're address of zk servers, service name and instance id.")
        sys.exit(1)
    # sys.argv entries are already str, so no conversion is needed
    zksrvs, service, instId = sys.argv[1:4]

    # max_tries=-1 retries the connection forever, backing off up to 30 s
    kz_retry = KazooRetry(max_tries=-1, delay=0.5, max_delay=30)

    zk = KazooClient(hosts=zksrvs, connection_retry=kz_retry)
    try:
        zk.start(timeout=30)
    except Exception as e:
        logger.exception(e)

    if zk.state != KazooState.CONNECTED:
        raise NotConnectToZkServers("oops! connection can't be established!")

    if True:  # is_running('redis-server'):
        parent_path = '/REDIS/' + service.upper()
        child_path = parent_path + '/' + str(instId)
        # global child_value = redis_ip + ':' + redis_port + '*' + redis_db
        child_value = '127.0.0.1:9999*0'
        zk.ensure_path(parent_path)
        try:
            zk.create(child_path, child_value.encode('utf-8'), ephemeral=True)
        except NodeExistsError:
            zk.set(child_path, child_value.encode('utf-8'))
        except Exception as e:
            logger.exception(e)
    else:
        raise RedisNotUpException("oops! redis can't get up!")

    zk.add_listener(my_listener)

    while True:  # is_running('redis-server')
        time.sleep(1)
```

tekpig, Sep 19 '16 03:09

OK, I found the problem. If you debug your listener function at the point where it calls a kazoo method, you can see that it is a deadlock. Here is how it happens:

In the `connection.py` file:

  • Reconnect: `_connect_attempt()` calls `_connect()` to retry connecting to ZooKeeper. When that succeeds, `_connect()` calls `client._session_callback(KeeperState.CONNECTED)`, which calls back into your listener function, and your listener then waits for the result of its request.
  • Deadlock: after `_connect()` returns inside `_connect_attempt()`, the worker thread is supposed to read events from the socket and process the requests in the queue. But because your listener function has not returned, nothing ever services the request queue.

Your listener function is waiting for the worker thread to process your request, but the worker thread is waiting for your listener function to return.
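The cycle described above can be reproduced without a ZooKeeper server. The toy model below is hypothetical: `ToyConnection`, `call`, `connection_loop` and `run_demo` are invented names that only mimic the ordering described here (one thread runs the state listeners first and serves queued requests only afterwards); they are not kazoo's actual code. A timeout is added so the demo terminates instead of hanging forever.

```python
import queue
import threading


class ToyConnection:
    """Hypothetical stand-in for a client with ONE connection thread.

    Not kazoo's code: it only mimics the ordering where state listeners
    run on the connection thread before queued requests are served.
    """

    def __init__(self):
        self.requests = queue.Queue()  # (operation, reply_queue) pairs
        self.listeners = []

    def call(self, op, timeout):
        """Blocking request, like zk.exists(): enqueue, then wait for a reply."""
        reply = queue.Queue(maxsize=1)
        self.requests.put((op, reply))
        return reply.get(timeout=timeout)  # raises queue.Empty on timeout

    def connection_loop(self):
        # 1. Connection (re)established: notify listeners synchronously.
        for listener in self.listeners:
            listener("CONNECTED")
        # 2. Only after every listener has returned are requests served.
        while True:
            op, reply = self.requests.get()
            if op is None:  # shutdown sentinel
                return
            reply.put("reply to " + op)


def run_demo(timeout=0.5):
    conn = ToyConnection()
    outcome = []

    def listener(state):
        # Same shape as my_listener: a blocking call inside the callback.
        try:
            outcome.append(conn.call("exists", timeout))
        except queue.Empty:
            outcome.append("no reply: deadlock")

    conn.listeners.append(listener)
    loop = threading.Thread(target=conn.connection_loop)
    loop.start()
    conn.requests.put((None, None))  # stop the loop once it reaches step 2
    loop.join()
    return outcome


print(run_demo())  # prints ['no reply: deadlock'] after about 0.5 s
```

Because the listener and the request loop share a single thread, no timeout value can make the call succeed; a timeout can only turn the hang into an error.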

luofeilong, Sep 19 '16 06:09

@luofeilong

Yes, you're right.

So if I want to do what my listener function does, how can I do that? From my perspective, it's reasonable to act when the connection comes back, especially to restore the ephemeral nodes the client created earlier.

tekpig, Sep 19 '16 07:09

Maybe you can use another thread to handle the event.
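A minimal sketch of that suggestion, assuming the blocking reconnect work can be packed into a hypothetical zero-argument callable `on_connected` (for example, the `exists`/`create` logic from `my_listener`). The listener only enqueues the state and returns immediately, and a dedicated worker thread performs the blocking calls. `StateWorker` is an invented name, and the sketch relies on kazoo's `KazooState` constants being plain strings such as `"CONNECTED"`; real code would compare against `KazooState.CONNECTED`.

```python
import logging
import queue
import threading

log = logging.getLogger(__name__)


class StateWorker:
    """Move blocking reconnect work off the connection thread (a sketch).

    `on_connected` is a hypothetical zero-argument callable doing the
    blocking ZooKeeper calls, e.g. the exists/create logic of my_listener.
    """

    def __init__(self, on_connected):
        self._on_connected = on_connected
        self._states = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def listener(self, state):
        # Runs on the client's connection thread: only enqueue, never block.
        # Returns None, so the listener stays registered.
        self._states.put(state)

    def stop(self):
        self._states.put(None)  # sentinel: tells the worker loop to exit
        self._thread.join()

    def _run(self):
        # Handle states one at a time, in the order they arrived.
        while True:
            state = self._states.get()
            if state is None:
                return
            # KazooState constants are plain strings such as "CONNECTED".
            if state == "CONNECTED":
                try:
                    self._on_connected()
                except Exception:
                    log.exception("reconnect handler failed")
```

With kazoo this would be wired as `worker = StateWorker(recreate_node)` followed by `zk.add_listener(worker.listener)`, where `recreate_node` is a hypothetical function wrapping the node-recreation logic. kazoo's `add_listener` docstring also warns that listeners must not block and points to the handler's `spawn()` for blocking work. A single worker thread has the extra property that recoveries run one at a time and in order, even if the connection flaps.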

luofeilong, Sep 19 '16 08:09

So it's a drawback of this library; I think it should be improved.

tekpig, Sep 19 '16 08:09

Yes, you can do it.

luofeilong, Sep 19 '16 08:09