jeromq
jeromq copied to clipboard
ROUTER-DEALER Pattern not receiving messages
Using router-dealer pattern to send messages sometimes doesn't work. The dealer sends messages every 3 seconds, and logs when a message is queued. The dealer logs that it has queued the message, but the router doesn't receive anything.
I have attached below both the Router and Dealer in plain Java code which are built independently as jars and run. Java version 11 is used for building and running the code, JeroMQ version 0.5.2.
This is the Router class :
public class Router {
private static final String ROUTER_HOST = FetchProperties.getInstance().getProperty("ROUTER_HOST");
private static final String ROUTER_PORT = FetchProperties.getInstance().getProperty("ROUTER_PORT");
private static final String BIND_ADDRESS = String.format("tcp://%s:%s", ROUTER_HOST, ROUTER_PORT);
private final ZContext context;
private final ZMQ.Socket socket;
public Router() {
context = new ZContext();
socket = context.createSocket(SocketType.ROUTER);
socket.setReceiveTimeOut(-1);
socket.setSendTimeOut(0);
socket.setLinger(2000);
socket.setHWM(0);
socket.setIdentity("Router".getBytes());
socket.bind(BIND_ADDRESS);
}
public void start() {
while(!Thread.currentThread().isInterrupted()) {
var clientBytes = socket.recv();
var messageBytes = socket.recv();
if (Objects.isNull(clientBytes) || Objects.isNull(messageBytes)) {
System.out.println("Router received null, proceeding..");
continue;
}
var clientId = new String(clientBytes, ZMQ.CHARSET);
var message = new String(messageBytes, ZMQ.CHARSET);
System.out.println("Router received: clientId " + clientId + " message " + message);
socket.send(clientId, ZMQ.SNDMORE);
socket.send("message from router");
}
}
public static void main(String[] args) {
var router = new Router();
new Thread(router::start).start();
}
}
This is the dealer class :
public class Dealer {
private static final String DEALER_HOST = FetchProperties.getInstance().getProperty("DEALER_HOST");
private static final String DEALER_PORT = FetchProperties.getInstance().getProperty("DEALER_PORT");
private static final String BIND_ADDRESS = String.format("tcp://%s:%s", DEALER_HOST, DEALER_PORT);
private final ZContext context;
private final ZMQ.Socket socket;
public Dealer() {
context = new ZContext();
socket = context.createSocket(SocketType.DEALER);
socket.setReceiveTimeOut(-1);
socket.setSendTimeOut(0);
socket.setLinger(2000);
socket.setHWM(0);
socket.setIdentity("Dealer".getBytes());
socket.connect(BIND_ADDRESS);
}
public void start() {
while(!Thread.currentThread().isInterrupted()) {
var clientBytes = socket.recv();
var messageBytes = socket.recv();
if (Objects.isNull(clientBytes) || Objects.isNull(messageBytes)) {
System.out.println("Dealer received null, proceeding..");
continue;
}
var clientId = new String(clientBytes, ZMQ.CHARSET);
var message = new String(messageBytes, ZMQ.CHARSET);
System.out.println("Dealer: clientId {" + clientId + "} message {" + message + "}");
}
}
public void sendMessage(String message) {
socket.send("Router", ZMQ.SNDMORE);
socket.send((byte[]) null, ZMQ.SNDMORE);
var sent = socket.send(message);
System.out.println(sent ? "message queued" : "message NOT queued");
}
public static void main(String[] args) throws InterruptedException {
var dealer = new Dealer();
new Thread(dealer::start).start();
while(true) {
System.out.println("dealer sending message");
TimeUnit.MILLISECONDS.sleep(3000);
dealer.sendMessage("message from dealer");
}
}
}
hi,
First noticeable thing that I see in your code is that sending and receiving for the DEALER socket are performed in different threads. Jeromq does not handle that well, sockets are single-threaded, you should refactor your code to perform all operations (sending, receiving, closing) in one thread only.