lettuce icon indicating copy to clipboard operation
lettuce copied to clipboard

SimpleBatcher has a thread-safety issue

Open desert-tear opened this issue 1 year ago • 2 comments

I am using Lettuce version 6.2.0.RELEASE. I would like to ask about batch execution using asynchronous commands, as described in the wiki:

// Command interface from the wiki example: declares a batched, asynchronous GET.
// @BatchSize(50) configures a batch size of 50 for this interface — presumably queued
// commands are flushed automatically once 50 have accumulated; confirm against the Lettuce docs.
@BatchSize(50)
interface StringCommands extends Commands {
    // Returns a RedisFuture for the value stored at `key`. The trailing CommandBatching
    // argument is chosen per call (the usage below passes queue() or flush()).
    RedisFuture<String> get(String key, CommandBatching batching);
}

// Obtaining the command interface instance is elided in the original example.
StringCommands commands = …
// First call passes CommandBatching.queue() — presumably the command is only queued, not sent yet.
commands.get("key1", CommandBatching.queue());
// Last call passes CommandBatching.flush() — presumably this triggers sending the queued batch.
commands.get("key2", CommandBatching.flush()); 

If the above code is run from multiple threads, RedisFuture.get() sometimes blocks forever; in fact, some commands are never sent to the Redis server. It might be caused by io.lettuce.core.dynamic.SimpleBatcher#flush(boolean), specifically "while (flushing.compareAndSet(false, true))".

Thank you!

desert-tear avatar Sep 20 '22 04:09 desert-tear

Can you provide a reproducer that can recreate the problem?

mp911de avatar Oct 07 '22 08:10 mp911de

// redis server 6.2
// lettuce 6.2.0.RELEASE
class 1:
import io.lettuce.core.RedisFuture;
import io.lettuce.core.dynamic.Commands;
import io.lettuce.core.dynamic.batch.CommandBatching;

/**
 * Command interface used by the reproducer. It extends Lettuce's {@code Commands} marker
 * and declares a single asynchronous command with explicit per-call batching control.
 */
public interface BatchCommands extends Commands {
    /**
     * Asynchronous {@code pfcount} for a single key (presumably mapped to the Redis
     * PFCOUNT command by method name — confirm against the Lettuce docs).
     *
     * @param key      the key to count
     * @param batching per-call batching directive; callers in this reproducer pass
     *                 {@code CommandBatching.queue()} or {@code CommandBatching.flush()}
     * @return a future holding the count
     */
    RedisFuture<Long> pfcount(String key, CommandBatching batching);
}

class 2:
import io.lettuce.core.RedisFuture;
import io.lettuce.core.dynamic.batch.CommandBatching;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
 * Convenience wrapper that issues one {@code pfcount} per key through a batching
 * {@link BatchCommands} interface and blocks until every result is available.
 *
 * <p>This class performs no synchronization of its own. It is deliberately shared between
 * threads by the reproducer, so whether concurrent calls are safe depends entirely on the
 * underlying command interface.
 */
public class BatchOperations {
    private final BatchCommands batchCommands;

    /**
     * @param batchCommands the batching command interface every call is delegated to
     */
    public BatchOperations(BatchCommands batchCommands) {
        this.batchCommands = batchCommands;
    }

    /**
     * Runs {@code pfcount} for each key as one batch: every key except the last is submitted
     * with {@code CommandBatching.queue()}, the last one with {@code CommandBatching.flush()}.
     *
     * @param keys the keys to count; {@code null} or empty yields an empty result
     * @return the counts, in the same order as {@code keys}
     * @throws RuntimeException if waiting for a result is interrupted or a command fails
     */
    public List<Long> mpfcount(String... keys) {
        // Guard: without it an empty call would index keys[-1] below.
        if (keys == null || keys.length == 0) {
            return new ArrayList<>();
        }
        List<RedisFuture<Long>> futures = new ArrayList<>(keys.length);
        int last = keys.length - 1;
        // All but the final key are only queued ...
        for (int i = 0; i < last; i++) {
            futures.add(batchCommands.pfcount(keys[i], CommandBatching.queue()));
        }
        // ... and the final key carries the flush request for the whole batch.
        futures.add(batchCommands.pfcount(keys[last], CommandBatching.flush()));
        return this.get(futures);
    }

    /**
     * Blocks on each future in order and collects the results.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} (after restoring the
     *                          interrupt flag) or the {@link ExecutionException} of a failed command
     */
    private <T> List<T> get(List<RedisFuture<T>> futures) {
        List<T> results = new ArrayList<>(futures.size());
        try {
            for (RedisFuture<T> future : futures) {
                results.add(future.get());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        return results;
    }
}

class 3:
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.dynamic.RedisCommandFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Reproducer driver: two pool threads share one {@link BatchOperations} instance and each
 * submits a batch of 30 {@code pfcount} calls concurrently.
 */
public class MainTest {
    /**
     * Connects to the cluster, creates the shared batching facade and starts both workers.
     * NOTE: the client and its connection are never closed here; only the thread pool is
     * shut down once both tasks have been submitted.
     */
    public static void main(String[] args) {
        RedisClusterClient client = RedisClusterClient.create(RedisURI.create("host", 6380));
        RedisCommandFactory factory = new RedisCommandFactory(client.connect());
        BatchOperations operations = new BatchOperations(factory.getCommands(BatchCommands.class));

        ExecutorService workers = Executors.newFixedThreadPool(2);
        final int keyCount = 30;
        for (int worker = 0; worker < 2; worker++) {
            final int suffix = worker;
            workers.execute(() -> countAndPrint(operations, keyCount, suffix));
        }
        workers.shutdown();
    }

    /**
     * Builds the keys {@code key<index><suffix>} for index 1..keyCount, counts them in one
     * batch and prints the keys alongside the returned values.
     */
    private static void countAndPrint(BatchOperations operations, int keyCount, int suffix) {
        List<String> keys = new ArrayList<>(keyCount);
        for (int index = 1; index <= keyCount; index++) {
            keys.add("key" + index + suffix);
        }
        String[] keyArray = keys.toArray(new String[keyCount]);
        List<Long> values = operations.mpfcount(keyArray);
        System.out.println(String.format("params:%s, values:%s", keys, values));
    }
}


desert-tear avatar Oct 08 '22 05:10 desert-tear