adaptive-scheduler (GitHub issue)
optimize the communication loop
In the server, sending an object takes too long. When many (1000) clients connect, I don't want the program to slow down.
Blocked by https://github.com/cloudpipe/cloudpickle/issues/374.
The current implementation is:
client.py
import time
import zmq
ctx = zmq.Context()
def send_something(url: str, t_start) -> None:
with ctx.socket(zmq.REQ) as socket:
socket.connect(url)
socket.send_pyobj(t_start)
reply = socket.recv_pyobj()
print(reply - t_start, time.time() - t_start)
url = "tcp://192.168.1.13:5555"
send_something(url, time.time())
server.py
import asyncio
import time
import zmq
import zmq.asyncio
ctx = zmq.asyncio.Context()
url = "tcp://*:5555"
def _dispatch(request):
time.sleep(5)
reply = time.time()
return reply
async def server(url) -> None:
sock = ctx.socket(zmq.REP)
sock.bind(url)
try:
while True:
request = await sock.recv_pyobj()
reply = _dispatch(request)
await sock.send_pyobj(reply)
print(f"time {reply - request}")
finally:
sock.close()
ioloop = asyncio.get_event_loop()
proc = ioloop.run_until_complete(server(url))
It appears that the REQ/REP protocol isn't suitable for what I want.
Instead I could replace server.py with the following (adapted from the ZeroMQ guide's multithreaded-server example):
import time
import threading
import zmq
def _dispatch(request):
time.sleep(5)
reply = time.time()
return reply
def worker_routine(worker_url, context=None):
"""Worker routine"""
context = context or zmq.Context.instance()
socket = context.socket(zmq.REP)
socket.connect(worker_url)
try:
while True:
request = socket.recv_pyobj()
reply = _dispatch(request)
socket.send_pyobj(reply)
print(f"time {reply - request}")
finally:
socket.close()
def main():
"""Server routine"""
url_worker = "inproc://workers"
url_client = "tcp://*:5555"
# Prepare our context and sockets
context = zmq.Context.instance()
# Socket to talk to clients
clients = context.socket(zmq.ROUTER)
clients.bind(url_client)
# Socket to talk to workers
workers = context.socket(zmq.DEALER)
workers.bind(url_worker)
# Launch pool of worker threads
for i in range(5):
thread = threading.Thread(target=worker_routine, args=(url_worker,))
thread.start()
zmq.proxy(clients, workers)
# We never get here but clean up anyhow
clients.close()
workers.close()
context.term()
if __name__ == "__main__":
main()