Crash from green PUB socket on heavy workload

Open sublee opened this issue 8 years ago • 5 comments

I have a crash issue with a green PUB socket. I reported the issue at https://github.com/zeromq/libzmq/issues/2252, where I got some advice: the responder guessed that I'm using zmq from multiple threads.

When I use zmq.green, do I always need to acquire a lock around every operation on a green zmq socket? That is, do I have to change all code like this:

socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:5932')
socket.set(zmq.SUBSCRIBE, my_topic)

to this?

zmq_lock = gevent.lock.Semaphore()
with zmq_lock:
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:5932')
    socket.set(zmq.SUBSCRIBE, my_topic)

I would appreciate your help.

sublee, Dec 13 '16 13:12

closed by #951

minrk, Dec 14 '16 09:12

@minrk Let's talk on https://github.com/zeromq/libzmq/issues/2252.

sublee, Dec 14 '16 10:12

But please reopen this issue. I still have problems.

sublee, Dec 14 '16 10:12

My team has tried to reproduce this crash using only libzmq, but we haven't succeeded yet. I now suspect that PyZMQ, rather than libzmq, causes the crash.

@minrk would you run the code below, which reproduces the crash?

# -*- coding: utf-8 -*-
from collections import defaultdict
import random
import time

import gevent.monkey; gevent.monkey.patch_time()  # noqa
import zmq.green as zmq


# Number of SUB sockets (at least 2).
n = 2
# Number of topics.
m = 1000


t = time.time()
c = zmq.Context()
p = c.socket(zmq.PUB)
p.set(zmq.RCVHWM, 1000)  # This setting doesn't affect whether the crash occurs.
topic = lambda x: ('%08x' % abs(hash(str(x))))[:8]


print '1. Spawn %d SUB sockets' % n
subs = []
ports = []
for x in range(n):
    s = c.socket(zmq.SUB)
    s.set(zmq.SNDHWM, 10)  # A lower high-water mark triggers the crash faster.
    for x in range(m):
        s.set(zmq.SUBSCRIBE, topic(x))
    port = s.bind_to_random_port('tcp://127.0.0.1')
    ports.append(port)
    subs.append(s)


print '2. Connect to SUB sockets'
for port in ports:
    p.connect('tcp://127.0.0.1:%d' % port)


print '3. Subscribing/Unsubscribing...'
topics = defaultdict(set)
while True:
    for s in subs:
        if topics[s] and random.random() < 0.5:
            t = random.choice(list(topics[s]))
            s.set(zmq.UNSUBSCRIBE, t)
        else:
            t = topic(random.randrange(m))
            s.set(zmq.SUBSCRIBE, t)
            topics[s].add(t)
    time.sleep(0)  # Only a 0-second sleep triggers the crash.

sublee, Jan 09 '17 08:01

I can reproduce a crash in mtrie with that code. I can also trigger a more severe actual segfault even earlier by changing the sleep to a nonzero value:

thread #1: tid = 0x10e030, 0x000000010163b851 libzmq.cpython-36m-darwin.so`zmq::pipe_t::write(zmq::msg_t*) + 17, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=1, address=0x59)
    frame #0: 0x000000010163b851 libzmq.cpython-36m-darwin.so`zmq::pipe_t::write(zmq::msg_t*) + 17

The crash does have all the symptoms of thread-unsafe access of sockets, which I don't get, given my understanding of how gevent works. It seems like there must be multiple threads talking to the socket.

minrk, Jan 09 '17 11:01
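
One way to test the hypothesis that more than one thread touches a socket is to wrap the socket in a proxy that remembers the thread that created it and raises if it is used from any other thread. The following is only a minimal sketch: ThreadGuard and FakeSocket are hypothetical names invented for this illustration and are not part of pyzmq or gevent. It assumes the threading module has not been monkey-patched (the reproduction script above only calls patch_time), and it can only observe calls made from Python code, not anything libzmq does on its own internal I/O threads.

```python
import threading


class ThreadGuard(object):
    """Hypothetical debugging proxy (not a pyzmq or gevent API).

    Records the thread that created the proxy and raises RuntimeError
    when an attribute of the wrapped object is looked up from another thread.
    """

    def __init__(self, wrapped):
        self._wrapped = wrapped
        self._owner = threading.current_thread().ident

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself,
        # i.e. everything that belongs to the wrapped object.
        current = threading.current_thread().ident
        if current != self._owner:
            raise RuntimeError(
                "%r accessed from thread %r, but it is owned by thread %r"
                % (name, current, self._owner)
            )
        return getattr(self._wrapped, name)


class FakeSocket(object):
    """Stand-in for a real socket so the sketch runs without zmq installed."""

    def set(self, option, value):
        return (option, value)


guarded = ThreadGuard(FakeSocket())

# A call from the owning thread passes straight through.
same_thread_result = guarded.set("SUBSCRIBE", b"topic")

# A call from a second thread is rejected and recorded.
errors = []


def worker():
    try:
        guarded.set("SUBSCRIBE", b"topic")
    except RuntimeError as exc:
        errors.append(exc)


thread = threading.Thread(target=worker)
thread.start()
thread.join()
```

In the reproduction script, the same idea would mean wrapping each socket right after it is created, for example s = ThreadGuard(c.socket(zmq.SUB)). If no RuntimeError is raised before the crash, that suggests the Python-level calls all come from one thread, and the unsafe access would have to originate somewhere else.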