BatchExecutor flush commands NPE

Open zhangjunqiang opened this issue 2 years ago • 10 comments

Bug Report

I created a RedisBatchQuery with the BatchExecutor API and use it to sync lots of ids to Redis, but occasionally it throws an NPE (in the redisBatchQuery.flush() method). After careful observation: when the number of ids is divisible by @BatchSize (1000), the NPE occurs; otherwise it does not happen. So I think there is something wrong with SimpleBatcher.flush().
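Here is a minimal sketch of why the id count could matter. It assumes, without confirmation from this report, that SimpleBatcher flushes automatically each time `@BatchSize` commands are queued. Under that assumption, the explicit `flush()` would see an empty queue exactly when the number of queued commands is a multiple of the batch size. `syncFactId2Redis` queues two commands per id (`sadd` and `expire`), so the simulation counts 2 × ids. The class and method names are hypothetical, and the code does not call Lettuce.

```java
// Hypothetical simulation of the assumed counting behaviour; it does not call Lettuce
// and is not SimpleBatcher's real code.
public class BatchRemainderDemo {

    /** Returns how many commands are still queued when the explicit flush() runs. */
    public static int queuedAtExplicitFlush(int commands, int batchSize) {
        int queued = 0;
        for (int i = 0; i < commands; i++) {
            queued++;
            if (queued == batchSize) {
                queued = 0; // assumed automatic flush once batchSize commands are queued
            }
        }
        return queued;
    }

    public static void main(String[] args) {
        int batchSize = 1000;
        int[] idCounts = {999, 1000, 1001, 2000};
        for (int ids : idCounts) {
            int commands = 2 * ids; // syncFactId2Redis queues sadd + expire per id
            int left = queuedAtExplicitFlush(commands, batchSize);
            System.out.printf("ids=%d commands=%d queued at flush()=%d%s%n",
                    ids, commands, left, left == 0 ? " <- empty queue" : "");
        }
    }
}
```

With a batch size of 1000, this reports an empty queue for 1000 and 2000 ids and a non-empty one for 999 and 1001 ids.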

Current Behavior

Stack trace
java.lang.NullPointerException: null
	at io.lettuce.core.cluster.StatefulRedisClusterConnectionImpl.preProcessCommand(StatefulRedisClusterConnectionImpl.java:233) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.cluster.StatefulRedisClusterConnectionImpl.dispatch(StatefulRedisClusterConnectionImpl.java:215) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.SimpleBatcher.doFlush(SimpleBatcher.java:137) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.SimpleBatcher.flush(SimpleBatcher.java:107) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.SimpleBatcher.flush(SimpleBatcher.java:84) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.BatchExecutableCommandLookupStrategy$1.execute(BatchExecutableCommandLookupStrategy.java:79) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.RedisCommandFactory$CommandFactoryExecutorMethodInterceptor.invoke(RedisCommandFactory.java:235) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.intercept.MethodInterceptorChain$MethodInterceptorContext.proceed(MethodInterceptorChain.java:118) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.intercept.MethodInterceptorChain$PooledMethodInvocation.proceed(MethodInterceptorChain.java:201) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.intercept.DefaultMethodInvokingInterceptor.invoke(DefaultMethodInvokingInterceptor.java:45) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.intercept.MethodInterceptorChain$MethodInterceptorContext.proceed(MethodInterceptorChain.java:118) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.intercept.MethodInterceptorChain.invoke(MethodInterceptorChain.java:80) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.dynamic.intercept.InvocationProxyFactory$InterceptorChainInvocationHandler.handleInvocation(InvocationProxyFactory.java:102) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]
	at com.sun.proxy.$Proxy39.flush(Unknown Source) ~[na:na]
	at com.netease.yanxuan.lms.schedule.redis.RedisSyncClient.flushRemainFactIds(RedisSyncClient.java:29) ~[lms-schedule-3.0.3.5-SNAPSHOT-test.jar:1.0.0-SNAPSHOT]

Input Code

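import io.lettuce.core.RedisFuture;
import io.lettuce.core.dynamic.Commands;
import io.lettuce.core.dynamic.batch.BatchExecutor;
import io.lettuce.core.dynamic.batch.BatchSize;

@BatchSize(1000)
public interface RedisBatchQuery extends Commands, BatchExecutor {

    RedisFuture<Long> sadd(String key, String... members);

    RedisFuture<Boolean> expire(String key, long seconds);

}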


@Slf4j
public class RedisSyncClient {
    private static final int MONTH_EXPIRE_TIME = 30 * 24 * 3600;

    private static final int WEEK_EXPIRE_TIME =  7 * 24 * 3600;
    @Inject
    private RedisBatchQuery redisBatchQuery;

    public void syncFactId2Redis(String factId, FactResType factResType, Long factSetId) {
        if(StringUtils.isBlank(factId)) {
            return;
        }
        String factIdKey = factResType.getFactGroupMapKey(factId);
        int expireTime = EnvUtils.is(EnvTypeEnums.ENV_TEST) ? WEEK_EXPIRE_TIME : MONTH_EXPIRE_TIME;
        redisBatchQuery.sadd(factIdKey, factSetId.toString());
        redisBatchQuery.expire(factIdKey, expireTime);
    }

    public void flushRemainFactIds(){
        redisBatchQuery.flush();
    }
}

Expected behavior/code

Environment

  • Lettuce version(s): 6.1.8.RELEASE
  • Redis version: 4.0.8
  • Redis deployment: cluster

Possible Solution

public void flushRemainFactIds() {
    // Workaround: if a command is added before flush(), the NPE does not occur
    redisBatchQuery.flush();
}
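Until a fixed version is in use, one possible application-side workaround is to skip the explicit `flush()` when no commands can be pending. This sketch rests on two assumptions: SimpleBatcher flushes automatically every `batchSize` queued commands, and all commands are queued from a single thread. `FlushGuard` and `BatchFlusher` are hypothetical names. `BatchFlusher` stands in for the `flush()` method of `RedisBatchQuery`.

```java
// Hypothetical workaround sketch; not part of Lettuce. Assumes single-threaded use.
public class FlushGuard {

    /** Stand-in for the flush() method of the batch interface (RedisBatchQuery in the report). */
    public interface BatchFlusher {
        void flush();
    }

    private final BatchFlusher delegate;
    private final int batchSize;
    private long queuedSinceLastFlush;

    public FlushGuard(BatchFlusher delegate, int batchSize) {
        this.delegate = delegate;
        this.batchSize = batchSize;
    }

    /** Call once for every command queued on the batch interface. */
    public void recordQueued() {
        queuedSinceLastFlush++;
    }

    /**
     * Calls flush() only if commands may still be queued.
     * Returns true if the delegate's flush() was invoked.
     */
    public boolean flushIfPending() {
        long count = queuedSinceLastFlush;
        queuedSinceLastFlush = 0;
        // Assumption: every full batch of batchSize commands was already flushed automatically,
        // so a multiple of batchSize (including 0) means the queue is empty.
        if (count % batchSize == 0) {
            return false;
        }
        delegate.flush();
        return true;
    }
}
```

In `RedisSyncClient`, `recordQueued()` would be called after each `sadd` and `expire`. `flushRemainFactIds()` would then call `flushIfPending()` on a guard built as `new FlushGuard(() -> redisBatchQuery.flush(), 1000)`.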

Additional context

Posted by zhangjunqiang on Jun 24 '22

Thanks for the report. I have trouble reproducing the issue. The Command class does not permit commands without a type, so we cannot construct a case in which the command type would be null.

Posted by mp911de on Jun 24 '22

Quoting mp911de: "The Command class does not permit commands without a type, so we cannot construct a case in which the command type would be null."

Sorry, I don't understand what you mean. The RedisBatchQuery interface is implemented just like https://github.com/lettuce-io/lettuce-core/wiki/Redis-Command-Interfaces and https://github.com/lettuce-io/lettuce-core/issues/624. I didn't find anywhere to define the "type".

Posted by zhangjunqiang on Jun 25 '22

The Redis connection is initialized with Guice like this:

public class StatefulRedisClusterConnectionProvider implements Provider<StatefulRedisClusterConnection<String, String>> {

    private Set<RedisURI> redisURIs;
    public StatefulRedisClusterConnectionProvider(Set<RedisURI> redisURIs) {
        this.redisURIs = redisURIs;
    }
    @Override
    public StatefulRedisClusterConnection<String, String> get() {
        final RedisClusterClient redisClusterClient = RedisClusterClient.create(redisURIs);
        // https://github.com/lettuce-io/lettuce-core/wiki/Redis-Cluster#user-content-refreshing-the-cluster-topology-view
        ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder()
                .refreshPeriod(Duration.ofSeconds(60))
                .enableAllAdaptiveRefreshTriggers()
                .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(30))
                .refreshTriggersReconnectAttempts(5)
                .build();
        redisClusterClient.setOptions(ClusterClientOptions.builder()
                .topologyRefreshOptions(options)
                .autoReconnect(true)
                .maxRedirects(5)
                .disconnectedBehavior(ClientOptions.DisconnectedBehavior.DEFAULT)
                .socketOptions(SocketOptions.builder()
                        .keepAlive(true)
                        .tcpNoDelay(true)
                        .build())
                .validateClusterNodeMembership(false)
                .build());
        StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();
        connection.setReadFrom(ReadFrom.REPLICA_PREFERRED);
        connection.setAutoFlushCommands(true);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            connection.close();
            redisClusterClient.shutdown();
        }));
        return connection;
    }
}

Posted by zhangjunqiang on Jun 25 '22

I'm having a similar issue. Here's my stack trace, also caused by calling a batch flush:

java.lang.NullPointerException
	at io.lettuce.core.StatefulRedisConnectionImpl.preProcessCommand(StatefulRedisConnectionImpl.java:201)
	at io.lettuce.core.StatefulRedisConnectionImpl.dispatch(StatefulRedisConnectionImpl.java:159)
	at io.lettuce.core.dynamic.SimpleBatcher.doFlush(SimpleBatcher.java:137)
	at io.lettuce.core.dynamic.SimpleBatcher.flush(SimpleBatcher.java:107)
	at io.lettuce.core.dynamic.SimpleBatcher.flush(SimpleBatcher.java:84)
	at io.lettuce.core.dynamic.BatchExecutableCommandLookupStrategy$1.execute(BatchExecutableCommandLookupStrategy.java:79)
	at io.lettuce.core.dynamic.RedisCommandFactory$CommandFactoryExecutorMethodInterceptor.invoke(RedisCommandFactory.java:235)
	at io.lettuce.core.dynamic.intercept.MethodInterceptorChain$MethodInterceptorContext.proceed(MethodInterceptorChain.java:118)
	at io.lettuce.core.dynamic.intercept.MethodInterceptorChain$PooledMethodInvocation.proceed(MethodInterceptorChain.java:201)
	at io.lettuce.core.dynamic.intercept.DefaultMethodInvokingInterceptor.invoke(DefaultMethodInvokingInterceptor.java:45)
	at io.lettuce.core.dynamic.intercept.MethodInterceptorChain$MethodInterceptorContext.proceed(MethodInterceptorChain.java:118)
	at io.lettuce.core.dynamic.intercept.MethodInterceptorChain.invoke(MethodInterceptorChain.java:80)
	at io.lettuce.core.dynamic.intercept.InvocationProxyFactory$InterceptorChainInvocationHandler.handleInvocation(InvocationProxyFactory.java:102)
	at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
	at com.sun.proxy.$Proxy46.flush(Unknown Source)

I'm using Lettuce version 6.1.6.RELEASE.

Posted by NoahGatsby on Aug 03 '22

It looks like we are both getting an NPE on this line (cluster and standalone):

if (local.getType().name().equals(AUTH.name())) {

Posted by NoahGatsby on Aug 03 '22

It seems to me that if getType() returns a non-null value, every name() implementation also returns a non-null value.

So the NPE probably comes from getType() returning null. As far as I can see, that only happens with the PristineFallbackCommand command.

Posted by NoahGatsby on Aug 03 '22

I believe the issue is hard to find because it depends on whether assertions are enabled. In SimpleBatcher.java:

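private List<RedisCommand<Object, Object, Object>> prepareForceFlush() {

    List<RedisCommand<Object, Object, Object>> batch = new ArrayList<>(Math.max(batchSize, 10));

    // do/while: the body runs once even when the queue is already empty
    do {
        RedisCommand<Object, Object, Object> poll = queue.poll(); // null if the queue is empty

        assert poll != null; // only checked when assertions are enabled (-ea)
        batch.add(poll);     // otherwise a null command is added to the batch
    } while (!queue.isEmpty());

    return batch;
}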

So, with assertions disabled, this returns a list containing a null command. I'd suggest simply skipping the flush in that case, unless the intention is to raise an error when flushing an empty queue.
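The effect can be reproduced without Redis. This standalone sketch drains an empty `ConcurrentLinkedQueue` twice. The first pass uses the `do`/`while` shape quoted above. The second uses a `while` loop that skips nulls, which is the shape of the newer `prepareForceFlush()` quoted later in this thread. `DrainDemo` and its methods are hypothetical names; only the loop structure mirrors Lettuce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; mirrors only the loop shapes discussed in this thread.
public class DrainDemo {

    /** do/while drain: the body runs once even if the queue is empty. */
    public static List<String> drainDoWhile(Queue<String> queue) {
        List<String> batch = new ArrayList<>();
        do {
            String poll = queue.poll(); // returns null on an empty queue
            assert poll != null;        // only checked when the JVM runs with -ea
            batch.add(poll);            // so without -ea a null ends up in the batch
        } while (!queue.isEmpty());
        return batch;
    }

    /** while drain with a null check: an empty queue yields an empty batch. */
    public static List<String> drainWhile(Queue<String> queue) {
        List<String> batch = new ArrayList<>();
        while (!queue.isEmpty()) {
            String poll = queue.poll();
            if (poll != null) {
                batch.add(poll);
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        System.out.println(drainDoWhile(new ConcurrentLinkedQueue<>())); // [null] without -ea
        System.out.println(drainWhile(new ConcurrentLinkedQueue<>()));   // []
    }
}
```

Without `-ea` (the JVM default), the first line prints `[null]`, the one-element list that reaches `doFlush`. With `-ea`, the `assert` throws an `AssertionError` instead.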

Posted by noahetoro on Oct 20 '23

You end up with a null command passed to dispatch(), which is where the NPE occurs:


private List<? extends RedisCommand<?, ?, ?>> doFlush(boolean forcedFlush, boolean defaultFlush, int consume) {

    List<RedisCommand<Object, Object, Object>> commands = null;
    if (forcedFlush) {
        commands = prepareForceFlush();               // empty queue: list with one null element
    } else if (defaultFlush) {
        commands = prepareDefaultFlush(consume);
    }

    if (commands != null && !commands.isEmpty()) {   // passes, because the list has size 1
        if (commands.size() == 1) {
            connection.dispatch(commands.get(0));     // commands.get(0) returns null, so dispatch receives null
        } else {
            connection.dispatch(commands);
        }

        return commands;
    }
    return Collections.emptyList();
}

Posted by nsragow on Oct 20 '23

StatefulRedisConnectionImpl.java then receives a null argument:

protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> command) {

    RedisCommand<K, V, T> local = command;

    if (local.getType().name().equals(AUTH.name())) { // local is null here, so local.getType() throws the NPE

Posted by nsragow on Oct 20 '23

From a spot check, this behavior seems to be fixed already in the latest version:

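private List<RedisCommand<Object, Object, Object>> prepareForceFlush() {

    List<RedisCommand<Object, Object, Object>> batch = newDrainTarget();

    // while (not do/while): an empty queue yields an empty batch, so nothing is dispatched
    while (!queue.isEmpty()) {

        RedisCommand<Object, Object, Object> poll = queue.poll();

        // poll() can still return null if another thread drained the queue in the meantime
        if (poll != null) {
            batch.add(poll);
        }
    }

    return batch;
}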

Posted by nsragow on Oct 20 '23