libzmq icon indicating copy to clipboard operation
libzmq copied to clipboard

Delay required after PUB bind before messages can be sent

Open JC3 opened this issue 1 year ago • 1 comments

libzmq: 4.3.5 via PyZMQ 26.2.0 (Python 3.10) OS: macOS 13.6.6, Ubuntu 22.04

It appears that a delay is needed after creating and binding a PUB socket before the first message can be successfully published. In testing, a 200ms delay seems to be the minimum required delay. There seems to be no difference between various protocols (tried TCP and IPC). The following graph shows the send_multipart success rate vs. the delay time after bind for various delays (30 trials each).

Screenshot 2024-09-30 at 11 24 08 AM

What is going on here? Is there a more deterministic way to check if the PUB socket is ready to send rather than adding arbitrary delays?

Test Code (Python)

import zmq
import time
import multiprocessing as mp
import struct

TEST_DELAYS = range(0,251,10)
TEST_TRIALS = 30
ADDRESS = "ipc:///tmp/delay_test.ipc" # "tcp://127.0.0.1:43212"


def _sub_main (keep_running):

    ctx = zmq.Context()
    
    sub = ctx.socket(zmq.SUB)
    sub.connect(ADDRESS)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.setsockopt(zmq.RCVTIMEO, 500)

    results = {}
    
    while keep_running.value:
        try:
            (delay,) = struct.unpack("<i", sub.recv_multipart()[0])
        except zmq.Again:
            continue
        results[delay] = results.get(delay, 0) + 1
        print((delay, results[delay]))

    sub.close()
    ctx.destroy()

    print("---------")
    for delay_ms, success_rate in results.items():
        print(f"{delay_ms},{success_rate/TEST_TRIALS:.3f}")


def run_test (delay):

    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind(ADDRESS)
    if delay > 0:
        time.sleep(delay/1000.0)
    pub.send_multipart([struct.pack("<i", delay)])
    pub.close()
    ctx.destroy()        

def main ():

    keep_running = mp.Value('i', 1)
    receiver = mp.Process(target=_sub_main, args=(keep_running,))
    receiver.start()

    for delay in TEST_DELAYS:
        for k in range(TEST_TRIALS):
            run_test(delay)

    keep_running.value = 0
    receiver.join()

    
if __name__ == "__main__":
    main()

JC3 avatar Sep 30 '24 15:09 JC3

that's correct and documented behavior.

A sub socket only sees messages after it has successfully connected to a pub socket. To synchronize state you'd have to also create another type of socket which tells you at which state the sub socket is currently, aka you'd have to create some form of bookkeeping for your messages in some form of stable storage.

the "client" can then tell the "server" which messages it has received already and then the server can send whats missing.

Or you might go and use the zmq_socket_monitor to tell you when the sub socket has connected and then let the pub socket start sending messages.

Asmod4n avatar Dec 01 '24 11:12 Asmod4n