pyzmq
Crash from green PUB socket on heavy workload
I have a crash with a green PUB socket, which I reported at https://github.com/zeromq/libzmq/issues/2252. The advice I got there was based on a guess that I'm using zmq with multi-threading.
When I use zmq.green, do I always need to acquire a lock around every green zmq socket? That is, do I have to change all code like this:
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:5932')
socket.set(zmq.SUBSCRIBE, my_topic)
to something like this?
zmq_lock = gevent.lock.Semaphore()
with zmq_lock:
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:5932')
    socket.set(zmq.SUBSCRIBE, my_topic)
I would appreciate your help.
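For illustration, "a lock around every socket call" could be sketched as a small proxy rather than repeating `with zmq_lock:` at every call site. This is only a sketch under assumptions: `LockedSocket` and `FakeSocket` are hypothetical names, not part of pyzmq or gevent, and `threading.Lock` stands in for whichever lock is appropriate (for example the `gevent.lock.Semaphore` from the snippet above). Whether such locking is needed at all with zmq.green is exactly the open question here.

```python
import threading


class LockedSocket(object):
    """Hypothetical proxy that holds `lock` for the duration of every method call."""

    def __init__(self, sock, lock):
        self._sock = sock
        self._lock = lock

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself.
        attr = getattr(self._sock, name)
        if not callable(attr):
            return attr

        def locked_call(*args, **kwargs):
            with self._lock:
                return attr(*args, **kwargs)

        return locked_call


class FakeSocket(object):
    """Hypothetical stand-in for a zmq socket; it only records calls."""

    def __init__(self):
        self.calls = []

    def connect(self, addr):
        self.calls.append(('connect', addr))

    def set(self, option, value):
        self.calls.append(('set', option, value))


lock = threading.Lock()
sock = LockedSocket(FakeSocket(), lock)
sock.connect('tcp://127.0.0.1:5932')
sock.set('SUBSCRIBE', b'my_topic')
```

One caveat with this design: a blocking call such as a receive would hold the lock until it returns, so every other user of the same socket would wait for it.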
closed by #951
@minrk Let's talk on https://github.com/zeromq/libzmq/issues/2252.
But please reopen this issue. I still have problems.
My team has tried to reproduce this crash using only libzmq, but we haven't succeeded yet. I now suspect that PyZMQ, rather than libzmq, causes the crash.
@minrk would you run the code below, which reproduces the crash?
# -*- coding: utf-8 -*-
from collections import defaultdict
import random
import time

import gevent.monkey; gevent.monkey.patch_time()  # noqa
import zmq.green as zmq

# Number of SUB sockets (at least 2).
n = 2
# Number of topics.
m = 1000

t = time.time()
c = zmq.Context()
p = c.socket(zmq.PUB)
p.set(zmq.RCVHWM, 1000)  # This setting does not affect whether the crash occurs.

topic = lambda x: ('%08x' % abs(hash(str(x))))[:8]

print '1. Spawn %d SUB sockets' % n
subs = []
ports = []
for x in range(n):
    s = c.socket(zmq.SUB)
    s.set(zmq.SNDHWM, 10)  # A lower high-water mark triggers the crash faster.
    for x in range(m):
        s.set(zmq.SUBSCRIBE, topic(x))
    port = s.bind_to_random_port('tcp://127.0.0.1')
    ports.append(port)
    subs.append(s)

print '2. Connect to SUB sockets'
for port in ports:
    p.connect('tcp://127.0.0.1:%d' % port)

print '3. Subscribing/Unsubscribing...'
topics = defaultdict(set)
while True:
    for s in subs:
        if topics[s] and random.random() < 0.5:
            t = random.choice(list(topics[s]))
            s.set(zmq.UNSUBSCRIBE, t)
        else:
            t = topic(random.randrange(m))
            s.set(zmq.SUBSCRIBE, t)
            topics[s].add(t)
    time.sleep(0)  # Only a 0-second sleep triggers the crash.
I can reproduce a crash in mtrie with that code. I can also trigger a more severe actual segfault even earlier by changing the sleep to a nonzero value:
thread #1: tid = 0x10e030, 0x000000010163b851 libzmq.cpython-36m-darwin.so`zmq::pipe_t::write(zmq::msg_t*) + 17, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=1, address=0x59)
frame #0: 0x000000010163b851 libzmq.cpython-36m-darwin.so`zmq::pipe_t::write(zmq::msg_t*) + 17
The crash does have all the symptoms of thread-unsafe access of sockets, which I don't get, given my understanding of how gevent works. It seems like there must be multiple threads talking to the socket.
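One way to test the "multiple threads" suspicion at the Python level is to wrap the socket in a guard that remembers which OS thread created it and raises if any other thread touches it. The following is a minimal sketch, not a pyzmq feature: `SingleThreadGuard`, `ThreadOwnershipError`, and `FakeSocket` are hypothetical names, and it assumes `threading` is not monkey-patched (the reproduction above only calls `patch_time()`), since I believe patching threads makes `get_ident()` report greenlets instead of OS threads.

```python
import threading


class ThreadOwnershipError(RuntimeError):
    """Raised when the guarded object is used outside its creating thread."""


class SingleThreadGuard(object):
    """Hypothetical wrapper that records the creating thread and checks every access."""

    def __init__(self, target):
        self._target = target
        self._owner = threading.get_ident()

    def __getattr__(self, name):
        # Only reached for names not found on the guard itself.
        current = threading.get_ident()
        if current != self._owner:
            raise ThreadOwnershipError(
                '%s accessed from thread %d, created in thread %d'
                % (name, current, self._owner))
        return getattr(self._target, name)


class FakeSocket(object):
    """Hypothetical stand-in for a zmq socket."""

    def set(self, option, value):
        return (option, value)


guarded = SingleThreadGuard(FakeSocket())
same_thread_result = guarded.set('SUBSCRIBE', b'topic')

errors = []


def use_from_other_thread():
    try:
        guarded.set('SUBSCRIBE', b'topic')
    except ThreadOwnershipError as exc:
        errors.append(exc)


worker = threading.Thread(target=use_from_other_thread)
worker.start()
worker.join()
```

If a guard like this never fires in the real program, the Python code itself is only using the socket from one thread. It cannot see what happens inside libzmq, which runs its own I/O threads.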