perf: auto batch flush (more than 2x performance gain)
Related issues: https://github.com/redis/lettuce/issues/2945 https://github.com/redis/lettuce/issues/2302
Make sure that:
- [Y] You have read the contribution guidelines.
- [Y] You have created a feature request first to discuss your contribution intent. Please reference the feature request ticket number in the pull request.
- [Y] You applied code formatting rules using the `mvn formatter:format` target. Don't submit any formatting-related changes.
- [Y] You submit test cases (unit or integration tests) that back your changes.
The optimization idea comes from the group-commit technique used in database systems.
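To illustrate the group-commit idea in isolation, here is a hypothetical, simplified sketch (not lettuce code; all names are made up for illustration): many producer threads enqueue commands, and a single drainer thread writes whatever has accumulated and issues one flush per drained batch instead of one flush per command.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of group commit: N producers enqueue commands,
// one drainer batches them and flushes once per batch.
public class GroupCommitSketch {

    static final int PRODUCERS = 8;

    static final int COMMANDS_PER_PRODUCER = 1_000;

    static int[] runSketch() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger written = new AtomicInteger();
        AtomicInteger flushes = new AtomicInteger();
        int total = PRODUCERS * COMMANDS_PER_PRODUCER;

        Thread drainer = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            while (written.get() < total) {
                queue.drainTo(batch); // grab everything queued so far
                if (batch.isEmpty()) {
                    Thread.onSpinWait(); // nothing pending yet, retry
                    continue;
                }
                written.addAndGet(batch.size()); // "write" the whole batch...
                flushes.incrementAndGet(); // ...then flush once for all of it
                batch.clear();
            }
        });
        drainer.start();

        Thread[] producers = new Thread[PRODUCERS];
        for (int i = 0; i < PRODUCERS; i++) {
            producers[i] = new Thread(() -> {
                for (int j = 0; j < COMMANDS_PER_PRODUCER; j++) {
                    queue.add("GET key-" + j);
                }
            });
            producers[i].start();
        }
        for (Thread p : producers) {
            p.join();
        }
        drainer.join();
        return new int[] { written.get(), flushes.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = runSketch();
        System.out.println("commands=" + r[0] + " flushes=" + r[1]);
    }

}
```

Under concurrent load the drainer typically picks up many commands per pass, so the flush count is far below the command count, which is exactly the syscall/flush amortization this PR exploits.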
Bench setup:
- Test env: AWS EC2 t2.2xlarge
- Redis version: 7.1.0
- Redis server: cache.r7g.large
- Test model: multi-thread sync GET; 512 threads, each thread performs 10,000 GETs
Benchmark code:
```java
package io.lettuce.bench;

import java.text.NumberFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import io.lettuce.bench.utils.BenchUtils;
import io.lettuce.core.AutoBatchFlushOptions;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.internal.LettuceAssert;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
 * @author chenxiaofan
 */
public class MultiThreadSyncGet {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultiThreadSyncGet.class);

    private static final int THREAD_COUNT = 32;

    private static final int LOOP_NUM = 500_000;

    private static final int DIGIT_NUM = 9;

    private static final String KEY_FORMATTER = String.format("key-%%0%dd", DIGIT_NUM);

    private static final String VALUE_FORMATTER = String.format("value-%%0%dd", DIGIT_NUM);

    static {
        // noinspection ConstantValue
        LettuceAssert.assertState(DIGIT_NUM >= String.valueOf(LOOP_NUM).length() + 1, "digit num is not large enough");
    }

    void test(boolean useBatchFlush) {
        try (RedisClient redisClient = RedisClient.create(RedisURI.create("127.0.0.1", 6379))) {
            final ClientOptions.Builder optsBuilder = ClientOptions.builder()
                    .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(7200)).build());
            if (useBatchFlush) {
                optsBuilder.autoBatchFlushOptions(AutoBatchFlushOptions.builder().enableAutoBatchFlush(true).build());
            }
            redisClient.setOptions(optsBuilder.build());
            final StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(ByteArrayCodec.INSTANCE);
            logger.info("thread count: {}", THREAD_COUNT);
            final Thread[] threads = new Thread[THREAD_COUNT];
            final AtomicLong totalCount = new AtomicLong();
            final AtomicLong totalLatency = new AtomicLong();
            for (int i = 0; i < THREAD_COUNT; i++) {
                threads[i] = new Thread(() -> {
                    for (int j = 0; j < LOOP_NUM; j++) {
                        final long cmdStart = System.nanoTime();
                        final byte[] resultBytes = connection.sync().get(genKey(j));
                        totalLatency.addAndGet((System.nanoTime() - cmdStart) / 1000);
                        LettuceAssert.assertState(Arrays.equals(genValue(j), resultBytes), "value not match");
                        totalCount.incrementAndGet();
                    }
                });
            }
            final long start = System.nanoTime();
            Arrays.asList(threads).forEach(Thread::start);
            Arrays.asList(threads).forEach(thread -> {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            });
            double costInSeconds = (System.nanoTime() - start) / 1_000_000_000.0;
            logger.info("Total commands: {}", NumberFormat.getInstance().format(totalCount.get()));
            logger.info("Total time: {}s", costInSeconds);
            logger.info("Avg latency: {}us", totalLatency.get() / (double) totalCount.get());
            logger.info("Avg QPS: {}/s", totalCount.get() / costInSeconds);
            BenchUtils.logEnterRatioIfNeeded(logger);
            BenchUtils.logAvgBatchCount(logger);
        }
    }

    private byte[] genKey(int j) {
        return String.format(KEY_FORMATTER, j).getBytes();
    }

    private byte[] genValue(int j) {
        return String.format(VALUE_FORMATTER, j).getBytes();
    }

    public static void main(String[] args) {
        for (boolean useBatchFlush : new boolean[] { true, false }) {
            logger.info("=====================================");
            logger.info("useBatchFlush: {}", useBatchFlush);
            new MultiThreadSyncGet().test(useBatchFlush);
        }
        logger.info("=====================================");
    }

}
```
Bench result:

```text
=====================================
10:53:11,074 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:129) - useBatchFlush: true
10:53:11,074 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:126) - batch size: 512
10:53:58,552 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:74) - Total commands: 5,120,000
10:53:58,552 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:75) - Total time: 47.009796512s
10:53:58,553 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:76) - Avg latency: 4693.2784484375us
10:53:58,553 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:77) - Avg QPS: 108913.46867866229/s
=====================================
10:53:58,597 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:129) - useBatchFlush: false
10:55:53,352 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:74) - Total commands: 5,120,000
10:55:53,352 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:75) - Total time: 114.732157413s
10:55:53,352 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:76) - Avg latency: 11458.427779882812us
10:55:53,353 INFO [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:77) - Avg QPS: 44625.67527227433/s
=====================================
```
@tishun Hi, could you help evaluate this PR? We're seeing a huge performance gain!
Batching can add latency. AFAIK, Netty already has a flush-consolidation handler for this: FlushConsolidationHandler.
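For context, Netty's built-in handler is installed by adding it at the front of the channel pipeline. A minimal sketch follows; the `FlushConsolidationHandler` class and its `(explicitFlushAfterFlushes, consolidateWhenNoReadInProgress)` constructor are Netty's real API, while the initializer class itself and the chosen threshold of 256 are illustrative assumptions (requires Netty on the classpath).

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;

// Sketch: let Netty consolidate flush() calls instead of flushing per write.
public class FlushConsolidationSetup extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        // Forward at most one real flush per 256 flush() calls, and keep
        // consolidating even when no read is in progress (second argument).
        ch.pipeline().addFirst(new FlushConsolidationHandler(256, true));
        // ... the client's codec and command handlers would follow here.
    }

}
```

The key difference from this PR's approach is where batching happens: FlushConsolidationHandler coalesces flushes inside the event loop after writes are already submitted, whereas auto batch flush batches command writes before they reach the channel.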
@Roiocam Thanks for the comments! I benchmarked my solution against FlushConsolidationHandler; see: https://github.com/okg-cxf/lettuce-core/blob/test/auto-batch-flush/bench-c5n-2xlarge-exists-b8.bench https://github.com/okg-cxf/lettuce-core/blob/test/auto-batch-flush/bench-c5n-2xlarge-exists-b32.bench https://github.com/okg-cxf/lettuce-core/blob/test/auto-batch-flush/bench-c5n-2xlarge-exists-b64.bench
Test file: https://github.com/okg-cxf/lettuce-core/blob/test/auto-batch-flush/src/main/java/io/lettuce/bench/MultiThreadSyncExists.java
From these results we can conclude that my solution has the advantage when QPS is high.
@tishun Hi, could you help evaluate this PR? We're seeing a huge performance gain!
Hey @okg-cxf , thanks for filing this PR.
Since it is a large change that touches quite critical pieces of the code I will need some more time to go through it.
Currently I am mainly focused on #2933 but I will come back to this issue as soon as possible.
Let's keep this open, I'd like to revisit it