Multiple PUB/SUB bind & connect differences
Hello, I have an application in which several processes publish messages and one or more SUB sockets receive them. PUB/SUB seems like the natural fit, but I'm running into cases where a SUB receives messages from only one publisher, or from none at all. I based the code on the PUB/SUB example, and this snippet illustrates the issue. While trying different configurations, I found that:
- if I bind a PUB socket, all SUBs receive messages, but only from that single PUB.
- if I bind a SUB socket, only that SUB receives messages, but it receives them from all PUBs.

What I want is a combination in which all SUBs receive messages from all PUBs. I've already looked at the PUSH/PULL pattern, but I need the fan-out message distribution of PUB/SUB, which I don't think PUSH/PULL provides. Can someone confirm whether this is the intended behaviour?
"""
PUB/SUB bind vs. connect test, adapted from the zmq log handlers example.
This starts several PUB workers and SUB loggers as threads and processes,
all using the same port. Only "worker 0" binds; every other socket connects.
Each worker publishes a message at a regular interval and each SUB prints it.
"""
import logging
from multiprocessing import Process, current_process
import os
import random
import sys
import time
from threading import Thread
import zmq
from zmq.log.handlers import PUBHandler
LOG_LEVELS = (
    logging.DEBUG,
    logging.INFO,
    logging.WARNING,
    logging.ERROR,
    logging.CRITICAL,
)
def sub_logger(port, level=logging.DEBUG, bind=False, log=True, name=""):
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    if bind:
        sub.bind('tcp://127.0.0.1:%i' % port)
    else:
        sub.connect('tcp://127.0.0.1:%i' % port)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    logging.basicConfig(level=level)
    print(f"starting {name} at {os.getpid()} with level={level}")
    while True:
        ev, message = sub.recv_multipart()
        if log:
            message = message.decode()
            print(f"{name}: {message}")
def log_worker(port, interval=1, bind=False, name=""):
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    level = random.choice(LOG_LEVELS)
    if bind:
        pub.bind('tcp://127.0.0.1:%i' % port)
    else:
        pub.connect('tcp://127.0.0.1:%i' % port)
    print(f"starting {name} at {os.getpid()} with level={level}")
    while True:
        level = random.choice(LOG_LEVELS)
        pub.send_multipart([b'log', f"logging {level} from {name}-{current_process().name}".encode()])
        time.sleep(interval)
if __name__ == '__main__':
    if len(sys.argv) > 1:
        n = int(sys.argv[1])
    else:
        n = 2
    if len(sys.argv) > 2:
        port = int(sys.argv[2])
    else:
        port = 5558
    # Only "worker 0" binds the port; the other PUBs and all SUBs connect to it.
    Process(target=log_worker, args=(port,), kwargs=dict(bind=False, name="worker 1"), daemon=True).start()
    Thread(target=log_worker, args=(port,), kwargs=dict(bind=False, name="worker 2"), daemon=True).start()
    Process(target=sub_logger, args=(port,), kwargs=dict(bind=False, name="sub 1"), daemon=True).start()
    Thread(target=sub_logger, args=(port,), kwargs=dict(name="sub 2"), daemon=True).start()
    t1 = Thread(target=log_worker, args=(port,), kwargs=dict(bind=True, name="worker 0"), daemon=True)
    t1.start()
    t2 = Thread(target=sub_logger, args=(port,), kwargs=dict(bind=False, name="sub 0"), daemon=True)
    t2.start()
    t2.join()
For anyone who's interested in this pattern: I ended up implementing the following pattern, but without ROUTER and DEALER. All messages are republished through a single PUB, so every SUB can receive them.
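
A relay like the one described above might look roughly like the sketch below. This is a guess at the general shape, not the code that was actually used: the function names `run_forwarder` and `demo_forwarder`, the `inproc://` endpoints, and the threading setup are all assumptions made for illustration. A SUB socket binds one endpoint that every publisher connects to, and a PUB socket binds a second endpoint that every subscriber connects to.

```python
import threading
import time

import zmq


def run_forwarder(ctx, frontend_url, backend_url, ready, stop):
    """Relay every message from many PUBs (frontend) to many SUBs (backend)."""
    frontend = ctx.socket(zmq.SUB)  # publishers connect here
    frontend.bind(frontend_url)
    frontend.setsockopt(zmq.SUBSCRIBE, b"")  # accept every topic
    backend = ctx.socket(zmq.PUB)  # subscribers connect here
    backend.bind(backend_url)
    ready.set()
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    try:
        while not stop.is_set():
            # Poll with a timeout so the stop flag is checked regularly.
            if poller.poll(100):
                backend.send_multipart(frontend.recv_multipart())
    finally:
        frontend.close(linger=0)
        backend.close(linger=0)


def demo_forwarder(n_pubs=2, n_subs=2, timeout=5.0):
    """Hypothetical demo: return, per SUB, the set of payloads it received."""
    ctx = zmq.Context()
    ready, stop = threading.Event(), threading.Event()
    front, back = "inproc://fwd-in", "inproc://fwd-out"  # illustrative endpoints
    relay = threading.Thread(
        target=run_forwarder, args=(ctx, front, back, ready, stop)
    )
    relay.start()
    ready.wait()
    pubs = [ctx.socket(zmq.PUB) for _ in range(n_pubs)]
    subs = [ctx.socket(zmq.SUB) for _ in range(n_subs)]
    for pub in pubs:
        pub.connect(front)
    for sub in subs:
        sub.connect(back)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
    seen = [set() for _ in subs]
    deadline = time.monotonic() + timeout
    # Resend until every SUB has heard from every PUB: messages sent before
    # the subscriptions have propagated are silently dropped.
    while time.monotonic() < deadline and any(len(got) < n_pubs for got in seen):
        for i, pub in enumerate(pubs):
            pub.send_multipart([b"log", b"worker %d" % i])
        time.sleep(0.05)
        for sub, got in zip(subs, seen):
            while sub.poll(0):
                got.add(sub.recv_multipart()[1])
    stop.set()
    relay.join()
    for sock in pubs + subs:
        sock.close(linger=0)
    ctx.term()
    return seen
```

With TCP endpoints instead of `inproc://`, the same relay should work across processes, since only the relay binds and everything else connects.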

Very late, but I think XPUB and XSUB are what you needed?
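
For reference, a minimal sketch of the XSUB/XPUB suggestion, assuming a single intermediary that binds both endpoints and runs `zmq.proxy` between them. The names `run_proxy` and `demo_proxy` and the `inproc://` endpoints are made up for this example. Unlike a plain SUB-to-PUB relay, the XPUB/XSUB pair also passes subscriptions upstream, so publishers can filter at the source.

```python
import threading
import time

import zmq


def run_proxy(ctx, frontend_url, backend_url, ready):
    """Forward PUB traffic to SUBs, and SUB subscriptions back to PUBs."""
    frontend = ctx.socket(zmq.XSUB)  # publishers connect here
    backend = ctx.socket(zmq.XPUB)  # subscribers connect here
    frontend.bind(frontend_url)
    backend.bind(backend_url)
    ready.set()
    try:
        zmq.proxy(frontend, backend)  # blocks until the context is terminated
    except zmq.ContextTerminated:
        pass
    finally:
        frontend.close(linger=0)
        backend.close(linger=0)


def demo_proxy(n_pubs=2, n_subs=2, timeout=5.0):
    """Hypothetical demo: return, per SUB, the set of payloads it received."""
    ctx = zmq.Context()
    ready = threading.Event()
    front, back = "inproc://proxy-in", "inproc://proxy-out"  # illustrative
    worker = threading.Thread(target=run_proxy, args=(ctx, front, back, ready))
    worker.start()
    ready.wait()
    pubs = [ctx.socket(zmq.PUB) for _ in range(n_pubs)]
    subs = [ctx.socket(zmq.SUB) for _ in range(n_subs)]
    for pub in pubs:
        pub.connect(front)
    for sub in subs:
        sub.connect(back)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
    seen = [set() for _ in subs]
    deadline = time.monotonic() + timeout
    # Resend until every SUB has heard from every PUB: messages sent before
    # the subscriptions have propagated are silently dropped.
    while time.monotonic() < deadline and any(len(got) < n_pubs for got in seen):
        for i, pub in enumerate(pubs):
            pub.send_multipart([b"log", b"worker %d" % i])
        time.sleep(0.05)
        for sub, got in zip(subs, seen):
            while sub.poll(0):
                got.add(sub.recv_multipart()[1])
    for sock in pubs + subs:
        sock.close(linger=0)
    ctx.term()  # unblocks zmq.proxy in the worker thread
    worker.join()
    return seen
```

In the script from the original post, this would correspond to every `log_worker` and `sub_logger` using `bind=False`, with the workers pointed at the frontend port and the loggers at the backend port.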