perf: auto batch flush (more than 2x performance gain)

Open • opened by okg-cxf • 5 comments

Related issues: https://github.com/redis/lettuce/issues/2945, https://github.com/redis/lettuce/issues/2302

Make sure that:

  • [Y] You have read the contribution guidelines.
  • [Y] You have created a feature request first to discuss your contribution intent. Please reference the feature request ticket number in the pull request.
  • [Y] You applied code formatting rules using the mvn formatter:format target. Don’t submit any formatting related changes.
  • [Y] You submit test cases (unit or integration tests) that back your changes.

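The optimization idea comes from group commit in databases, where the log records of many concurrent transactions are made durable with a single write and sync. Here, commands issued concurrently by many threads are flushed to the socket together instead of one flush per command.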
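To make the group-commit analogy concrete, here is a minimal, self-contained sketch of the general technique. It is not the PR's implementation: GroupCommitFlushSketch, submit, writerLoop and run are hypothetical names, and socket writes and flushes are simulated with counters. Application threads only enqueue commands; a single writer blocks until at least one command is pending, drains everything that has accumulated in the meantime, and issues one flush for the whole batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical sketch of group-commit style flushing (not the PR's code):
 * producers only enqueue; one writer drains everything queued so far and
 * issues ONE (simulated) flush per drained batch.
 */
public class GroupCommitFlushSketch {

    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

    private long commandsWritten; // only touched by the writer thread

    private long flushes; // only touched by the writer thread

    /** Called by application threads; never flushes by itself. */
    void submit(String command) {
        pending.add(command);
    }

    /** Writer loop: block for the first command, then take whatever else has piled up. */
    void writerLoop(long expectedCommands) throws InterruptedException {
        List<String> batch = new ArrayList<>();
        while (commandsWritten < expectedCommands) {
            batch.add(pending.take()); // wait until at least one command exists
            pending.drainTo(batch); // grab everything queued meanwhile; no timer involved
            commandsWritten += batch.size(); // stand-in for writing each command
            flushes++; // stand-in for a single flush of the whole batch
            batch.clear();
        }
    }

    /** Runs `threads` producers, each submitting `perThread` commands; returns {commands, flushes}. */
    static long[] run(int threads, int perThread) {
        GroupCommitFlushSketch sketch = new GroupCommitFlushSketch();
        long expected = (long) threads * perThread;
        Thread writer = new Thread(() -> {
            try {
                sketch.writerLoop(expected);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        List<Thread> producers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread p = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    sketch.submit("GET key-" + i);
                }
            });
            producers.add(p);
            p.start();
        }
        try {
            for (Thread p : producers) {
                p.join();
            }
            writer.join(); // join() makes the writer's counters visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] { sketch.commandsWritten, sketch.flushes };
    }

    public static void main(String[] args) {
        long[] r = run(8, 10_000);
        System.out.printf("commands=%d, flushes=%d, avg batch=%.1f%n", r[0], r[1], (double) r[0] / r[1]);
    }

}
```

Because this writer never waits on a timer, a command that arrives while the writer is idle goes out in a batch of one; batches only grow when commands arrive faster than flushes complete. In a synchronous benchmark such as MultiThreadSyncGet, each thread has at most one command in flight, so a batch can never exceed the thread count; the sketch's producers do not wait for replies only to keep it short.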

Benchmark setup:

  • Test env: AWS EC2 t2.2xlarge
  • Redis version: 7.1.0
  • Redis server: cache.r7g.large
  • Test model: multi-threaded sync GET, 512 threads, each thread issuing 10,000 GETs

Benchmark code:

package io.lettuce.bench;

import java.text.NumberFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import io.lettuce.bench.utils.BenchUtils;
import io.lettuce.core.AutoBatchFlushOptions;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.internal.LettuceAssert;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
 * @author chenxiaofan
 */
public class MultiThreadSyncGet {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultiThreadSyncGet.class);

    // 512 threads x 10,000 GETs = 5,120,000 commands, matching the test model and the logged totals
    private static final int THREAD_COUNT = 512;

    private static final int LOOP_NUM = 10_000;

    private static final int DIGIT_NUM = 9;

    private static final String KEY_FORMATTER = String.format("key-%%0%dd", DIGIT_NUM);

    private static final String VALUE_FORMATTER = String.format("value-%%0%dd", DIGIT_NUM);

    static {
        // noinspection ConstantValue
        LettuceAssert.assertState(DIGIT_NUM >= String.valueOf(LOOP_NUM).length() + 1, "digit num is not large enough");
    }

    void test(boolean useBatchFlush) {
        try (RedisClient redisClient = RedisClient.create(RedisURI.create("127.0.0.1", 6379))) {
            final ClientOptions.Builder optsBuilder = ClientOptions.builder()
                    .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(7200)).build());
            if (useBatchFlush) {
                optsBuilder.autoBatchFlushOptions(AutoBatchFlushOptions.builder().enableAutoBatchFlush(true).build());
            }
            redisClient.setOptions(optsBuilder.build());
            final StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(ByteArrayCodec.INSTANCE);

            logger.info("thread count: {}", THREAD_COUNT);
            final Thread[] threads = new Thread[THREAD_COUNT];
            final AtomicLong totalCount = new AtomicLong();
            final AtomicLong totalLatency = new AtomicLong();
            for (int i = 0; i < THREAD_COUNT; i++) {
                threads[i] = new Thread(() -> {
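                    // Assumes every key from genKey(j) was loaded beforehand with genValue(j);
                    // the benchmark itself does not write any data.
                    for (int j = 0; j < LOOP_NUM; j++) {
                        final long cmdStart = System.nanoTime();
                        final byte[] resultBytes = connection.sync().get(genKey(j));
                        // per-command latency, accumulated in microseconds
                        totalLatency.addAndGet((System.nanoTime() - cmdStart) / 1000);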
                        LettuceAssert.assertState(Arrays.equals(genValue(j), resultBytes), "value not match");
                        totalCount.incrementAndGet();
                    }
                });
            }
            final long start = System.nanoTime();
            Arrays.asList(threads).forEach(Thread::start);
            Arrays.asList(threads).forEach(thread -> {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            });
            double costInSeconds = (System.nanoTime() - start) / 1_000_000_000.0;
            logger.info("Total commands: {}", NumberFormat.getInstance().format(totalCount.get()));
            logger.info("Total time: {}s", costInSeconds);
            logger.info("Avg latency: {}us", totalLatency.get() / (double) totalCount.get());
            logger.info("Avg QPS: {}/s", totalCount.get() / costInSeconds);
            BenchUtils.logEnterRatioIfNeeded(logger);
            BenchUtils.logAvgBatchCount(logger);
        }
    }

    private byte[] genKey(int j) {
        return String.format(KEY_FORMATTER, j).getBytes();
    }

    private byte[] genValue(int j) {
        return String.format(VALUE_FORMATTER, j).getBytes();
    }

    public static void main(String[] args) {
        for (boolean useBatchFlush : new boolean[] { true, false }) {
            logger.info("=====================================");
            logger.info("useBatchFlush: {}", useBatchFlush);
            new MultiThreadSyncGet().test(useBatchFlush);
        }
        logger.info("=====================================");
    }

}

Bench result:

=====================================
10:53:11,074 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:129) - useBatchFlush: true
10:53:11,074 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:126) - batch size: 512
10:53:58,552 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:74) - Total commands: 5,120,000
10:53:58,552 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:75) - Total time: 47.009796512s
10:53:58,553 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:76) - Avg latency: 4693.2784484375us
10:53:58,553 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:77) - Avg QPS: 108913.46867866229/s
=====================================
10:53:58,597 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:129) - useBatchFlush: false
10:55:53,352 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:74) - Total commands: 5,120,000
10:55:53,352 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:75) - Total time: 114.732157413s
10:55:53,352 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:76) - Avg latency: 11458.427779882812us
10:55:53,353 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:77) - Avg QPS: 44625.67527227433/s
=====================================
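The headline numbers can be cross-checked directly from the two log blocks. The sketch below (the class name BenchRatios is hypothetical) divides the logged figures and applies Little's law, assuming each of the 512 threads always has exactly one synchronous GET outstanding. Throughput, wall-time and latency ratios all come out at about 2.44x, and 512 divided by the average latency lands within roughly 0.2% of the logged QPS in both runs, which is why average latency fell by the same factor that throughput rose in this closed-loop test.

```java
/** Ratios and a Little's-law sanity check computed from the logged benchmark output. */
public class BenchRatios {

    // Values copied from the two runs in the log.
    static final double QPS_BATCH = 108913.46867866229;
    static final double QPS_NO_BATCH = 44625.67527227433;
    static final double TIME_S_BATCH = 47.009796512;
    static final double TIME_S_NO_BATCH = 114.732157413;
    static final double LATENCY_US_BATCH = 4693.2784484375;
    static final double LATENCY_US_NO_BATCH = 11458.427779882812;

    // Assumption: 512 threads, each with exactly one synchronous GET in flight (closed loop).
    static final int IN_FLIGHT = 512;

    /** Little's law: throughput = requests in flight / average latency. */
    static double littleQps(double latencyUs) {
        return IN_FLIGHT / (latencyUs / 1_000_000.0);
    }

    public static void main(String[] args) {
        System.out.printf("QPS gain:          %.2fx%n", QPS_BATCH / QPS_NO_BATCH);
        System.out.printf("Wall-time gain:    %.2fx%n", TIME_S_NO_BATCH / TIME_S_BATCH);
        System.out.printf("Latency reduction: %.2fx%n", LATENCY_US_NO_BATCH / LATENCY_US_BATCH);
        System.out.printf("Little's law QPS, batch on:  %.0f/s (logged %.0f/s)%n",
                littleQps(LATENCY_US_BATCH), QPS_BATCH);
        System.out.printf("Little's law QPS, batch off: %.0f/s (logged %.0f/s)%n",
                littleQps(LATENCY_US_NO_BATCH), QPS_NO_BATCH);
    }

}
```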

Posted by okg-cxf, Aug 08 '24

@tishun Hi, bro, could you help evaluate this PR? Got a huge performance gain!

Posted by okg-cxf, Aug 08 '24

Batching will add latency. AFAIK, Netty already has a handler that consolidates flushes for this: FlushConsolidationHandler.

Posted by Roiocam, Aug 16 '24
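For comparison, here is a minimal sketch of wiring Netty's FlushConsolidationHandler into a Lettuce client through a NettyCustomizer registered on ClientResources. It is not necessarily the setup behind the linked benchmark files, whose configuration is not shown here; the class name, the localhost address, and consolidateWhenNoReadInProgress = true are assumptions. The flag is set on the guess that, in a client, most flushes are triggered by application threads rather than inside a read loop. The handler keeps per-channel state, so a new instance is created for each channel, and it is added first in the pipeline, where the Netty Javadoc suggests it has the best effect.

```java
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.NettyCustomizer;
import io.netty.channel.Channel;
import io.netty.handler.flush.FlushConsolidationHandler;

public class FlushConsolidationExample {

    public static void main(String[] args) {
        ClientResources resources = ClientResources.builder().nettyCustomizer(new NettyCustomizer() {

            @Override
            public void afterChannelInitialized(Channel channel) {
                // Not @Sharable: one handler instance per channel.
                // Added first so that every outbound flush passes through it.
                channel.pipeline().addFirst(new FlushConsolidationHandler(
                        FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true));
            }

        }).build();

        RedisClient client = RedisClient.create(resources, RedisURI.create("127.0.0.1", 6379));
        try (StatefulRedisConnection<String, String> connection = client.connect()) {
            System.out.println(connection.sync().ping());
        } finally {
            client.shutdown();
            resources.shutdown();
        }
    }

}
```

Unlike the group-commit approach, the explicitFlushAfterFlushes bound (256 by default) caps how many flushes are held back before one is forwarded to the socket.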

@Roiocam Thx for the comments! I benchmarked my solution against FlushConsolidationHandler, see:

  • https://github.com/okg-cxf/lettuce-core/blob/test/auto-batch-flush/bench-c5n-2xlarge-exists-b8.bench
  • https://github.com/okg-cxf/lettuce-core/blob/test/auto-batch-flush/bench-c5n-2xlarge-exists-b32.bench
  • https://github.com/okg-cxf/lettuce-core/blob/test/auto-batch-flush/bench-c5n-2xlarge-exists-b64.bench

Test file: https://github.com/okg-cxf/lettuce-core/blob/test/auto-batch-flush/src/main/java/io/lettuce/bench/MultiThreadSyncExists.java

From these results, I conclude that my solution has the advantage when QPS is high.

Posted by okg-cxf, Aug 17 '24

> @tishun Hi, bro, could you help evaluate this PR? Got a huge performance gain!

Hey @okg-cxf, thanks for filing this PR.

Since it is a large change that touches quite critical pieces of the code, I will need some more time to go through it.

Currently I am mainly focused on #2933, but I will come back to this issue as soon as possible.

Posted by tishun, Aug 26 '24

Let's keep this open; I'd like to revisit it.

Posted by tishun, Mar 19 '25