kazoo
After trying to reconnect many times, client gets stuck
Hello,
I run the test program below, and after it connects to ZooKeeper, I block access to port 2181 with iptables. After a while (~90 s) and a number of retries, the test program gets stuck: it blocks while writing to a socketpair that appears to be full (no thread seems to read from it).
Test program:
```python
#!/usr/bin/env python3
import kazoo.client
import logging
import queue
import time

logging.basicConfig(level=logging.INFO)

# very low timeouts to reproduce the issue faster
CONNECT_TIMEOUT = 0.2
RETRY_TIMEOUT = 0.1
SESSION_TIMEOUT = 0.2

ZK_HOSTS = 'localhost:2181'


class MyClient:
    def __init__(self):
        self.zk = kazoo.client.KazooClient(hosts=ZK_HOSTS,
                                           timeout=SESSION_TIMEOUT)
        self.mqueue = queue.Queue()
        self.zk.add_listener(self.session_listener)
        self.zk_state = kazoo.client.KazooState.LOST
        self.running = False

    def session_listener(self, state):
        self.mqueue.put((self.update_state, (state,)))

    def update_state(self, state):
        logging.info('state {} -> {}'.format(self.zk_state, state))
        self.zk_state = state

    def reconnect(self):
        if self.zk_state == kazoo.client.KazooState.CONNECTED:
            return
        try:
            logging.info('connecting')
            if self.running:
                self.zk.stop()
                self.running = False
            self.zk.start(timeout=CONNECT_TIMEOUT)
            self.zk_state = kazoo.client.KazooState.CONNECTED
            self.running = True
            logging.info('connected')
        except Exception as e:
            logging.error(repr(e))


def main():
    cl = MyClient()
    while True:
        while not cl.mqueue.empty():
            f, args = cl.mqueue.get()
            f(*args)
        cl.reconnect()
        time.sleep(RETRY_TIMEOUT)


if __name__ == '__main__':
    main()
```
iptables script:
```bash
#!/bin/bash
set -x
case $1 in
    block)
        echo blocking
        for port in 218{1..3}; do
            iptables -t filter -A INPUT -p tcp --sport $port -j DROP
            iptables -t filter -A OUTPUT -p tcp --dport $port -j DROP
        done
        ;;
    unblock)
        echo unblocking
        for port in 218{1..3}; do
            iptables -t filter -D INPUT -p tcp --sport $port -j DROP
            iptables -t filter -D OUTPUT -p tcp --dport $port -j DROP
        done
        ;;
    *)
        echo "unknown command: $1"
        exit 1
        ;;
esac
```
I also ran the program under python3-dbg so that I could attach gdb to it and obtain thread backtraces:
```
$ gdb python3-dbg $(pgrep python3-dbg)
...
(gdb) i threads
  Id   Target Id         Frame
* 1    Thread 0x7fd5da634740 (LWP 5426) "python3-dbg" 0x00007fd5da23995b in __libc_send (fd=8, buf=0x7fd5da477858, n=1, flags=-1) at ../sysdeps/unix/sysv/linux/x86_64/send.c:31
  2    Thread 0x7fd5d7238700 (LWP 6898) "python3-dbg" sem_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/sem_wait.S:85
  3    Thread 0x7fd5d6236700 (LWP 6899) "python3-dbg" sem_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/sem_wait.S:85
(gdb) thread apply all py-bt

Thread 3 (Thread 0x7fd5d6236700 (LWP 6899)):
Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7fd5d74bcd50>
  File "/usr/lib/python3.4/threading.py", line 290, in wait
    waiter.acquire()
  File "/usr/lib/python3.4/queue.py", line 167, in get
    self.not_empty.wait()
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/handlers/threading.py", line 124, in _thread_worker
    func = queue.get()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 888, in _bootstrap
    self._bootstrap_inner()

Thread 2 (Thread 0x7fd5d7238700 (LWP 6898)):
Traceback (most recent call first):
  <built-in method acquire of _thread.lock object at remote 0x7fd5d74bc8d8>
  File "/usr/lib/python3.4/threading.py", line 290, in wait
    waiter.acquire()
  File "/usr/lib/python3.4/queue.py", line 167, in get
    self.not_empty.wait()
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/handlers/threading.py", line 124, in _thread_worker
    func = queue.get()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 888, in _bootstrap
    self._bootstrap_inner()

Thread 1 (Thread 0x7fd5da634740 (LWP 5426)):
Traceback (most recent call first):
  <built-in method send of socket object at remote 0x7fd5d724a178>
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/client.py", line 682, in stop
    self._connection._write_sock.send(b'\0')
  File "/home/saffroy/ring/modules/membership/src/venv-dbg/lib/python3.4/site-packages/kazoo/client.py", line 630, in start
    self.stop()
  File "./test_31889.py", line 42, in reconnect
  File "./test_31889.py", line 56, in main
  File "./test_31889.py", line 60, in <module>
```
While the program runs, I monitor the state of its socketpairs with ss:
```bash
watch -n1 "ss -xp|awk 'NR==1 || /dbg/'"
```
When the program blocks, ss output always ends up in this state:
```
Netid  State  Recv-Q  Send-Q  Local Address:Port  Peer Address:Port
u_str  ESTAB  0       213504  * 68192600          * 68192599          users:(("python3-dbg",5426,8))
u_str  ESTAB  278     0       * 68192599          * 68192600          users:(("python3-dbg",5426,7))
```
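As a rough cross-check of these numbers: fd 8 is the descriptor the main thread is blocked sending to, and 213504 / 278 = 768, which suggests that each 1-byte wake-up send was charged about 768 bytes against the write end's send buffer, so only a few hundred sends fit. At roughly one failed attempt every 0.3 s (CONNECT_TIMEOUT plus RETRY_TIMEOUT), 278 attempts take about 83 s, roughly in line with the ~90 s observed. The stdlib-only sketch below (not Kazoo code; `probe_wakeup_capacity` is a hypothetical helper) measures the same limit on the local machine by sending single NUL bytes into one end of a fresh socketpair without ever reading the other end. The exact count depends on the OS and its socket buffer settings.

```python
import socket


def probe_wakeup_capacity():
    """Count how many 1-byte sends fit into one end of a fresh socketpair
    whose other end is never read; return (sends, SO_SNDBUF)."""
    read_sock, write_sock = socket.socketpair()
    try:
        # Non-blocking: a full buffer raises BlockingIOError instead of
        # hanging like the blocking send() in KazooClient.stop().
        write_sock.setblocking(False)
        sndbuf = write_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
        sends = 0
        while True:
            try:
                write_sock.send(b'\0')
            except BlockingIOError:
                return sends, sndbuf
            sends += 1
    finally:
        read_sock.close()
        write_sock.close()


if __name__ == '__main__':
    sends, sndbuf = probe_wakeup_capacity()
    print('{} one-byte sends fit; SO_SNDBUF = {} bytes (~{} bytes per send)'
          .format(sends, sndbuf, sndbuf // max(sends, 1)))
```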
Is anything wrong with my program, or is this a bug in Kazoo?
If I change my code to always call `zk.close()` before `zk.start()`, the problem disappears: the socketpairs no longer get clogged with unread bytes, because they are closed and re-created on every connection attempt.
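The workaround can be written as a small helper. This is a sketch assuming `zk` behaves like a `KazooClient` (with `stop()`, `close()` and `start(timeout=...)`); `restart_with_close` is a hypothetical name, not part of Kazoo. The order matters: Kazoo's documentation describes `close()` as something to call on a stopped client, so `stop()` comes first whenever a previous `start()` succeeded. After a failed `start()`, the backtrace above shows that `start()` has already called `stop()` itself.

```python
def restart_with_close(zk, running, connect_timeout):
    """Restart a KazooClient-like object, releasing its resources first.

    `running` says whether a previous start() succeeded, i.e. whether
    stop() still has to be called. start() raises on failure (e.g. on a
    connection time-out), and that exception propagates to the caller.
    """
    if running:
        zk.stop()
    # At this point the client is stopped (never started, stopped above,
    # or stopped by a failed start()), so close() may release the wake-up
    # socketpair; the next start() creates a fresh one with no stale bytes.
    zk.close()
    zk.start(timeout=connect_timeout)
```

In the test program, the `if self.running:` block and the `start()` call in `reconnect()` would become `was_running, self.running = self.running, False` followed by `restart_with_close(self.zk, was_running, CONNECT_TIMEOUT)`.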
I think the bug is that, on failure, `KazooClient.start()` itself calls `self.stop()` but not `self.close()`. Each call to `stop()` writes one byte (`b'\0'`) to the `ConnectionHandler` socketpair, as the backtrace shows, and apparently nothing reads it; over many failed connection attempts these bytes accumulate until the socketpair is full and the next `send()` blocks.
Adding a call to `close()` there solves the problem for me; I'll open a PR.
BTW, the problem is even easier to reproduce by trying to connect to a non-existent ZK server (e.g. `localhost:12345`).
PR opened in #579