First message lost between pub/sub sockets
Hey everyone, sorry for opening another issue about this, but my question differs slightly from the other reports I found...
I have a middleware that uses ZeroMQ as its communication layer, and in some specific cases I need to receive every message. To achieve this, I use the monitor socket and publish the first message only once N connections have been established (N is a magic number, but in the cases where I need this I know its value). However, the first message still seems to get lost sometimes. I isolated the core logic in the following minimal example (it does not reproduce the issue when run locally, only when executed on a distributed set of machines):
```python
import zmq
import time
import multiprocessing
from zmq import Flag, PollEvent, Event
from zmq.utils.monitor import recv_monitor_message


def pub_process():
    # Create a context and a PUB socket
    context = zmq.Context()
    pub_socket = context.socket(zmq.PUB)
    pub_socket.bind("tcp://*:5555")

    # Create monitor socket
    monitor_socket = pub_socket.get_monitor_socket()

    print("Waiting for subscriber to connect...")
    # Wait for the subscriber to connect
    while True:
        result = monitor_socket.poll(10)
        if result != PollEvent.POLLIN:
            continue
        event = recv_monitor_message(monitor_socket, Flag.DONTWAIT)
        if event["event"] == Event.ACCEPTED:
            print("Subscriber connected!")
            monitor_socket.close()
            break

    # Now send messages after the subscriber is connected
    for i in range(5):
        message = f"Message {i}"
        print(f"Publishing: {message}")
        pub_socket.send_string(message)
        time.sleep(1)

    # Send a stop message to the subscriber, then close the socket
    pub_socket.send_string("STOP")
    pub_socket.close()
    context.term()
    print("Publisher process finished")


def sub_process():
    # Create a context and a SUB socket
    context = zmq.Context()
    sub_socket = context.socket(zmq.SUB)
    sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")
    sub_socket.connect("tcp://localhost:5555")

    # Poller setup to check for messages
    poller = zmq.Poller()
    poller.register(sub_socket, zmq.POLLIN)

    print("Waiting for messages...")
    # Wait for messages
    while True:
        socks = dict(poller.poll())
        if sub_socket in socks:
            message = sub_socket.recv_string()
            if message == "STOP":
                break
            print(f"Received: {message}")

    # Close the socket and terminate the context
    sub_socket.close()
    context.term()
    print("Subscriber process finished")


def main():
    pub_proc = multiprocessing.Process(target=pub_process, daemon=True)
    sub_proc = multiprocessing.Process(target=sub_process, daemon=True)
    pub_proc.start()
    sub_proc.start()
    pub_proc.join()
    sub_proc.join()


if __name__ == "__main__":
    main()
```
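The post describes waiting for N connections, while the example waits for a single `ACCEPTED` event. Below is a minimal sketch of the N-peer wait with a timeout, assuming a pyzmq version that provides `zmq.EVENT_ACCEPTED`, `Socket.get_monitor_socket()` and `Socket.disable_monitor()`; `wait_for_accepted` and `demo` are hypothetical helper names, not pyzmq APIs. It mirrors the approach in the post rather than changing it: `ACCEPTED` reports that the PUB side accepted a TCP connection, which is not necessarily the moment that subscriber's subscription has reached the PUB.

```python
import time

import zmq
from zmq.utils.monitor import recv_monitor_message


def wait_for_accepted(monitor_socket, n, timeout_ms=5000):
    # Hypothetical helper: block until `n` ACCEPTED events arrive on the monitor
    # socket, or raise TimeoutError once `timeout_ms` has elapsed.
    accepted = 0
    deadline = time.monotonic() + timeout_ms / 1000
    while accepted < n:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            raise TimeoutError(f"only {accepted} of {n} peers accepted")
        # poll() returns 0 on timeout, a non-zero event mask when a message is ready
        if not monitor_socket.poll(remaining_ms):
            continue
        event = recv_monitor_message(monitor_socket)
        # Other events (e.g. handshake notifications) are read and ignored
        if event["event"] == zmq.EVENT_ACCEPTED:
            accepted += 1
    return accepted


def demo(n=3):
    # Hypothetical local demo: one PUB, n SUB peers, all on 127.0.0.1
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    port = pub.bind_to_random_port("tcp://127.0.0.1")
    monitor = pub.get_monitor_socket()  # attach the monitor before peers connect
    subs = []
    for _ in range(n):
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        sub.connect(f"tcp://127.0.0.1:{port}")
        subs.append(sub)
    try:
        return wait_for_accepted(monitor, n)
    finally:
        # Stop monitoring before closing the monitor socket, then clean up
        pub.disable_monitor()
        monitor.close(linger=0)
        for sub in subs:
            sub.close(linger=0)
        pub.close(linger=0)
        ctx.term()
```

In `pub_process`, the `while True` monitor loop would then become a single call such as `wait_for_accepted(monitor_socket, N)`, with N being the known number of subscribers.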
This issue https://github.com/zeromq/libzmq/issues/4746 suggests using monitor sockets, which I am doing. This issue https://github.com/zeromq/libzmq/issues/2267 suggests that the problem occurs when the PUB connects to the SUB, which is not my setup (the PUB binds and the SUB connects). According to the documentation at https://zguide.zeromq.org/docs/chapter1/#Getting-the-Message-Out, if I understand it correctly, this can happen when the PUB is already sending messages by the time the SUB connects, which is not the case here because the PUB only publishes once the connections are established.
So, does the monitor socket guarantee that the SUB will receive the message, or do I need to implement additional logic with another socket to guarantee that the PUB/SUB connection is fully established?
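One commonly used alternative to the monitor socket, which is not what the post does, is to bind an XPUB socket instead of a PUB. An XPUB passes each subscription up to the application as a message (first byte `1` for subscribe, `0` for unsubscribe, followed by the topic), so the publisher can wait until it has read N subscriptions before sending the first message. Below is a minimal sketch assuming pyzmq, with all peers on localhost for the demo; `wait_for_subscriptions` and `demo` are hypothetical names. `XPUB_VERBOSE` is set because by default an XPUB forwards only the first subscription for a given topic, so N subscribers to `""` would otherwise produce a single message.

```python
import time

import zmq


def wait_for_subscriptions(xpub, n, timeout_ms=5000):
    # Hypothetical helper: block until `n` subscribe messages have been read
    # from the XPUB socket, or raise TimeoutError once `timeout_ms` has elapsed.
    seen = 0
    deadline = time.monotonic() + timeout_ms / 1000
    while seen < n:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            raise TimeoutError(f"only {seen} of {n} subscriptions arrived")
        if not xpub.poll(remaining_ms):
            continue
        msg = xpub.recv()
        # XPUB delivers b"\x01" + topic for subscribe, b"\x00" + topic for unsubscribe
        if msg[:1] == b"\x01":
            seen += 1
    return seen


def demo(n=3):
    # Hypothetical local demo: one XPUB, n SUB peers, all on 127.0.0.1
    ctx = zmq.Context()
    xpub = ctx.socket(zmq.XPUB)
    # Report every subscription, not just the first one per topic
    xpub.setsockopt(zmq.XPUB_VERBOSE, 1)
    port = xpub.bind_to_random_port("tcp://127.0.0.1")
    subs = []
    for _ in range(n):
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        sub.connect(f"tcp://127.0.0.1:{port}")
        subs.append(sub)
    try:
        wait_for_subscriptions(xpub, n)
        # First message is sent only after all n subscriptions were read
        xpub.send(b"Message 0")
        received = []
        for sub in subs:
            if sub.poll(2000):
                received.append(sub.recv())
        return received
    finally:
        for sub in subs:
            sub.close(linger=0)
        xpub.close(linger=0)
        ctx.term()
```

Compared with counting `ACCEPTED` events, this waits on the subscription itself, which is the information the publishing side uses to decide whether to forward a message to a given subscriber.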
Best, Jorge