
Multiple PUB/SUB bind & connect differences

Open iSplasher opened this issue 2 years ago • 1 comments

Hello, I have an application where I need to publish messages from several processes, with one or more SUBs receiving those messages. Naturally I reached for the PUB/SUB pattern, but I'm running into SUBs not being able to receive messages from some or all of the publishers. I based my code on the PUB/SUB logging example; the snippet below illustrates the issue. Playing around with different configurations, I discovered that:

  1. if I bind from a PUB socket, then all SUBs are able to receive messages, but only from that single PUB;
  2. if I bind from a SUB socket, then only that SUB is able to receive messages, but it receives them from all PUBs.

What I want is a combination where all SUBs receive messages from all PUBs. I've already looked at the PUSH/PULL pattern, but I need the fan-out message distribution of PUB/SUB, which I don't think PUSH/PULL provides. I want to confirm whether this is the intended behaviour or not.
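(For illustration, one configuration that does deliver every message to every SUB is to have each PUB bind its own port and let every SUB `connect()` to all of them; a SUB socket can connect to many endpoints at once. A minimal sketch, where the ports 5556/5557 and the short sleep are arbitrary choices for the demo:)

```python
import time

import zmq

ctx = zmq.Context.instance()

# Each publisher binds its own endpoint.
pub_a = ctx.socket(zmq.PUB)
pub_a.bind("tcp://127.0.0.1:5556")
pub_b = ctx.socket(zmq.PUB)
pub_b.bind("tcp://127.0.0.1:5557")

# One SUB socket connects to every publisher endpoint.
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5556")
sub.connect("tcp://127.0.0.1:5557")
sub.setsockopt(zmq.SUBSCRIBE, b"")

time.sleep(0.5)  # let subscriptions propagate (slow-joiner syndrome)

pub_a.send(b"from A")
pub_b.send(b"from B")

got = {sub.recv(), sub.recv()}  # arrival order is not guaranteed
print(sorted(got))  # [b'from A', b'from B']
```

(The drawback is that every SUB must know every PUB's endpoint, which is why a central proxy is usually preferred.)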

"""
Simple example of using zmq log handlers

This starts a number of subprocesses with PUBHandlers that generate
log messages at a regular interval.  The main process has a SUB socket,
which aggregates and logs all of the messages to the root logger.
"""

import logging
from multiprocessing import Process, current_process
import os
import random
import sys
import time
from threading import Thread

import zmq
from zmq.log.handlers import PUBHandler

LOG_LEVELS = (
    logging.DEBUG,
    logging.INFO,
    logging.WARN,
    logging.ERROR,
    logging.CRITICAL,
)


def sub_logger(port, level=logging.DEBUG, bind=False, log=True, name=""):
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    if bind:
        sub.bind('tcp://127.0.0.1:%i' % port)
    else:
        sub.connect('tcp://127.0.0.1:%i' % port)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    logging.basicConfig(level=level)

    print(f"starting {name} at {os.getpid()} with level={level}")

    while True:
        ev, message = sub.recv_multipart()
        if log:
            message = message.decode()
            print(f"{name}: {message}")


def log_worker(port, interval=1, bind=False, name=""):
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)

    level = random.choice(LOG_LEVELS)

    if bind:
        pub.bind('tcp://127.0.0.1:%i' % port)
    else:
        pub.connect('tcp://127.0.0.1:%i' % port)


    print(f"starting {name} at {os.getpid()} with level={level}")

    while True:
        level = random.choice(LOG_LEVELS)
        pub.send_multipart([b'log', f"logging {level} from {name}-{current_process().name}".encode()])
        time.sleep(interval)


if __name__ == '__main__':
    if len(sys.argv) > 1:
        n = int(sys.argv[1])
    else:
        n = 2

    if len(sys.argv) > 2:
        port = int(sys.argv[2])
    else:
        port = 5558

    # start workers and subscribers in various bind/connect combinations
    Process(target=log_worker, args=(port,), kwargs=dict(bind=False, name="worker 1"), daemon=True).start()
    Thread(target=log_worker, args=(port,), kwargs=dict(bind=False, name="worker 2"), daemon=True).start()
    Process(target=sub_logger, args=(port,), kwargs=dict(bind=False, name="sub 1"), daemon=True).start()
    Thread(target=sub_logger, args=(port,), kwargs=dict(name="sub 2"), daemon=True).start()

    t1 = Thread(target=log_worker, args=(port,), kwargs=dict(bind=True, name="worker 0"), daemon=True)
    t1.start()
    t2 = Thread(target=sub_logger, args=(port,), kwargs=dict(bind=False, name="sub 0"), daemon=True)
    t2.start()
    t2.join()

iSplasher avatar Nov 01 '21 23:11 iSplasher

For anyone who's interested in this pattern, I ended up implementing the following setup, but without ROUTER and DEALER. This way I republish all messages through a single PUB so that all SUBs can receive them. [image]
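(A minimal sketch of this republishing idea, assuming a dedicated forwarder thread: a bound SUB collects from all workers and a bound PUB rebroadcasts to all subscribers. The ports 5559/5560 and the thread layout are placeholders for the demo, not the author's exact code.)

```python
import threading
import time

import zmq

ctx = zmq.Context.instance()


def forwarder():
    # Bound SUB: all worker PUBs connect here.
    frontend = ctx.socket(zmq.SUB)
    frontend.bind("tcp://127.0.0.1:5559")
    frontend.setsockopt(zmq.SUBSCRIBE, b"")
    # Bound PUB: all final SUBs connect here.
    backend = ctx.socket(zmq.PUB)
    backend.bind("tcp://127.0.0.1:5560")
    while True:
        backend.send_multipart(frontend.recv_multipart())


threading.Thread(target=forwarder, daemon=True).start()

pub = ctx.socket(zmq.PUB)
pub.connect("tcp://127.0.0.1:5559")

sub = ctx.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5560")
sub.setsockopt(zmq.SUBSCRIBE, b"")

time.sleep(0.5)  # let both subscription hops propagate
pub.send_multipart([b"log", b"hello"])
msg = sub.recv_multipart()
print(msg)  # [b'log', b'hello']
```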

iSplasher avatar Nov 02 '21 02:11 iSplasher

Very late, but I think XPUB and XSUB are what you needed?

davetapley avatar Feb 21 '23 19:02 davetapley
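(For later readers: an XPUB/XSUB proxy is indeed the standard many-to-many solution, because XSUB/XPUB also forward subscription messages upstream, so filtering keeps working. A minimal sketch using `zmq.proxy`; the ports 5561/5562 are arbitrary:)

```python
import threading
import time

import zmq

ctx = zmq.Context.instance()


def run_proxy():
    xsub = ctx.socket(zmq.XSUB)
    xsub.bind("tcp://127.0.0.1:5561")  # all PUBs connect here
    xpub = ctx.socket(zmq.XPUB)
    xpub.bind("tcp://127.0.0.1:5562")  # all SUBs connect here
    zmq.proxy(xsub, xpub)  # blocks, shuttling messages and subscriptions


threading.Thread(target=run_proxy, daemon=True).start()

pub = ctx.socket(zmq.PUB)
pub.connect("tcp://127.0.0.1:5561")

sub = ctx.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5562")
sub.setsockopt(zmq.SUBSCRIBE, b"")

time.sleep(0.5)  # let the subscription travel SUB -> XPUB -> XSUB -> PUB
pub.send(b"via proxy")
msg = sub.recv()
print(msg)  # b'via proxy'
```

(The proxy is the only stable, bound endpoint, so any number of publishers and subscribers can come and go by connecting to it.)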