jeromq
proxying ZeroMQ and HTTP sockets behind a common port (ZMTP v3.1)
First, thanks for the great library -- much appreciated and keep up the good work :+1: !
I am trying to work out whether it is possible to proxy/share several ZeroMQ and non-ZeroMQ sockets, notably XPUB and STREAM, behind one common ROUTER socket. The idea is to export only one public port for the router and, depending on the requested service (i.e. whether or not it is framed within a ZeroMQ frame), to forward the request either to a native ZeroMQ socket (standard proxying) or, if it is an HTTP request (i.e. if there is no known ZeroMQ framing), to another external port. The HTTP port is known/internal to the ROUTER but not known to the users.
The older C-based examples, translated to JeroMQ (see below), seem to be able to mimic a basic HTTP server/reply. However, I do not seem to find a way to access the initial full HTTP request header that initiated the handler pair. This would be needed to forward the full request (including the URI) to another proper HTTP server implementation (worker).
Am I missing something? Is there a simple MVP example for this or is this a tall order?
Any help or hint would be much appreciated! Thanks in advance!
EDIT 1: got some more insight through the following discussion. Notably, the actual initial GET request header is stored after the initial empty frame, i.e.
1. Get the [id, ] message on client connection.
2. Receive the empty payload message.
3. Get the [id, payload] message (N.B. missing in the original example/documentation).
4. Receive the payload message (N.B. missing in the original example/documentation).
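The sequence above can be modelled without any sockets, which may help when reasoning about it. The sketch below is only an illustration under stated assumptions: `StreamFrameModel` and `payloads` are hypothetical names (not JeroMQ API), and the frames are assumed to arrive as a flat list alternating between an identity frame and a data frame, where an empty data frame marks a connection event rather than a request.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, socket-free model of the frame sequence listed above.
final class StreamFrameModel {
    /**
     * Walks a flat list of frames laid out as [id, data, id, data, ...]
     * and returns the non-empty data frames decoded as UTF-8 text.
     * An empty data frame is treated as a connection notification and skipped.
     */
    static List<String> payloads(List<byte[]> frames) {
        if (frames.size() % 2 != 0) {
            throw new IllegalArgumentException("expected [id, data] pairs");
        }
        List<String> result = new ArrayList<>();
        for (int i = 0; i < frames.size(); i += 2) {
            byte[] data = frames.get(i + 1); // frames.get(i) is the identity
            if (data.length > 0) {
                result.add(new String(data, StandardCharsets.UTF_8));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] id = {0, 1, 2, 3, 4}; // made-up connection identity
        byte[] request = "GET /index.html HTTP/1.1\r\n\r\n".getBytes(StandardCharsets.UTF_8);
        // Steps 1-2: [id, empty]; steps 3-4: [id, payload]
        List<byte[]> frames = Arrays.asList(id, new byte[0], id, request);
        System.out.println(payloads(frames).size() + " request payload(s) found");
    }
}
```

In this model the first pair is dropped and only the second pair yields the request header, which matches the observation that the original example stopped reading too early.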
For a working example establishing a simple HTTP-based server on "tcp://*:8081" see below. Hope this is useful for somebody else who is stumbling over this. There is still the issue of forwarding the request from the ROUTER to the STREAM socket; any help would be appreciated:
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.*;

public class HttpTest {
    public static void main(final String[] argv) {
        try (ZContext context = new ZContext()) {
            Socket router = context.createSocket(SocketType.ROUTER);
            Socket stream = context.createSocket(SocketType.STREAM);
            if (!router.bind("tcp://*:8080")) {
                throw new IllegalStateException("could not bind socket");
            }
            if (!stream.bind("tcp://*:8081")) {
                throw new IllegalStateException("could not bind socket");
            }
            Poller poller = context.createPoller(2);
            poller.register(router, Poller.POLLIN);
            poller.register(stream, Poller.POLLIN);
            while (!Thread.interrupted()) {
                if (poller.poll(1000) == -1) {
                    break; // Interrupted
                }
                if (poller.pollin(0)) {
                    handleRouterSocket(router, stream);
                }
                if (poller.pollin(1)) {
                    handleStreamHttpSocket(stream);
                }
            }
        }
    }

    private static long bytesToLong(byte[] bytes) {
        long value = 0;
        for (int i = 0; i < bytes.length; i++) {
            value = (value << 8) + (bytes[i] & 0xff);
        }
        return value;
    }

    private static void handleRouterSocket(final Socket router, final Socket stream) {
        // received router request
        while (true) {
            // receive message
            final byte[] message = router.recv(0);
            final boolean more = router.hasReceiveMore();
            // Broker it -- throws an exception (too naive implementation?)
            stream.send(message, more ? ZMQ.SNDMORE : 0);
            if (!more) {
                break;
            }
        }
    }

    private static void handleStreamHttpSocket(Socket httpSocket) {
        // Get [id, ] message on client connection.
        final ZFrame handle = ZFrame.recvFrame(httpSocket);
        if (handle == null || bytesToLong(handle.getData()) == 0) {
            return;
        }
        System.err.println("received ID = " + handle.toString() + " - more = " + handle.hasMore()); // Professional Logging(TM)
        if (!handle.hasMore()) {
            // Close erroneous connection to browser
            handle.send(httpSocket, ZFrame.MORE | ZFrame.REUSE);
            httpSocket.send((byte[])null, 0);
            return;
        }
        // receive empty payload.
        final ZFrame emptyFrame = ZFrame.recvFrame(httpSocket);
        if (emptyFrame == null || emptyFrame.hasMore() || emptyFrame.size() != 0) {
            // Close erroneous connection to browser
            handle.send(httpSocket, ZFrame.MORE | ZFrame.REUSE);
            httpSocket.send((byte[])null, 0);
            return;
        }
        // Get [id, payload] message.
        final ZFrame clientRequest = ZFrame.recvFrame(httpSocket);
        if (clientRequest == null || bytesToLong(clientRequest.getData()) == 0) {
            return;
        }
        if (!clientRequest.hasMore()) {
            // Close erroneous connection to browser
            clientRequest.send(httpSocket, ZFrame.MORE | ZFrame.REUSE);
            httpSocket.send((byte[])null, 0);
            return;
        }
        // receive payload message.
        ZFrame request = ZFrame.recvFrame(httpSocket);
        if (request == null) {
            return; // interrupted while waiting for the payload
        }
        String header = new String(request.getData(), 0, request.size(),
                StandardCharsets.UTF_8);
        System.err.println("received client request header : '" + header + "'"); // Professional Logging(TM)
        // Send Hello World response
        final String URI = (header.length() == 0) ? "null" : header.split("\n")[0];
        clientRequest.send(httpSocket, ZFrame.MORE | ZFrame.REUSE);
        httpSocket.send("HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\nHello, World!\nyou requested URI: " + URI);
        // Close connection to browser -- normally exit
        clientRequest.send(httpSocket, ZFrame.MORE | ZFrame.REUSE);
        httpSocket.send((byte[])null, 0);
    }
}
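The example above echoes the whole first header line as the "URI". If only the request target is needed, for instance to decide which worker a request should go to, that line can be split further. The following is a minimal sketch, assuming a well-formed request line of the form `METHOD target VERSION`; `RequestLine` and `parse` are hypothetical names and not part of JeroMQ.

```java
// Hypothetical helper (not JeroMQ API): splits e.g. "GET /path HTTP/1.1".
final class RequestLine {
    final String method;
    final String target;
    final String version;

    private RequestLine(String method, String target, String version) {
        this.method = method;
        this.target = target;
        this.version = version;
    }

    /**
     * Parses the first line of a raw HTTP request header.
     * Returns null if the header is empty or the first line does not
     * consist of exactly three space-separated tokens.
     */
    static RequestLine parse(String header) {
        if (header == null || header.isEmpty()) {
            return null;
        }
        int eol = header.indexOf('\n');
        // trim() also removes the trailing '\r' of a CRLF line ending
        String line = (eol < 0 ? header : header.substring(0, eol)).trim();
        String[] parts = line.split(" ");
        if (parts.length != 3) {
            return null;
        }
        return new RequestLine(parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        RequestLine r = parse("GET /status?verbose=1 HTTP/1.1\r\nHost: localhost:8081\r\n\r\n");
        // prints: GET /status?verbose=1 HTTP/1.1
        System.out.println(r.method + " " + r.target + " " + r.version);
    }
}
```

In `handleStreamHttpSocket` the `header` string could be passed to such a helper, and `target` used in place of the first header line.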
The original (incomplete) samples based on the documentation:
import java.util.Arrays;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ.*;

public class HttpTests2 {
    public static void main(final String[] argv) {
        try (ZContext context = new ZContext()) {
            Socket stream = context.createSocket(SocketType.STREAM);
            if (!stream.bind("tcp://*:8080")) {
                throw new IllegalStateException("could not bind socket");
            }
            while (true) {
                ZFrame handle = ZFrame.recvFrame(stream);
                if (handle == null) {
                    break;
                }
                System.err.println( // Professional Logging(TM)
                        "received handle = " + handle.toString() + " " + handle.hasMore() +
                        " data = " + Arrays.toString(handle.getData()) + " - identity: " +
                        Arrays.toString(stream.getIdentity())); // Professional Logging(TM)
                ZFrame request = handle;
                while (request.hasMore()) {
                    request = ZFrame.recvFrame(stream);
                    System.err.println("received part message : '" +
                            (request == null ? "null" : request) +
                            "' more?: " + request.hasMore());
                }
                // Send Hello World response
                handle.send(stream, ZFrame.MORE | ZFrame.REUSE);
                stream.send(
                        "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\nHello, World!");
                // Close connection to browser
                handle.send(stream, ZFrame.MORE | ZFrame.REUSE);
                stream.send((byte[])null, 0);
                System.err.println("sent message to handle: " +
                        handle.toString()); // Professional Logging(TM)
            }
        }
    }
}
Similarly for the apparently now deprecated ROUTER+'setRouterRaw' solution:
import java.util.Arrays;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.*;

public class HttpTests {
    public static void main(final String[] argv) {
        try (ZContext context = new ZContext()) {
            Socket router = context.createSocket(SocketType.ROUTER);
            router.setRouterRaw(true);
            if (!router.bind("tcp://*:8080")) {
                throw new IllegalStateException("could not bind socket");
            }
            while (true) {
                // Get HTTP request
                ZFrame handle = ZFrame.recvFrame(router);
                if (handle == null) {
                    break; // Ctrl-C interrupt
                }
                // final String request = router.recvStr();
                ZFrame request = ZFrame.recvFrame(router);
                System.err.println(
                        "received request = " + request.toString() + " handle = " +
                        handle.toString() + " " + handle.hasMore() + " data = " +
                        Arrays.toString(handle.getData())); // Professional Logging(TM)
                // Send Hello World response
                handle.send(router, ZFrame.MORE | ZFrame.REUSE);
                router.send(
                        "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\nHello, World!");
                // Close connection to browser
                handle.send(router, ZFrame.MORE | ZFrame.REUSE);
                router.send((byte[])null, 0);
                // zmq_send(router, NULL, 0, 0);
                System.err.println("sent message"); // Professional Logging(TM)
            }
        }
    }
}
The first example yields some data in the browser, but the log shows an empty identity and an empty data frame:
received handle = 00C8963A2F true data = [0, -56, -106, 58, 47] - identity: []
received part message : '' more?: false
sent message to handle: 00C8963A2F
received handle = 00C8963A30 true data = [0, -56, -106, 58, 48] - identity: []
received part message : '' more?: false
sent message to handle: 00C8963A30
I found a partial answer in the draft version 3.1 of the 37/ZMTP specification:
[..] We want to allow multiple tasks to share a single external unique interface and port, to decrease system administration costs. [..]
Thanks again also to @cite-reader for clarifying on the [zeromq/general](https://gitter.im/zeromq/general) Gitter that there is no easy shortcut around implementing a TCP proxy (and/or waiting for a ZMTP v3.1 reference implementation, may I add).
The open question is whether and when this may be implemented... :thinking:
A note: the resource property, if implemented, wouldn't be a solution for you. As a feature of ZMTP and exchanged in the ZMTP handshake, it only allows ZMQ resources to share a port. There's no proposed mechanism to allow HTTP and ZMQ resources to coexist that way.
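For the hand-written TCP proxy mentioned above, one possible way to tell the two kinds of traffic apart is to peek at the first bytes of each new connection before choosing a backend. The sketch below is an illustration only: `ProtocolSniffer`, `Kind` and `classify` are hypothetical names, and the list of HTTP methods is an assumption. It relies on the ZMTP 2.0/3.x greeting starting with a 10-byte signature (`0xFF`, eight padding bytes, `0x7F`); older ZMTP 1.0 peers and TLS traffic are not covered.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical classifier for the first bytes read from a new TCP connection.
final class ProtocolSniffer {
    enum Kind { ZMTP, HTTP, UNKNOWN }

    // Assumption: only these request methods need to be recognised.
    private static final String[] HTTP_METHODS = {
        "GET ", "POST ", "PUT ", "HEAD ", "DELETE ", "OPTIONS ", "PATCH "
    };

    /**
     * Classifies a connection from its initial bytes.
     * ZMTP 2.0/3.x: byte 0 is 0xFF and byte 9 is 0x7F (greeting signature).
     * HTTP: the bytes start with a known method followed by a space.
     */
    static Kind classify(byte[] first) {
        if (first.length >= 10
                && (first[0] & 0xff) == 0xff
                && (first[9] & 0xff) == 0x7f) {
            return Kind.ZMTP;
        }
        String text = new String(first, StandardCharsets.US_ASCII);
        for (String method : HTTP_METHODS) {
            if (text.startsWith(method)) {
                return Kind.HTTP;
            }
        }
        return Kind.UNKNOWN;
    }

    public static void main(String[] args) {
        byte[] greeting = new byte[10]; // padding bytes stay zero
        greeting[0] = (byte) 0xff;
        greeting[9] = (byte) 0x7f;
        System.out.println(classify(greeting)); // prints: ZMTP
        byte[] http = "GET / HTTP/1.1\r\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(classify(http)); // prints: HTTP
    }
}
```

A real proxy would have to buffer until enough bytes have arrived to decide, and then replay the buffered bytes to whichever internal port it selects.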