
Slow receive (several milliseconds) for localhost communication

sdoeringNew opened this issue 5 years ago • 17 comments

In most cases, sending a small message from a dealer and receiving it on the router takes about 0 to 1 ms. That's what I expected for TCP communication on localhost, and that's what I got in tests.

However, in rare cases - perhaps 10 out of a thousand - sending and receiving the message takes several milliseconds.

I wrote a JUnit test to measure the latency: two dealers send a message to one of two routers. SlowReceiveTest.java.zip

Request 2, port 5555: Extraordinary latency: 5ms
Request 63, port 5555: Extraordinary latency: 9ms
Request 63, port 5556: Extraordinary latency: 9ms
Request 545, port 5556: Extraordinary latency: 12ms
Request 545, port 5555: Extraordinary latency: 13ms
Request 567, port 5555: Extraordinary latency: 8ms
Request 567, port 5556: Extraordinary latency: 8ms
Request 568, port 5556: Extraordinary latency: 8ms
Request 569, port 5556: Extraordinary latency: 7ms
Request 570, port 5556: Extraordinary latency: 8ms
Request 860, port 5556: Extraordinary latency: 7ms
Request 883, port 5556: Extraordinary latency: 7ms
Request 883, port 5555: Extraordinary latency: 7ms
Request 911, port 5556: Extraordinary latency: 10ms
Request 911, port 5555: Extraordinary latency: 11ms
Request 916, port 5555: Extraordinary latency: 5ms

Even with only one dealer and router the latency is sometimes high.
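
Since the zip may not be handy, here is a minimal single-dealer sketch of the kind of measurement I mean (class name, endpoint, and threshold are illustrative, not the attached test):

import java.util.concurrent.TimeUnit;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class SlowReceiveSketch {
    public static void main(String[] args) {
        try (ZContext ctx = new ZContext()) {
            ZMQ.Socket router = ctx.createSocket(SocketType.ROUTER);
            router.bind("tcp://127.0.0.1:5555");
            ZMQ.Socket dealer = ctx.createSocket(SocketType.DEALER);
            dealer.connect("tcp://127.0.0.1:5555");

            for (int i = 1; i <= 10_000; i++) {
                long start = System.nanoTime();
                dealer.send(new byte[]{0x00}, 0);
                router.recv(0); // identity frame that ROUTER prepends
                router.recv(0); // the actual payload
                long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                if (millis >= 4) {
                    System.out.println("Request " + i + ": Extraordinary latency: " + millis + "ms");
                }
            }
        }
    }
}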

Why? Is this a bug?

The test system is CentOS 7.

sdoeringNew, Apr 23 '19

On further investigation, the Java garbage collection is not the cause of the high latency.

sdoeringNew, Apr 23 '19

Did you try with Epsilon GC (https://openjdk.java.net/jeps/318), testing with JMH (https://openjdk.java.net/projects/code-tools/jmh/) for better measurement, or disabling HotSpot compilation? The println might add latency. The System.currentTimeMillis() call should be made right before the send. I also wonder about the cost of the ByteBuffer.allocate(4)....array() done after getting the send time.
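
For illustration, a JMH harness along those lines might look like this sketch (my own setup, not the attached test; class name and endpoint are assumptions). Mode.SampleTime records the full latency distribution rather than an average, which is what exposes rare outliers:

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.SampleTime) // sample the latency distribution, not just the mean
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class RoundTripBench {
    ZContext ctx;
    ZMQ.Socket dealer;
    ZMQ.Socket router;

    @Setup
    public void setup() {
        ctx = new ZContext();
        router = ctx.createSocket(SocketType.ROUTER);
        router.bind("tcp://127.0.0.1:5555");
        dealer = ctx.createSocket(SocketType.DEALER);
        dealer.connect("tcp://127.0.0.1:5555");
    }

    @Benchmark
    public byte[] roundTrip() {
        dealer.send(new byte[]{0x00}, 0);
        router.recv(0);        // identity frame that ROUTER prepends
        return router.recv(0); // payload frame; returned so the JIT cannot drop it
    }

    @TearDown
    public void tearDown() {
        ctx.close();
    }
}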

fbacchella, Apr 23 '19

It is not the GC, so any other GC algorithm will not improve the latency. In fact there are only two garbage collections during the whole test run: one at the start and one at the end of the test.

The latency is calculated before the println.

The cost of allocating and writing eight bytes? Is this a real question?

I'll change the code so that the time is taken right before the send. I highly doubt that the saved nanoseconds will change anything in the test result.

I'll retry with JMH.

sdoeringNew, Apr 23 '19

It's difficult to reproduce on my machine; I need to reduce the sleep between calls drastically (to 10ms). I tried changing the transport to inproc:// and I still observe the phenomenon from time to time (2-3 times every 10,000 cycles). That does not rule out the stream engine, but there could be some friction in the pipes (or somewhere else, like the signaler; too early to tell). @sdoeringNew Could you try on your side to confirm or refute?

fredoboulo, Apr 23 '19

@sdoeringNew It could be a number of things, but first, sockets aren't thread-safe and should be bound to the same thread unless you have a full memory barrier when switching threads. In your example, there is no guarantee that is the case. There will be contention on the counter, but it's unlikely to be the cause of the slowdown. I would simplify the test to only one producer and one consumer, eliminating what is unnecessary until you find the culprit. Be wary of coordinated omission. Microbenchmarks are hard.

trevorbernard, Apr 23 '19

The println is inside the loop, so once you get one slow packet, the reception of the next one might be delayed too. EpsilonGC is a no-op GC, so no time at all is spent on it. It's not the cost of allocating eight bytes; it's the behind-your-back bookkeeping of ByteBuffer that might be a problem.
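
For illustration, the allocation pattern in question probably looks like the first helper below (an assumption on my part; the zipped test isn't shown inline), while reusing one buffer avoids the per-message bookkeeping:

import java.nio.ByteBuffer;

public class PayloadAlloc {
    // Per-message allocation: every call creates a new ByteBuffer plus its
    // backing array (this exact shape is my assumption about the zipped test).
    static byte[] freshPayload(int counter) {
        return ByteBuffer.allocate(4).putInt(counter).array();
    }

    // Reusing one buffer avoids that per-message bookkeeping. Not thread-safe,
    // but fine inside a single-threaded send loop.
    static final ByteBuffer REUSED = ByteBuffer.allocate(4);

    static byte[] reusedPayload(int counter) {
        REUSED.clear();
        REUSED.putInt(counter);
        return REUSED.array(); // same backing array on every call
    }
}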

fbacchella, Apr 23 '19

I still see something here:

    private static final long THRESHOLD = TimeUnit.MICROSECONDS.toNanos(500);
    private static final long MAX       = 10_000_000;

    @Test
    public void testSporadicLatency()
    {
        zmq.Ctx ctx = zmq.ZMQ.createContext();
        zmq.SocketBase client = zmq.ZMQ.socket(ctx, zmq.ZMQ.ZMQ_DEALER);
        zmq.SocketBase server = zmq.ZMQ.socket(ctx, zmq.ZMQ.ZMQ_ROUTER);

        String endpoint = "inproc://test";
        //        String endpoint = "tcp://localhost:5555";
        zmq.ZMQ.connect(client, endpoint);
        zmq.ZMQ.bind(server, endpoint);
        long counter = 0;
        long sendTotal = 0;
        long recvTotal = 0;

        zmq.Msg msg = new zmq.Msg(4);
        // warmup
        for (int i = 1; i <= MAX; ++i) {
            zmq.ZMQ.send(client, msg, 0);
            zmq.ZMQ.recv(server, 0);
            zmq.ZMQ.recv(server, 0);
        }
        System.out.println("Start test");
        for (int i = 1; i <= MAX; ++i) {
            counter++;

            long start = System.nanoTime();
            zmq.ZMQ.send(client, msg, 0);
            long end = System.nanoTime();

            long latency = end - start;
            if (latency > THRESHOLD) {
                System.out.println(
                                   "Send " + counter + ": Extraordinary latency: "
                                           + TimeUnit.NANOSECONDS.toMicros(latency) + " micros");
            }
            sendTotal += latency;

            start = System.nanoTime();
            zmq.ZMQ.recv(server, 0);
            zmq.ZMQ.recv(server, 0);
            end = System.nanoTime();

            latency = end - start;
            if (latency > THRESHOLD) {
                System.out.println(
                                   "Recv " + counter + ": Extraordinary latency: "
                                           + TimeUnit.NANOSECONDS.toMicros(latency) + " micros");
            }
            recvTotal += latency;
        }

        System.out.println("Avg Send: " + (sendTotal / (MAX * 1000d)) + " micros");
        System.out.println("Avg Recv: " + (recvTotal / (MAX * 1000d)) + " micros");

        System.out.println("End test");

        zmq.ZMQ.close(client);
        zmq.ZMQ.close(server);
        zmq.ZMQ.term(ctx);
    }

I don't have time to upgrade to 11 now to try EpsilonGC.

Start test
Recv 974520: Extraordinary latency: 2666 micros
Recv 2402242: Extraordinary latency: 2562 micros
Recv 3840841: Extraordinary latency: 2534 micros
Recv 5279440: Extraordinary latency: 2926 micros
Recv 6712578: Extraordinary latency: 2875 micros
Send 7902070: Extraordinary latency: 615 micros
Recv 8445429: Extraordinary latency: 2659 micros
Avg Send: 0.0751493182 micros
Avg Recv: 0.0895147853 micros
End test

fredoboulo, Apr 23 '19

I made the upgrade to 11 and used the -XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC options. No latency spikes observed in either test.

fredoboulo, Apr 24 '19

@fredoboulo Thanks for your test.

I've used an improved version of your test code. Your receiving time might not include the transfer time. Also, the initialization order of the sockets is different for the inproc protocol.

@Test
public void testSporadicLatency() {
    final zmq.Ctx ctx = zmq.ZMQ.createContext();
    final zmq.SocketBase client = zmq.ZMQ.socket(ctx, zmq.ZMQ.ZMQ_DEALER);
    final zmq.SocketBase server = zmq.ZMQ.socket(ctx, zmq.ZMQ.ZMQ_ROUTER);

    final String endpoint = "inproc://test";
    zmq.ZMQ.bind(server, endpoint);
    zmq.ZMQ.connect(client, endpoint);
    final long threshold = TimeUnit.MILLISECONDS.toNanos(2L);
    final long max = 10_000_000L;
    // final String endpoint = "tcp://localhost:5555";
    // zmq.ZMQ.connect(client, endpoint);
    // zmq.ZMQ.bind(server, endpoint);
    // final long threshold = TimeUnit.MILLISECONDS.toNanos(4L);
    // final long max = 1_000_000L;

    long sendTotal = 0;
    long recvTotal = 0;

    final zmq.Msg msg = new zmq.Msg(4);
    System.out.println("Warmup");
    for (int i = 1; i <= 1_000; ++i) {
        zmq.ZMQ.send(client, msg, 0);
        zmq.ZMQ.recv(server, 0);
        zmq.ZMQ.recv(server, 0);
    }
    System.out.println("Start test");
    for (int i = 1; i <= max; ++i) {
        final long startSend = System.nanoTime();
        zmq.ZMQ.send(client, msg, 0);
        final long endSend = System.nanoTime();
        zmq.ZMQ.recv(server, 0);
        zmq.ZMQ.recv(server, 0);
        final long endRecv = System.nanoTime();

        final long sendLatency = endSend - startSend;
        if (sendLatency > threshold) {
            System.out.println(
                    "Send " + i + ": Extraordinary latency: "
                            + TimeUnit.NANOSECONDS.toMillis(sendLatency) + "ms");
        }
        final long recvLatency = endRecv - endSend;
        if (recvLatency > threshold) {
            System.out.println(
                    "Recv " + i + ": Extraordinary latency: "
                            + TimeUnit.NANOSECONDS.toMillis(recvLatency) + "ms");
        }

        sendTotal += sendLatency;
        recvTotal += recvLatency;
    }

    System.out.println("Avg Send: " + (sendTotal / (max * 1000d)) + " micros");
    System.out.println("Avg Recv: " + (recvTotal / (max * 1000d)) + " micros");

    System.out.println("End test");

    zmq.ZMQ.close(client);
    zmq.ZMQ.close(server);
    zmq.ZMQ.term(ctx);
}

With the inproc protocol I can confirm that the high latency is directly related to garbage collection. See the output with GC logging enabled:

Start test
[0.473s][info][gc] GC(0) Pause Young (Normal) (G1 Evacuation Pause) 24M->1M(250M) 21.753ms
Recv 208519: Extraordinary latency: 22ms
[0.841s][info][gc] GC(1) Pause Young (Normal) (G1 Evacuation Pause) 148M->1M(250M) 10.925ms
Recv 1947387: Extraordinary latency: 11ms
[1.188s][info][gc] GC(2) Pause Young (Normal) (G1 Evacuation Pause) 148M->1M(250M) 11.679ms
Recv 3692151: Extraordinary latency: 11ms
[1.503s][info][gc] GC(3) Pause Young (Normal) (G1 Evacuation Pause) 148M->2M(250M) 10.259ms
Recv 5425048: Extraordinary latency: 10ms
[1.810s][info][gc] GC(4) Pause Young (Normal) (G1 Evacuation Pause) 148M->2M(382M) 8.800ms
Recv 7157963: Extraordinary latency: 9ms
[2.183s][info][gc] GC(5) Pause Young (Normal) (G1 Evacuation Pause) 207M->2M(382M) 6.552ms
Recv 9591164: Extraordinary latency: 7ms
Avg Send: 0.0607813701 micros
Avg Recv: 0.1000434124 micros
End test

However, with the tcp protocol it is not only the GC. Here is the log output for TCP:

Start test
Recv 1013: Extraordinary latency: 6ms
Recv 1544: Extraordinary latency: 7ms
Recv 2259: Extraordinary latency: 5ms
Recv 2286: Extraordinary latency: 4ms
Send 2354: Extraordinary latency: 6ms
Recv 3146: Extraordinary latency: 5ms
Recv 4628: Extraordinary latency: 4ms
Recv 5746: Extraordinary latency: 5ms
[2.508s][info][gc] GC(0) Pause Young (Normal) (G1 Evacuation Pause) 24M->1M(250M) 19.632ms
Recv 20779: Extraordinary latency: 19ms
[18.419s][info][gc] GC(1) Pause Young (Normal) (G1 Evacuation Pause) 147M->2M(250M) 8.584ms
Recv 216941: Extraordinary latency: 8ms
Recv 361933: Extraordinary latency: 4ms
[33.755s][info][gc] GC(2) Pause Young (Normal) (G1 Evacuation Pause) 148M->2M(250M) 10.543ms
Send 413073: Extraordinary latency: 10ms
Recv 426492: Extraordinary latency: 6ms
[48.828s][info][gc] GC(3) Pause Young (Normal) (G1 Evacuation Pause) 148M->2M(250M) 6.542ms
Send 609226: Extraordinary latency: 6ms
[63.859s][info][gc] GC(4) Pause Young (Normal) (G1 Evacuation Pause) 148M->2M(250M) 6.440ms
Recv 805374: Extraordinary latency: 6ms
Avg Send: 12.03648571 micros
Avg Recv: 65.929937928 micros
End test

Why is the receiving time sometimes a hundred times slower than the average? That's a huge difference.

@trevorbernard My test was a simplified version of our production setting. We have two dealers sending the same message to two independent routers. One router is implemented in Java (JeroMQ) and one router is implemented in C++. But only with the Java router do we experience such latency spikes on an idling system, which cannot (only) be explained by the GC. I'll improve my test.

sdoeringNew, Apr 25 '19

The changed test:

private static final long THRESHOLD = TimeUnit.MICROSECONDS.toNanos(4000L);

@Test
public void slowReceive() throws InterruptedException {
    final AtomicLong sendTime = new AtomicLong();
    final AtomicLong recvTotal = new AtomicLong();
    final AtomicInteger requestCounter = new AtomicInteger();
    final String endpoint = "tcp://localhost:5555";
    final long max = 1_000_000L;

    try (
            ZContext context = new ZContext();
            ZMQ.Socket client = context.createSocket(SocketType.DEALER)
    ) {
        client.connect(endpoint);

        final ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(new ServerTask(endpoint, requestCounter, sendTime, recvTotal));

        final ZMsg message = new ZMsg();
        message.add(new byte[]{0x00});

        System.out.println("Warmup");
        sendTime.set(Long.MAX_VALUE); // prevent log of extraordinary latency - it's a warmup
        for (int i = 0; i < 1000; ++i) {
            message.send(client, false);
            ZMsg.recvMsg(client, 0);
        }

        System.out.println("Start test");
        recvTotal.set(0L); // reset times
        while (requestCounter.incrementAndGet() <= max) {
            message.send(client, false);
            sendTime.set(System.nanoTime());

            ZMsg.recvMsg(client, 0);
        }
        System.out.println("End test");

        System.out.println("Avg Recv: " + (recvTotal.get() / (max * 1000d)) + " micros");

        // graceful shutdown
        sendTime.set(-1L); // signal the router to stop
        message.send(client);
        executor.shutdown();
        executor.awaitTermination(2L, TimeUnit.SECONDS);
    }
}

private static class ServerTask implements Runnable {
    private final String endpoint;
    private final AtomicInteger requestCounter;
    private final AtomicLong sendTime;
    private final AtomicLong recvTotal;

    private ServerTask(final String endpoint, final AtomicInteger requestCounter,
                       final AtomicLong sendTime, final AtomicLong recvTotal) {
        this.endpoint = endpoint;
        this.requestCounter = requestCounter;
        this.sendTime = sendTime;
        this.recvTotal = recvTotal;
    }

    @Override
    public void run() {
        try (
                ZContext context = new ZContext();
                ZMQ.Socket server = context.createSocket(SocketType.ROUTER)
        ) {
            server.bind(endpoint);

            while (!Thread.currentThread().isInterrupted()) {
                final ZMsg message = ZMsg.recvMsg(server, true);
                final long receiveTime = System.nanoTime();
                if (sendTime.get() == -1L) {
                    System.out.println("Break");
                    break;
                }

                final long recvLatency = (receiveTime - sendTime.get());
                if (recvLatency > THRESHOLD) {
                    System.out.println(
                            "Recv " + requestCounter.get() + ": Extraordinary latency: "
                                    + TimeUnit.NANOSECONDS.toMillis(recvLatency) + "ms");
                }

                message.send(server);

                recvTotal.addAndGet(recvLatency);
            }
        }
    }
}

With Epsilon GC it gave me these logs:

Start test
Recv 7: Extraordinary latency: 5ms
Recv 136: Extraordinary latency: 11ms
Recv 162: Extraordinary latency: 4ms
Recv 291: Extraordinary latency: 6ms
Recv 297: Extraordinary latency: 10ms
Recv 439: Extraordinary latency: 10ms
Recv 664: Extraordinary latency: 5ms
Recv 75850: Extraordinary latency: 15ms
Recv 85748: Extraordinary latency: 12ms
Recv 86697: Extraordinary latency: 14ms
Recv 95204: Extraordinary latency: 15ms
Recv 96182: Extraordinary latency: 15ms
[21.043s][info][gc] Heap: 3966M reserved, 247M (6.25%) committed, 198M (5.00%) used
Recv 104707: Extraordinary latency: 15ms
Recv 110908: Extraordinary latency: 14ms
Recv 119874: Extraordinary latency: 15ms
Recv 120822: Extraordinary latency: 14ms
Recv 129359: Extraordinary latency: 16ms
Recv 148841: Extraordinary latency: 16ms
Recv 158751: Extraordinary latency: 11ms
Recv 159699: Extraordinary latency: 14ms
Recv 174843: Extraordinary latency: 14ms
Recv 184594: Extraordinary latency: 14ms
Recv 189201: Extraordinary latency: 11ms
Recv 191137: Extraordinary latency: 5ms
[42.748s][info][gc] Heap: 3966M reserved, 503M (12.70%) committed, 396M (10.01%) used
Recv 215862: Extraordinary latency: 16ms
Recv 217616: Extraordinary latency: 8ms
Recv 228785: Extraordinary latency: 11ms
Recv 230994: Extraordinary latency: 15ms
Recv 240828: Extraordinary latency: 16ms
Recv 263589: Extraordinary latency: 14ms
Recv 264382: Extraordinary latency: 4ms
Recv 285021: Extraordinary latency: 15ms
Recv 289419: Extraordinary latency: 14ms
Recv 290366: Extraordinary latency: 15ms
Recv 300192: Extraordinary latency: 15ms
[64.692s][info][gc] Heap: 3966M reserved, 631M (15.93%) committed, 595M (15.01%) used
Recv 318592: Extraordinary latency: 14ms
Recv 319754: Extraordinary latency: 15ms
Recv 326178: Extraordinary latency: 14ms
Recv 327340: Extraordinary latency: 15ms
Recv 336797: Extraordinary latency: 14ms
Recv 359553: Extraordinary latency: 15ms
Recv 368435: Extraordinary latency: 15ms
Recv 387820: Extraordinary latency: 15ms
Recv 397444: Extraordinary latency: 15ms
Recv 407266: Extraordinary latency: 15ms
[86.222s][info][gc] Heap: 3966M reserved, 887M (22.38%) committed, 793M (20.01%) used
Recv 426702: Extraordinary latency: 16ms
Recv 427763: Extraordinary latency: 16ms
Recv 428083: Extraordinary latency: 4ms
Recv 435346: Extraordinary latency: 15ms
Recv 436294: Extraordinary latency: 14ms
Recv 454655: Extraordinary latency: 15ms
Recv 464634: Extraordinary latency: 12ms
Recv 465667: Extraordinary latency: 14ms
Recv 474197: Extraordinary latency: 15ms
Recv 494981: Extraordinary latency: 14ms
Recv 504546: Extraordinary latency: 14ms
Recv 513062: Extraordinary latency: 15ms
[107.728s][info][gc] Heap: 3966M reserved, 1015M (25.61%) committed, 992M (25.02%) used
Recv 533863: Extraordinary latency: 14ms
Recv 550954: Extraordinary latency: 14ms
Recv 559490: Extraordinary latency: 16ms
Recv 579362: Extraordinary latency: 15ms
Recv 587898: Extraordinary latency: 15ms
Recv 597402: Extraordinary latency: 15ms
Recv 607237: Extraordinary latency: 14ms
Recv 615772: Extraordinary latency: 15ms
Recv 633396: Extraordinary latency: 15ms
Recv 634345: Extraordinary latency: 14ms
[129.313s][info][gc] Heap: 3966M reserved, 1271M (32.06%) committed, 1190M (30.02%) used
Recv 642922: Extraordinary latency: 15ms
Recv 663734: Extraordinary latency: 14ms
Recv 672263: Extraordinary latency: 15ms
Recv 684047: Extraordinary latency: 14ms
Recv 702610: Extraordinary latency: 15ms
Recv 703559: Extraordinary latency: 13ms
Recv 712151: Extraordinary latency: 12ms
Recv 740538: Extraordinary latency: 15ms
Recv 741486: Extraordinary latency: 14ms
[150.156s][info][gc] Heap: 3966M reserved, 1399M (35.29%) committed, 1389M (35.03%) used
Recv 751981: Extraordinary latency: 14ms
Recv 761798: Extraordinary latency: 10ms
Recv 771829: Extraordinary latency: 14ms
Recv 789915: Extraordinary latency: 15ms
Recv 800680: Extraordinary latency: 14ms
Recv 810673: Extraordinary latency: 14ms
Recv 820262: Extraordinary latency: 16ms
Recv 828797: Extraordinary latency: 14ms
Recv 848577: Extraordinary latency: 14ms
Recv 849525: Extraordinary latency: 15ms
[171.251s][info][gc] Heap: 3966M reserved, 1655M (41.75%) committed, 1587M (40.03%) used
Recv 858195: Extraordinary latency: 15ms
Recv 859143: Extraordinary latency: 15ms
Recv 867678: Extraordinary latency: 14ms
Recv 888403: Extraordinary latency: 15ms
Recv 897077: Extraordinary latency: 15ms
Recv 907786: Extraordinary latency: 14ms
Recv 917805: Extraordinary latency: 14ms
Recv 933925: Extraordinary latency: 15ms
Recv 944494: Extraordinary latency: 14ms
Recv 954248: Extraordinary latency: 14ms
[192.098s][info][gc] Heap: 3966M reserved, 1911M (48.20%) committed, 1786M (45.03%) used
Recv 973893: Extraordinary latency: 13ms
Recv 982428: Extraordinary latency: 15ms
End test
Avg Recv: 91.916996785 micros

Around 0.01% of all requests were slow.

sdoeringNew, Apr 25 '19

Can anybody now confirm the issue?

sdoeringNew, May 10 '19

Still the same with the latest snapshot.

Can anybody now confirm the issue?

sdoeringNew, Dec 11 '19

I'm probably lacking context, but it seems to me like we can't do much about latency caused by GC. I'd be curious to hear thoughts about what we can do.

daveyarwood, Apr 21 '20

I narrowed the problem down to the receiving side. It's not the sending that is slow; only the receiving might be.
With another test I was able to see that this occurs not only when receiving a message on the router; receiving the response on the dealer can be slow, too.

After digging really deep I found this line in zmq.Signaler (around line 129):

rc = selector.select(0);

If there is high latency, it's coming from there.
In this case it's Java's EPollSelectorImpl implementation for Linux. (That explains why we don't experience the problem on our C++ router, as that one is surely not using Java's EPoll selector. 😄 )
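
To separate the raw selector wakeup cost from everything JeroMQ does on top of it, one could probe a Selector over a Pipe directly, which is roughly what the Signaler does internally. A minimal standalone sketch (my own code, not JeroMQ's; the iteration count, pacing, and threshold are arbitrary):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SelectorWakeupProbe {
    public static void main(String[] args) throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        Selector selector = Selector.open();
        pipe.source().register(selector, SelectionKey.OP_READ);

        AtomicLong sentAt = new AtomicLong(System.nanoTime());
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < 10_000; i++) {
                    sentAt.set(System.nanoTime());
                    pipe.sink().write(ByteBuffer.wrap(new byte[]{1}));
                    Thread.sleep(1); // pace the writes so each wakeup is measured in isolation
                }
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        writer.start();

        ByteBuffer in = ByteBuffer.allocate(1);
        for (int i = 0; i < 10_000; i++) {
            selector.select(0); // the suspected call; EPollSelectorImpl on Linux
            long latency = System.nanoTime() - sentAt.get();
            selector.selectedKeys().clear();
            in.clear();
            pipe.source().read(in); // drain the signal byte
            if (latency > TimeUnit.MICROSECONDS.toNanos(500)) {
                System.out.println("Wakeup " + i + ": " + TimeUnit.NANOSECONDS.toMicros(latency) + " micros");
            }
        }
        writer.join();
        selector.close();
        pipe.sink().close();
        pipe.source().close();
    }
}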

Any chance of writing a custom EPollSelector?
Can I work around this in our receiving strategy?

I'm probably lacking context, but it seems to me like we can't do much about latency caused by GC. I'd be curious to hear thoughts about what we can do.

As mentioned before, it's not a GC problem.

sdoeringNew, May 25 '20

That's interesting.

As far as I remember, once a socket goes through the configuration path (bind, connect, ...) and enters the communication path, the Signaler is mostly used for commands related to the HWM (ACTIVATE_READ, for example).

Could you try setting a higher HWM on the sockets (1000 being the default)? Or even 0, to disable the limit.
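
For instance, with the high-level API used in the earlier tests (a sketch; note that HWM changes only apply to connections made after the option is set, so set them before bind/connect):

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class HwmSetup {
    public static void main(String[] args) {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket server = context.createSocket(SocketType.ROUTER);
            server.setRcvHWM(0); // 0 removes the queue limit entirely
            server.setSndHWM(0);
            server.bind("tcp://localhost:5555");

            ZMQ.Socket client = context.createSocket(SocketType.DEALER);
            client.setRcvHWM(0);
            client.setSndHWM(0);
            client.connect("tcp://localhost:5555");

            // ... run the latency loop from the earlier tests here ...
        }
    }
}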

fredoboulo, May 26 '20

I made a small modification: the printf is done outside the loop, and I added a main to your code. I compiled it with JDK 14.0.1, using the cross-compile feature in the POM: JAVA_HOME=$(/usr/libexec/java_home -v 14) mvn clean package -DskipTests -Pcrosscompile

The result is:

jdk-14.0.1+7/bin/java -cp jeromq-0.5.3-SNAPSHOT.jar -XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -Xmx2048M  zmq.Latency
hiccups
Warmup
Start test
End test
Avg Recv: 32.862384376 micros
Break
Recv 23943: Extraordinary latency: 4ms
Recv 819467: Extraordinary latency: 5ms

This was running on a CentOS 7.7 VM, fully up to date.

The same test, running on dedicated hardware, not even in single-user mode:

Warmup
Start test
End test
Avg Recv: 29.063566058 micros
Break

Did you run your tests on a VM or on dedicated hardware?

fbacchella, May 27 '20

I rewrote the test to generate a histogram instead of counting threshold crossings. The latencies are accumulated in buckets. I also ran it on dedicated hardware, in single-user mode, with CPU affinity and JVM optimisations.

The code is:

package zmq;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

public class Latency {

    private static final char[] SYMBOLS = {'y', 'z', 'a', 'f', 'p', 'n', 'µ', 'm', ' ', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'};
    private static class Formating {
        char symbol;
        float value;
        @Override
        public String toString() {
            return String.format("%.2f%s", value, symbol != ' ' ? symbol : "");
        }
        
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicLong sendTime = new AtomicLong();
        final AtomicInteger requestCounter = new AtomicInteger();
        final String endpoint = "tcp://localhost:5555";
        final long max = 1_000_000L;

        try (
                ZContext context = new ZContext();
                ZMQ.Socket client = context.createSocket(SocketType.DEALER)
        ) {
            client.connect(endpoint);

            Thread t = new Thread(new ServerTask(endpoint, requestCounter, sendTime));
            t.setPriority(10);
            t.start();
            Thread.currentThread().setPriority(9);

            final ZMsg message = new ZMsg();
            message.add(new byte[]{0x00});

            System.out.println("Warmup");
            sendTime.set(Long.MAX_VALUE); // prevent log of extraordinary latency - it's a warmup
            for (int i = 0; i < 1000; ++i) {
                message.send(client, false);
                ZMsg.recvMsg(client, 0);
            }

            System.out.println("Start test");
            while (requestCounter.incrementAndGet() <= max) {
                message.send(client, false);
                sendTime.set(System.nanoTime());

                ZMsg.recvMsg(client, 0);
            }
            System.out.println("End test");

            // graceful shutdown
            sendTime.set(-1L); // signal the router to stop
            message.send(client);
            t.join();
        }
    }

    private static class ServerTask implements Runnable {
        private final String endpoint;
        private final AtomicLong sendTime;
        private final long[] buckets = new long[] {0, 1 * 1000, 5 * 1000, 10 * 1000, 50 * 1000, 100 * 1000, 500 * 1000, 1000 * 1000, 5000 * 1000, 10000 * 1000, 100000 * 1000, Long.MAX_VALUE};
        private final long[] recvLatencies = new long[100];

        private ServerTask(final String endpoint, final AtomicInteger requestCounter,
                           final AtomicLong sendTime) {
            this.endpoint = endpoint;
            this.sendTime = sendTime;
        }

        @Override
        public void run() {
            try (
                    ZContext context = new ZContext();
                    ZMQ.Socket server = context.createSocket(SocketType.ROUTER)
            ) {
                server.bind(endpoint);

                while (!Thread.currentThread().isInterrupted()) {
                    final ZMsg message = ZMsg.recvMsg(server, true);
                    final long receiveTime = System.nanoTime();
                    if (sendTime.get() == -1L) {
                        System.out.println("Break");
                        break;
                    }

                    final long recvLatency = (receiveTime - sendTime.get());
                    for (int i = 0 ; i < buckets.length ; i++) {
                        if (buckets[i] >= recvLatency) {
                            recvLatencies[i]++;
                            break;
                        }
                    }

                    message.send(server);
                }
                for (int i = 0 ; i < buckets.length - 1 ; i++) {
                    System.out.format("%s—%s: %d\n",
                                      identifySiUnit(buckets[i]/1e9),
                                      buckets[i+1] == Long.MAX_VALUE ? "+∞" : identifySiUnit(buckets[i+1]/1e9),
                                      recvLatencies[i]);
                }
            }
        }
    }
    
    private static Formating identifySiUnit(double value) {
        Formating f = new Formating();
        if (value != 0) {
            int symbcenter = 8;
            double digits;
            digits = Math.floor(Math.log10(value)/3);
            //System.out.println(value + " " + Math.log10(value) + " " + Math.getExponent(value) + " " + digits);
            if (((digits + symbcenter) < SYMBOLS.length) && ((digits + symbcenter) >= 0)) {
                f.symbol = SYMBOLS[(int) digits + symbcenter];
                f.value = (float) (value * 1.0f/(Math.pow(1000, digits))) ;
            }
            else {
                f.symbol =  '?';
            }
        } else {
            f.symbol = ' ';
        }
        return f;
    }

}

The results are:

taskset 0xFF000000 jdk-14.0.1+7/bin/java -cp jeromq-0.5.3-SNAPSHOT.jar -XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -XX:+AlwaysPreTouch -XX:+UseTransparentHugePages -Xms2048M -Xmx2048M  zmq.Latency
0.00—1.00µ: 1000
1.00µ—5.00µ: 0
5.00µ—10.00µ: 0
10.00µ—50.00µ: 1
50.00µ—100.00µ: 999703
100.00µ—500.00µ: 171
500.00µ—1.00m: 125
1.00m—5.00m: 0
5.00m—10.00m: 0
10.00m—100.00m: 0
100.00m—+∞: 0

There is no TCP tuning that might slightly improve some latency. ~~The most mysterious thing is not the high latency but the 1000 very fast ones. In all runs, I get the same result.~~ The 1000 fast ones are the warmup iterations, which should not be in the buckets. With each improvement in the settings, the worst latencies decrease. If I run it on a VM, without any tuning:

jdk-14.0.1+7/bin/java -cp jeromq-0.5.3-SNAPSHOT.jar  zmq.Latency
0.00—1.00µ: 1000
1.00µ—5.00µ: 0
5.00µ—10.00µ: 0
10.00µ—50.00µ: 0
50.00µ—100.00µ: 971581
100.00µ—500.00µ: 27658
500.00µ—1.00m: 672
1.00m—5.00m: 60
5.00m—10.00m: 29
10.00m—100.00m: 0
100.00m—+∞: 0

Getting more precise results would need playing with perf or eBPF, which is way above my competencies and available time.

fbacchella, May 27 '20