lettuce
lettuce copied to clipboard
SimpleBatcher has a thread-safety issue
I am using version 6.2.0.RELEASE. I have a question about batch execution using asynchronous commands, as described in the wiki.
/**
 * Example command interface used to illustrate batch execution.
 *
 * <p>Annotated with a batch size of 50; presumably queued commands are flushed
 * automatically once that many have accumulated -- confirm against the Lettuce
 * batching documentation.
 */
@BatchSize(50)
interface StringCommands extends Commands {
/**
 * Issues an asynchronous GET for {@code key}.
 *
 * @param key      the key to read
 * @param batching per-call batching directive (queue or flush)
 * @return a future for the value stored at {@code key}
 */
RedisFuture<String> get(String key, CommandBatching batching);
}
// Obtain the command interface proxy (construction elided in the original snippet).
StringCommands commands = …
// Presumably only enqueues the command without sending it -- confirm against the Lettuce docs.
commands.get("key1", CommandBatching.queue());
// Presumably sends this command together with everything queued so far.
commands.get("key2", CommandBatching.flush());
If the above code runs on multiple threads, RedisFuture.get() will sometimes block forever, because some commands are never sent to the Redis server. It might be caused by io.lettuce.core.dynamic.SimpleBatcher#flush(boolean), specifically the loop "while (flushing.compareAndSet(false, true))".
Thank you!
Can you provide a reproducer that can recreate the problem?
// redis server 6.2
// lettuce 6.2.0.RELEASE
class 1:
import io.lettuce.core.RedisFuture;
import io.lettuce.core.dynamic.Commands;
import io.lettuce.core.dynamic.batch.CommandBatching;
/**
 * Command interface for the reproducer, exposing PFCOUNT with explicit
 * per-call batching control.
 */
public interface BatchCommands extends Commands {
/**
 * Issues an asynchronous PFCOUNT for {@code key}.
 *
 * @param key      the key to count
 * @param batching per-call batching directive (queue or flush)
 * @return a future for the count reported for {@code key}
 */
RedisFuture<Long> pfcount(String key, CommandBatching batching);
}
class 2:
import io.lettuce.core.RedisFuture;
import io.lettuce.core.dynamic.batch.CommandBatching;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
 * Helper that issues one PFCOUNT per key as a single batch and collects the results.
 *
 * <p>All keys but the last are submitted with {@link CommandBatching#queue()}; the
 * last is submitted with {@link CommandBatching#flush()}.
 *
 * <p>This class adds no synchronization of its own. Threads sharing one
 * {@link BatchCommands} instance interleave their queue/flush calls, which is the
 * scenario this reproducer is meant to exercise.
 */
public class BatchOperations {

    private final BatchCommands batchCommands;

    /**
     * @param batchCommands command interface used to issue the PFCOUNT calls
     */
    public BatchOperations(BatchCommands batchCommands) {
        this.batchCommands = batchCommands;
    }

    /**
     * Runs PFCOUNT for every key in one batch and waits for all results.
     *
     * @param keys keys to count; may be empty
     * @return the counts in the same order as {@code keys}; an empty list when no keys are given
     * @throws RuntimeException if waiting is interrupted or any command fails
     */
    public List<Long> mpfcount(String... keys) {
        // Without this guard an empty array would index keys[-1] below.
        if (keys == null || keys.length == 0) {
            return new ArrayList<>();
        }
        List<RedisFuture<Long>> futures = new ArrayList<>(keys.length);
        int last = keys.length - 1;
        for (int i = 0; i < last; i++) {
            futures.add(batchCommands.pfcount(keys[i], CommandBatching.queue()));
        }
        // The final command carries the flush directive for the whole batch.
        futures.add(batchCommands.pfcount(keys[last], CommandBatching.flush()));
        return get(futures);
    }

    /**
     * Blocks on each future in order and returns the collected values.
     *
     * @param futures futures to wait for
     * @param <T>     result type
     * @return the results, in the order of {@code futures}
     * @throws RuntimeException wrapping any interruption or execution failure
     */
    private <T> List<T> get(List<RedisFuture<T>> futures) {
        List<T> list = new ArrayList<>(futures.size());
        try {
            for (RedisFuture<T> future : futures) {
                list.add(future.get());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers and executors can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | RuntimeException e) {
            throw new RuntimeException(e);
        }
        return list;
    }
}
class 3:
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.dynamic.RedisCommandFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Reproducer entry point: several threads share one {@link BatchCommands} proxy and
 * each submits a batch of PFCOUNT commands through {@link BatchOperations}.
 */
public class MainTest {

    /** Number of worker threads, each submitting one batch. */
    private static final int THREAD_COUNT = 2;

    /** Number of keys per batch. */
    private static final int KEY_COUNT = 30;

    /** How long to wait for the workers before reporting a probable hang. */
    private static final long TIMEOUT_SECONDS = 30;

    public static void main(String[] args) {
        RedisClusterClient redisClient = RedisClusterClient.create(RedisURI.create("host", 6380));
        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
        try {
            RedisCommandFactory redisCommandFactory = new RedisCommandFactory(redisClient.connect());
            BatchCommands batchCommands = redisCommandFactory.getCommands(BatchCommands.class);
            BatchOperations batchOperations = new BatchOperations(batchCommands);
            for (int i = 0; i < THREAD_COUNT; i++) {
                int finalI = i;
                threadPool.execute(() -> {
                    List<String> keys = new ArrayList<>(KEY_COUNT);
                    for (int j = 1; j <= KEY_COUNT; j++) {
                        keys.add("key" + j + finalI);
                    }
                    List<Long> values = batchOperations.mpfcount(keys.toArray(new String[KEY_COUNT]));
                    System.out.println(String.format("params:%s, values:%s", keys, values));
                });
            }
            threadPool.shutdown();
            // Surface the hang instead of waiting silently forever on a stuck batch.
            if (!threadPool.awaitTermination(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                System.err.println("Workers did not finish within " + TIMEOUT_SECONDS
                        + "s; some batched commands were probably never sent.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Interrupt any worker still blocked on a future, then release the client.
            // NOTE(review): shutdown() is documented to also close connections opened
            // from this client -- confirm for the Lettuce version in use.
            threadPool.shutdownNow();
            redisClient.shutdown();
        }
    }
}