adaptive-scheduler icon indicating copy to clipboard operation
adaptive-scheduler copied to clipboard

optimize the communication loop

Open basnijholt opened this issue 5 years ago • 1 comment

On the server, handling a request and sending the reply object takes too long. The program should not slow down when many (~1000) clients connect at the same time.

Blocked by https://github.com/cloudpipe/cloudpickle/issues/374.

The current implementation is shown below, starting with client.py:

import time

import zmq

# Module-level ZeroMQ context from which send_something() creates its sockets.
ctx = zmq.Context()


def send_something(url: str, t_start) -> None:
    """Send ``t_start`` to the server at ``url`` and print two elapsed times.

    A REQ socket is opened for this single request/reply exchange and is
    closed when the ``with`` block exits. Two values are printed:

    * the server's reply minus ``t_start``;
    * the local time once the reply has arrived minus ``t_start``.
    """
    with ctx.socket(zmq.REQ) as req_sock:
        req_sock.connect(url)
        req_sock.send_pyobj(t_start)
        server_time = req_sock.recv_pyobj()
        # Evaluate in the same order as the printed columns: the server-side
        # figure first, then the locally measured round trip.
        until_reply = server_time - t_start
        round_trip = time.time() - t_start
        print(until_reply, round_trip)

# Hard-coded server endpoint; the current wall-clock time is sent as the
# request payload so both sides can report elapsed time against it.
url = "tcp://192.168.1.13:5555"
send_something(url, time.time())

server.py

import asyncio
import time

import zmq
import zmq.asyncio

# Asyncio-aware ZeroMQ context; server() creates its socket from it.
ctx = zmq.asyncio.Context()

# Endpoint handed to server() for binding (TCP, port 5555).
url = "tcp://*:5555"


def _dispatch(request, delay=5):
    """Simulate a slow request handler and return the completion timestamp.

    Args:
        request: The incoming request object; it is not inspected.
        delay: Seconds to block before replying. Defaults to 5, the value
            that was previously hard-coded.

    Returns:
        The value of ``time.time()`` taken after the delay has elapsed.

    Raises:
        ValueError: If ``delay`` is negative (raised by ``time.sleep``).
    """
    # Blocking sleep stands in for real work done per request.
    time.sleep(delay)
    return time.time()


async def server(url) -> None:
    """Serve requests on a REP socket bound to ``url`` until an error occurs.

    Each request is received, passed to ``_dispatch``, and the result is sent
    back before the next request is read. After every reply the difference
    ``reply - request`` is printed.

    Args:
        url: ZeroMQ endpoint to bind the REP socket to.
    """
    # The ``with`` block closes the socket on every exit path, including a
    # failing ``bind`` (e.g. the address is already in use).
    with ctx.socket(zmq.REP) as sock:
        sock.bind(url)
        while True:
            # SECURITY: recv_pyobj unpickles bytes received from the network;
            # only expose this endpoint to trusted peers.
            request = await sock.recv_pyobj()
            # NOTE: _dispatch is called synchronously, so the event loop is
            # blocked for as long as it runs.
            reply = _dispatch(request)
            await sock.send_pyobj(reply)
            print(f"time {reply - request}")


# Drive server() on the asyncio event loop. server() loops on `while True`,
# so run_until_complete() does not return normally.
ioloop = asyncio.get_event_loop()
proc = ioloop.run_until_complete(server(url))

basnijholt avatar May 26 '20 11:05 basnijholt

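It appears that the REQ/REP pattern isn't suitable for what I want: a single REP socket handles exactly one request at a time, so every client has to wait until all earlier requests have been answered.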

Instead, I could replace server.py with the following multithreaded ROUTER/DEALER server (adapted from the linked example):

import time
import threading
import zmq


def _dispatch(request, delay=5):
    """Simulate a slow request handler and return the completion timestamp.

    Args:
        request: The incoming request object; it is not inspected.
        delay: Seconds to block before replying. Defaults to 5, the value
            that was previously hard-coded.

    Returns:
        The value of ``time.time()`` taken after the delay has elapsed.

    Raises:
        ValueError: If ``delay`` is negative (raised by ``time.sleep``).
    """
    # Blocking sleep stands in for real work done per request.
    time.sleep(delay)
    return time.time()


def worker_routine(worker_url, context=None):
    """Answer requests arriving on ``worker_url`` in an endless loop.

    A REP socket is connected to ``worker_url``; each received request is
    passed to ``_dispatch`` and the result is sent back. After every reply
    the difference ``reply - request`` is printed.

    Args:
        worker_url: ZeroMQ endpoint the worker's REP socket connects to.
        context: ZeroMQ context to create the socket from. When omitted (or
            falsy), the shared ``zmq.Context.instance()`` is used.
    """
    context = context or zmq.Context.instance()
    # The ``with`` block closes the socket on every exit path, including a
    # failing ``connect``.
    with context.socket(zmq.REP) as socket:
        socket.connect(worker_url)
        while True:
            # SECURITY: recv_pyobj unpickles the received bytes; only accept
            # traffic from trusted peers.
            request = socket.recv_pyobj()
            reply = _dispatch(request)
            socket.send_pyobj(reply)
            print(f"time {reply - request}")


def main(n_workers=5):
    """Run the server: a ROUTER/DEALER proxy in front of worker threads.

    Clients connect to the ROUTER socket over TCP; requests are forwarded by
    ``zmq.proxy`` to the DEALER socket, to which the worker threads connect
    over the in-process transport.

    Args:
        n_workers: Number of worker threads to start. Defaults to 5.
    """
    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    # Prepare our context and sockets
    context = zmq.Context.instance()
    clients = context.socket(zmq.ROUTER)  # talks to clients
    workers = context.socket(zmq.DEALER)  # talks to workers

    try:
        clients.bind(url_client)
        # Bind the inproc endpoint before any worker thread connects to it.
        workers.bind(url_worker)

        # Launch pool of worker threads
        for _ in range(n_workers):
            thread = threading.Thread(target=worker_routine, args=(url_worker,))
            thread.start()

        # Blocks while shuttling messages between the two sockets.
        zmq.proxy(clients, workers)
    finally:
        # Reached when bind or proxy raises (including KeyboardInterrupt), so
        # the sockets and the context are always released.
        clients.close()
        workers.close()
        context.term()


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()

basnijholt avatar May 26 '20 12:05 basnijholt