OutOfDirectMemoryError can cause different threads to read each other's data

Open · bironran opened this issue 1 year ago · 6 comments

Bug Report

When multiple threads read through multiple connections, an OutOfDirectMemoryError can cause a thread to receive the result of a command sent by a different thread. For example, thread 1 issues "get(x)" and thread 2 issues "get(y)", but thread 2 receives the result for key x.
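Some background on why such a mix-up is possible at all: Redis sends replies on a connection in the order it received the commands, with no request identifier. A client that pipelines commands from many threads over one connection therefore has to pair each reply with its oldest outstanding command. The toy model below is a hypothetical sketch (class and method names are made up), not Lettuce's code, and the report does not establish where the failure actually happens. It only shows that any failure path that removes a command without consuming its reply shifts every later reply by one position, which produces exactly the symptom described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model, NOT Lettuce's implementation: replies carry no request id and are
// matched to outstanding commands strictly in FIFO order.
final class FifoMismatchDemo {
    private static final class Pending {
        final String issuer;
        final String key;

        Pending(String issuer, String key) {
            this.issuer = issuer;
            this.key = key;
        }
    }

    private final Deque<Pending> outstanding = new ArrayDeque<>();
    final List<String> deliveries = new ArrayList<>();

    // A thread writes GET <key>; the command joins the tail of the queue.
    void send(String issuer, String key) {
        outstanding.addLast(new Pending(issuer, key));
    }

    // A reply arrives and completes whichever command is at the head.
    void onReply(String value) {
        Pending head = outstanding.pollFirst();
        if (head == null) {
            throw new IllegalStateException("reply with no outstanding command");
        }
        deliveries.add(head.issuer + " GET " + head.key + " <- " + value);
    }

    // Hypothetical failure path: the head command is failed (e.g. a buffer
    // allocation error) while its reply is still waiting to be read.
    void failHeadWithoutConsumingReply() {
        Pending head = outstanding.pollFirst();
        if (head == null) {
            throw new IllegalStateException("no outstanding command to fail");
        }
        deliveries.add(head.issuer + " GET " + head.key + " <- ERROR");
    }

    public static void main(String[] args) {
        FifoMismatchDemo conn = new FifoMismatchDemo();
        conn.send("thread1", "x");
        conn.send("thread2", "y");
        conn.failHeadWithoutConsumingReply(); // thread1's GET x is dropped...
        conn.onReply("value-of-x");           // ...but x's reply is read next
        // prints [thread1 GET x <- ERROR, thread2 GET y <- value-of-x]
        System.out.println(conn.deliveries);
    }
}
```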

Input Code

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ClientOptions.DisconnectedBehavior;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisCommandInterruptedException;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.StatefulRedisConnection;

import com.sun.management.HotSpotDiagnosticMXBean;
import com.sun.management.VMOption;

public class LettuceDirectMemoryFailure {
    public static void main(String[] args) throws InterruptedException {

        final HotSpotDiagnosticMXBean hsdiag = ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        if (hsdiag != null) {
            final VMOption maxDirectMemorySize = hsdiag.getVMOption("MaxDirectMemorySize");
            System.out.println(maxDirectMemorySize);
            final long maxDirectMemorySizeLong = Long.parseLong(maxDirectMemorySize.getValue());
            if (maxDirectMemorySizeLong <= 0 || maxDirectMemorySizeLong > 6_000_000) {
                System.out.println("Test must be run with -XX:MaxDirectMemorySize=5M");
                return;
            }
        }

        AtomicLong failures = new AtomicLong(0L);
        AtomicLong success = new AtomicLong(0L);

        RedisClient redisClient = RedisClient.create("redis://127.0.0.1:7000/0");
        redisClient.setOptions(ClientOptions.builder()
                .disconnectedBehavior(DisconnectedBehavior.ACCEPT_COMMANDS)
                .socketOptions(SocketOptions.builder()
                        .connectTimeout(Duration.ofMillis(50))
                        .tcpNoDelay(true)
                        .build())
                .autoReconnect(true)
                .timeoutOptions(TimeoutOptions.builder()
                        .timeoutCommands(true)
                        .fixedTimeout(Duration.ofSeconds(5))
                        .build())
                .build());
        List<StatefulRedisConnection<String, String>> connections = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            connections.add(redisClient.connect());
        }

        String spaces = new String(new char[1_000_000]).replace('\0', ' ');
        final int NUMBER_OF_THREADS = 100;
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            connections.get(0).sync().set("key" + i, i + ":" + spaces);
        }

        List<Thread> threads = new LinkedList<>();
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            final int threadId = i;
            Runnable runnable = () -> {
                while (failures.get() == 0) {
                    try {
                        final String key = "key" + threadId;
                        final StatefulRedisConnection<String, String> connection = connections.get(threadId / 10);
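                        final String value = connection.async().get(key).get(20, TimeUnit.SECONDS);
                        // GET returns null for a missing key; guard before splitting.
                        final String result = value == null ? null : value.split(":")[0];
                        if (result != null && !Integer.toString(threadId).equals(result)) {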
                            System.out.println("ERROR on thread " + threadId + " got " + result);
                            failures.incrementAndGet();
                        } else {
                            success.incrementAndGet();
                        }
                    } catch (RedisCommandInterruptedException | InterruptedException | ExecutionException |
                             TimeoutException e) {
                        //ignore
                    }
                }
            };
            final Thread thread = new Thread(runnable, "worker " + i);
            thread.setDaemon(true);
            thread.start();
            threads.add(thread);
        }

        while (failures.get() == 0) {
            System.out.println("success: " + success.get() + " ; failures " + failures.get());
            Thread.sleep(100);
        }

        System.out.println("success: " + success.get() + " ; failures " + failures.get());

        for (Thread thread : threads) {
            thread.interrupt();
        }

        for (StatefulRedisConnection<String, String> connection : connections) {
            connection.close();
        }
        redisClient.shutdown();
    }
}
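The reproducer only runs when -XX:MaxDirectMemorySize is set to a small value. When the flag is left unset, HotSpot reports it as 0 and caps direct memory at roughly the maximum heap size instead. The helper below is an illustrative sketch (hypothetical class and method names) that estimates that JVM-level cap the same way the reproducer reads the option. It does not account for any separate accounting Netty may apply to its own allocations.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

// Illustrative helper (hypothetical names): estimates the JVM-level cap on
// direct memory. MaxDirectMemorySize reads as 0 when the flag is unset, in
// which case HotSpot falls back to roughly the maximum heap size.
final class DirectMemoryBudget {
    private DirectMemoryBudget() {}

    static long effectiveLimit(long configuredBytes, long maxHeapBytes) {
        return configuredBytes > 0 ? configuredBytes : maxHeapBytes;
    }

    static long currentLimit() {
        HotSpotDiagnosticMXBean diag =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        long configured = 0L;
        if (diag != null) {
            configured = Long.parseLong(diag.getVMOption("MaxDirectMemorySize").getValue());
        }
        return effectiveLimit(configured, Runtime.getRuntime().maxMemory());
    }

    public static void main(String[] args) {
        System.out.println("Approximate direct memory cap: " + currentLimit() + " bytes");
    }
}
```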

Expected behavior/code

Even on OutOfDirectMemoryError, different threads must never read results from commands they have not issued.
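The reproducer detects cross-talk by storing each thread's index in front of the value and checking it after every GET. The same idea can serve as an application-level tripwire while the underlying bug is open. This is a minimal sketch with hypothetical names: it detects a misrouted reply but cannot prevent one, and it costs a few extra bytes per value. The length prefix lets keys contain ':'.

```java
// Hypothetical helper, not part of Lettuce: tags every stored value with its own
// key so a reader can detect a reply that belongs to a different command.
// Format: "<keyLength>:<key><payload>"; the length prefix allows ':' in keys.
final class TaggedValues {
    private TaggedValues() {}

    static String wrap(String key, String payload) {
        return key.length() + ":" + key + payload;
    }

    // Returns the payload, or null if the key was absent (GET returned null).
    // Throws if the value was written for a different key or is not tagged.
    static String unwrapOrThrow(String expectedKey, String stored) {
        if (stored == null) {
            return null;
        }
        int colon = stored.indexOf(':');
        if (colon <= 0) {
            throw new IllegalStateException("untagged value for " + expectedKey);
        }
        int keyLength;
        try {
            keyLength = Integer.parseInt(stored.substring(0, colon));
        } catch (NumberFormatException e) {
            throw new IllegalStateException("untagged value for " + expectedKey, e);
        }
        if (keyLength < 0 || keyLength > stored.length() - colon - 1) {
            throw new IllegalStateException("corrupt tag for " + expectedKey);
        }
        int keyEnd = colon + 1 + keyLength;
        String actualKey = stored.substring(colon + 1, keyEnd);
        if (!actualKey.equals(expectedKey)) {
            throw new IllegalStateException(
                    "reply for '" + actualKey + "' returned to reader of '" + expectedKey + "'");
        }
        return stored.substring(keyEnd);
    }
}
```

With a Lettuce connection, values would be written as `set(key, TaggedValues.wrap(key, payload))` and read back through `TaggedValues.unwrapOrThrow(key, sync.get(key))`, so a misrouted reply surfaces as an exception instead of silently wrong data.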

Environment

  • Lettuce version(s): 6.1.10.RELEASE, 6.3.1.RELEASE
  • Redis version: 7.2.4.

bironran, Jan 10 '24 20:01

Please note that we consider this a super-critical bug. We have no way to make sure it doesn't happen again, nor to catch it before wrong results are returned and acted upon. Unless we can find a way to make this bulletproof, we'll be switching to Jedis with urgent priority, which is a shame, as Lettuce provides considerably more functionality and ease of use.

bironran, Jan 10 '24 22:01

Is there any reason you don't switch to heap buffers? Managing direct memory in addition to the heap size can be quite challenging.

mp911de, Jan 11 '24 10:01

@mp911de how would we do that? I've looked at -Dio.netty.noPreferDirect=true, but according to https://github.com/netty/netty/issues/10189 it seems to have very little effect.

bironran, Jan 11 '24 16:01

Setting -Dio.netty.noPreferDirect=true is honored by Lettuce for its read and write buffers. However, Netty's own channel reads still attempt to obtain a direct buffer. In any case, the flag reduces the direct memory footprint.
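If you take the system-property route, the flag has to be in place before Netty initializes. As far as we know, in Netty 4.1 io.netty.noPreferDirect is read once, in a static initializer of io.netty.util.internal.PlatformDependent, so setting it programmatically only works if it happens before any Lettuce or Netty class is touched. A hedged sketch with a hypothetical launcher class:

```java
// Hypothetical launcher: sets Netty's io.netty.noPreferDirect flag as the very
// first action, before any Lettuce or Netty class has been loaded. Setting it
// after Netty has initialized is assumed to have no effect.
final class HeapPreferringLauncher {
    static final String NO_PREFER_DIRECT = "io.netty.noPreferDirect";

    private HeapPreferringLauncher() {}

    public static void main(String[] args) {
        // Same effect as -Dio.netty.noPreferDirect=true, provided nothing before
        // this line (including static initializers) has touched Netty.
        System.setProperty(NO_PREFER_DIRECT, "true");
        System.out.println(NO_PREFER_DIRECT + "=" + System.getProperty(NO_PREFER_DIRECT));
        // ... create the RedisClient and connections only after this point ...
    }
}
```

Passing the flag on the JVM command line avoids the class-loading ordering concern entirely, which is why it is usually the safer choice.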

The code-based alternative is:

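import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.NettyCustomizer;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;

// Use a pooled allocator with preferDirect=false for Lettuce's channels.
ClientResources resources = ClientResources.builder()
        .nettyCustomizer(new NettyCustomizer() {
            @Override
            public void afterBootstrapInitialized(Bootstrap bootstrap) {
                bootstrap.option(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(false));
            }
        }).build();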

mp911de, Jan 12 '24 08:01

Thanks, this seems to work well with my test class.

Wouldn't that mode be a more sensible default, given the critical nature of the bug?

bironran, Jan 12 '24 17:01

Hello, I encountered the same problem in our production environment. Are there any plans to fix it?

codingdie, Apr 28 '24 14:04