pyzmq
Multiple PUB/SUB bind & connect differences
Hello, I have an application where several processes publish messages and one or more SUBs receive them. Naturally, I would use the PUB/SUB pattern, but I'm running into issues where SUBs cannot receive messages from some or all of the publishers. I based the code on the PUB/SUB example; the snippet below illustrates the issue. I tried different configurations to see what would happen, and I discovered that:
- If I bind from a PUB socket, then all SUBs are able to receive messages, but only from that single PUB.
- If I bind from a SUB socket, then only that SUB is able to receive messages, but it receives messages from all PUBs.
What I want is a combination where all SUBs are able to receive messages from all PUBs. I've already looked at the PUSH/PULL pattern, but I need the fan-out message distribution of PUB/SUB, which I don't think PUSH/PULL provides. I want to confirm whether this is the intended behaviour.
"""
Simple example of using zmq log handlers
This starts a number of subprocesses with PUBHandlers that generate
log messages at a regular interval. The main process has a SUB socket,
which aggregates and logs all of the messages to the root logger.
"""
import logging
from multiprocessing import Process, current_process
import os
import random
import sys
import time
from threading import Thread
import zmq
from zmq.log.handlers import PUBHandler
LOG_LEVELS = (
    logging.DEBUG,
    logging.INFO,
    logging.WARN,
    logging.ERROR,
    logging.CRITICAL,
)
def sub_logger(port, level=logging.DEBUG, bind=False, log=True, name=""):
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    # Either side of a PUB/SUB pair may bind; the other side connects.
    if bind:
        sub.bind('tcp://127.0.0.1:%i' % port)
    else:
        sub.connect('tcp://127.0.0.1:%i' % port)
    # Subscribe to every topic (empty prefix).
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    logging.basicConfig(level=level)
    print(f"starting {name} at {os.getpid()} with level={level}")
    while True:
        ev, message = sub.recv_multipart()
        if log:
            message = message.decode()
            print(f"{name}: {message}")
def log_worker(port, interval=1, bind=False, name=""):
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    level = random.choice(LOG_LEVELS)
    # Only one socket can bind to a given TCP port; the others must connect.
    if bind:
        pub.bind('tcp://127.0.0.1:%i' % port)
    else:
        pub.connect('tcp://127.0.0.1:%i' % port)
    print(f"starting {name} at {os.getpid()} with level={level}")
    while True:
        level = random.choice(LOG_LEVELS)
        pub.send_multipart([b'log', f"logging {level} from {name}-{current_process().name}".encode()])
        time.sleep(interval)
if __name__ == '__main__':
    if len(sys.argv) > 1:
        n = int(sys.argv[1])
    else:
        n = 2
    if len(sys.argv) > 2:
        port = int(sys.argv[2])
    else:
        port = 5558
    # start the log watcher
    Process(target=log_worker, args=(port,), kwargs=dict(bind=False, name="worker 1"), daemon=True).start()
    Thread(target=log_worker, args=(port,), kwargs=dict(bind=False, name="worker 2"), daemon=True).start()
    Process(target=sub_logger, args=(port,), kwargs=dict(bind=False, name="sub 1"), daemon=True).start()
    Thread(target=sub_logger, args=(port,), kwargs=dict(name="sub 2"), daemon=True).start()
    t1 = Thread(target=log_worker, args=(port,), kwargs=dict(bind=True, name="worker 0"), daemon=True)
    t1.start()
    t2 = Thread(target=sub_logger, args=(port,), kwargs=dict(bind=False, name="sub 0"), daemon=True)
    t2.start()
    t2.join()
For anyone who's interested in this pattern, I ended up implementing the following pattern but without ROUTER and DEALER. This way I republish all messages through a single PUB so that all SUBs can receive them.
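The poster's actual implementation isn't shown, so the following is only a minimal sketch of what such a relay might look like, assuming it is a bound SUB that receives from every publisher and a bound PUB that re-sends to every subscriber. The function name `run_forwarder`, the `stop` event, and the two addresses are hypothetical and not taken from the original code.

```python
import threading

import zmq


def run_forwarder(ctx, frontend_addr, backend_addr, stop):
    """Relay every message from a bound SUB to a bound PUB until `stop` is set.

    `frontend_addr` and `backend_addr` are hypothetical endpoints: publishers
    connect to the first, subscribers connect to the second.
    """
    frontend = ctx.socket(zmq.SUB)
    frontend.setsockopt(zmq.SUBSCRIBE, b"")  # accept every topic
    frontend.bind(frontend_addr)

    backend = ctx.socket(zmq.PUB)
    backend.bind(backend_addr)

    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    try:
        while not stop.is_set():
            # Poll with a timeout so the loop can notice the stop event.
            if poller.poll(100):
                backend.send_multipart(frontend.recv_multipart())
    finally:
        frontend.close(linger=0)
        backend.close(linger=0)
```

With something like this running (for example in a `threading.Thread` with a `threading.Event` as `stop`), every PUB would `connect` to the frontend address and every SUB would `connect` to the backend address, so no publisher or subscriber needs to bind.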
Very late, but I think XPUB and XSUB are what you needed?
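For readers who land here, this is a rough sketch of how that suggestion is commonly wired up in pyzmq: an XSUB socket facing the publishers, an XPUB socket facing the subscribers, and `zmq.proxy` shuttling messages and subscriptions between them. The function name `run_proxy` and the endpoint arguments are assumptions for illustration; nothing here comes from the original poster's code.

```python
import zmq


def run_proxy(ctx, frontend_addr, backend_addr):
    """Run an XSUB/XPUB proxy; blocks until the context is terminated.

    Publishers connect to `frontend_addr`, subscribers to `backend_addr`
    (both are hypothetical endpoints chosen by the caller).
    """
    frontend = ctx.socket(zmq.XSUB)
    frontend.bind(frontend_addr)

    backend = ctx.socket(zmq.XPUB)
    backend.bind(backend_addr)

    try:
        # Forwards messages frontend -> backend and subscription
        # requests backend -> frontend.
        zmq.proxy(frontend, backend)
    except zmq.ContextTerminated:
        pass  # normal shutdown path when ctx.term() is called elsewhere
    finally:
        frontend.close(linger=0)
        backend.close(linger=0)
```

Applied to the snippet in the question, every `log_worker` and every `sub_logger` would be started with `bind=False`, with workers connecting to the frontend port and subscribers to a second (backend) port. Unlike a plain SUB-to-PUB relay, XPUB/XSUB passes subscriptions upstream, so publishers can still filter by topic at the source.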