
New Redis API: subscribe function not working when not used with @ApplicationScoped

Open Plawn opened this issue 3 years ago • 2 comments

Describe the bug

I'm writing a cache layer using Redis and the pub/sub module of the new Redis API (available since Quarkus 2.10).

When the listener is used as an @ApplicationScoped bean, it works fine, but when I try to produce multiple beans for multiple caches, I first get this error:

2022-08-09 16:10:23,609 WARN  [io.net.uti.con.AbstractEventExecutor] (vert.x-eventloop-thread-1) A task raised an exception. Task: io.vertx.core.impl.EventLoopContext$$Lambda$3225/0x00000008418bec40@6cfa8b21: java.lang.IllegalArgumentException: Parameter 'context' may not be null
	at io.smallrye.common.constraint.Assert.checkNotNullParamChecked(Assert.java:32)
	at io.smallrye.common.constraint.Assert.checkNotNullParam(Assert.java:26)
	at io.smallrye.common.vertx.VertxContext.isDuplicatedContext(VertxContext.java:114)
	at io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext(VertxContext.java:31)
	at io.quarkus.redis.runtime.datasource.ReactivePubSubCommandsImpl$AbstractRedisSubscriber.lambda$subscribe$2(ReactivePubSubCommandsImpl.java:149)
	at io.smallrye.mutiny.vertx.DelegatingConsumerHandler.accept(DelegatingConsumerHandler.java:28)
	at io.smallrye.mutiny.vertx.DelegatingConsumerHandler.handle(DelegatingConsumerHandler.java:23)
	at io.smallrye.mutiny.vertx.DelegatingHandler.handle(DelegatingHandler.java:25)
	at io.vertx.core.impl.EventLoopContext.lambda$execute$2(EventLoopContext.java:78)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)

and then this error when trying to subscribe:

2022-08-09 16:10:28,577 ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (executor-thread-0) HTTP Request to /organization/4/current-histories failed, error id: 05bcc0d6-c6b2-4961-a3d8-e8e500af9ca1-1: io.smallrye.mutiny.TimeoutException
	at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:64)
	at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
	at io.quarkus.redis.runtime.datasource.BlockingPubSubCommandsImpl.subscribe(BlockingPubSubCommandsImpl.java:51)
	at io.quarkus.redis.runtime.datasource.BlockingPubSubCommandsImpl.subscribe(BlockingPubSubCommandsImpl.java:31)
	at org.cnje.utils.cache.store.redis.CacheListener.<init>(CacheListener.java:21)
	at org.cnje.utils.cache.store.redis.RedisHttpCache.<init>(RedisHttpCache.java:55)
	at org.cnje.utils.cache.store.redis.RedisStoreProvider.getStore(RedisStoreProvider.java:63)
	at org.cnje.utils.cache.store.redis.RedisStoreProvider_ClientProxy.getStore(Unknown Source)
	at org.cnje.kiwi.config.HttpCacheProvider.makeHttpCacheHandler(HttpCacheProvider.java:37)
	at org.cnje.kiwi.config.HttpCacheProvider.makeOrgUserHistoriesCache(HttpCacheProvider.java:67)
	at org.cnje.kiwi.config.HttpCacheProvider_ProducerMethod_makeOrgUserHistoriesCache_ea377ede2549c2483e2e82280002ed6c592e8a31_Bean.create(Unknown Source)
	at org.cnje.kiwi.config.HttpCacheProvider_ProducerMethod_makeOrgUserHistoriesCache_ea377ede2549c2483e2e82280002ed6c592e8a31_Bean.create(Unknown Source)
	at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:111)
	at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
	at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:32)
	at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
	at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
	at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:32)
	at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:19)
	at org.cnje.utils.cache.HttpCacheProvider_ProducerMethod_makeOrgUserHistoriesCache_ea377ede2549c2483e2e82280002ed6c592e8a31_ClientProxy.arc$delegate(Unknown Source)
	at org.cnje.utils.cache.HttpCacheProvider_ProducerMethod_makeOrgUserHistoriesCache_ea377ede2549c2483e2e82280002ed6c592e8a31_ClientProxy.handleResponseWithCache(Unknown Source)

Am I doing something wrong, or is this a bug?

I did not find anything in the documentation related to this.

My files:

package org.cnje.utils.cache.store.redis;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import javax.enterprise.context.ApplicationScoped;

import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.RedisDataSource;
import io.vertx.mutiny.redis.client.RedisAPI;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

/**
 * Provides support for injecting the Redis data source into
 * the store backends while automatically namespacing the data
 */
@RequiredArgsConstructor
@ApplicationScoped
public class RedisStoreProvider {

    /**
     * Thrown when no namespace names are left
     */
    public static class InsufficientNamespaceNamesAvailable extends Exception {

    }

    /**
     * The redis datasource bound for caching
     */
    private final ReactiveRedisDataSource ds;
    
    private final RedisAPI api;
    
    /**
     * Names which will be used for namespacing the caches
     */
    @Setter
    @Getter
    private List<String> namespaceNames = List.of(
            "a", "b", "c", "d", "e", "f", "g", "h", "i",
            "j", "k", "l", "m", "n", "o", "p", "q",
            "r", "s", "t", "u", "v", "w", "x", "y", "z");
    
    /**
     * Iterator to get letter for namespacing in redis
     */
    private final Iterator<String> it = namespaceNames.iterator();



    /**
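     * Creates a store under the next unused namespace name.
     * 
     * @return a new RedisHttpCache bound to that namespace
     * @throws InsufficientNamespaceNamesAvailable if every namespace name is already in use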
     */
    public RedisHttpCache getStore() throws InsufficientNamespaceNamesAvailable {
        try {
            // final var a = new CacheListener("notification", ds);
            return new RedisHttpCache(it.next(), ds, api);
        } catch (final NoSuchElementException e) {
            throw new InsufficientNamespaceNamesAvailable();
        }
    }
}

package org.cnje.utils.cache.store.redis;


import java.time.Duration;
import java.util.Date;
import java.util.function.Consumer;

import javax.annotation.PreDestroy;

import org.cnje.utils.cache.StoreBackend;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands;
import io.quarkus.redis.datasource.string.ReactiveStringCommands;
import io.vertx.mutiny.redis.client.RedisAPI;

/**
 * This class only implements a synchronized RedisStore for use with HttpCache
 */
public class RedisHttpCache implements StoreBackend<Long, Date>, Consumer<RedisCacheUpdateNotification> {

    private final ReactivePubSubCommands<RedisCacheUpdateNotification> pub;
    private final ReactiveStringCommands<String, Date> keys;
    private final CacheListener listener;
    private int initalCapacity = 100;

    private int maximumSize = 100;


    private final Duration timeout = Duration.ofMillis(1000);

    /**
     * Local cache, kept updated by cache update notifications (see accept)
     */
    private final Cache<Long, Date> cache;
    
    /**
     * Used as the key namespace; should contain only non-numeric characters
     */
    private final String name;

    /**
     *  
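     * @param name used for namespacing; should contain only non-numeric characters
     * @param ds   reactive Redis data source used for the key and pub/sub commands
     * @param api  Redis API passed on to the CacheListener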
     */
    public RedisHttpCache(final String name, final ReactiveRedisDataSource ds, final RedisAPI api) {
        this.name = name;
        this.cache = this.makeCache();
        this.keys = ds.string(Date.class);
        this.pub = ds.pubsub(RedisCacheUpdateNotification.class);
        this.listener = new CacheListener(name, ds.getRedis(), api);
    }

    protected Cache<Long, Date> makeCache() {
        return Caffeine.newBuilder()
        .initialCapacity(initalCapacity)
        .maximumSize(maximumSize)
        .build();
    }

    // will update local values everywhere
    @Override
    public void accept(final RedisCacheUpdateNotification notification) {
        // TODO: should filter if it's our own notification 
        this.cache.put(notification.key, notification.value);
    }


    private String prepareKey(final Long identifier) {
        return this.name + identifier;
    }

    @Override
    public Date get(final Long identifier) {
        final var localGet = this.cache.getIfPresent(identifier);
        if (localGet == null) {
            return this.keys.get(prepareKey(identifier)).await().atMost(timeout); // this.name + identifier to namespace keys
        }
        return localGet;
    }

    @Override
    public Date put(final Long identifier, final Date value) {
        this.keys.set(prepareKey(identifier), value).await().atMost(timeout);
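        // for tests: read the value back; get() returns a Uni (never null), so it must be awaited
        final var stored = this.keys.get(prepareKey(identifier)).await().atMost(timeout);
        if (stored == null) {
            throw new RuntimeException("failed to connect");
        }
        // notify the other instances through Redis; the returned Uni is lazy,
        // so nothing is published unless it is awaited or subscribed to
        this.pub.publish(this.name, new RedisCacheUpdateNotification(identifier, value)).await().atMost(timeout);
        return null; // to be compliant with store definition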
    }
}

package org.cnje.utils.cache.store.redis;

import java.time.Duration;
import java.util.function.Consumer;

import javax.annotation.PreDestroy;

import io.quarkus.redis.datasource.pubsub.PubSubCommands;
import io.quarkus.redis.runtime.datasource.BlockingRedisDataSourceImpl;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;


public class CacheListener implements Consumer<RedisCacheUpdateNotification> {
    private final PubSubCommands<RedisCacheUpdateNotification> pub;
    private final PubSubCommands.RedisSubscriber subscriber;

    public CacheListener(String name, Redis redis, RedisAPI api) {
        final var ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(5));
        pub = ds.pubsub(RedisCacheUpdateNotification.class);
        subscriber = pub.subscribe("notification", this);
    }

    @Override
    public void accept(RedisCacheUpdateNotification notification) {
        // Receive the notification
    }

    @PreDestroy
    public void terminate() {
        subscriber.unsubscribe(); // Unsubscribe from all subscribed channels
    }
}
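
The Javadoc in RedisHttpCache asks for namespace names without numeric characters. A minimal sketch of why that matters, assuming keys are built exactly as in prepareKey above (name followed by the numeric identifier); the class name KeyNamespaceSketch is hypothetical and only used for illustration:

```java
final class KeyNamespaceSketch {

    // Mirrors prepareKey from the issue: namespace name followed by the numeric identifier.
    static String prepareKey(String name, long identifier) {
        return name + identifier;
    }

    public static void main(String[] args) {
        // A digit in the name lets two different (name, identifier) pairs produce the same key.
        System.out.println(prepareKey("a1", 2L).equals(prepareKey("a", 12L)));
        // With letter-only names the key splits unambiguously into name and identifier.
        System.out.println(prepareKey("a", 12L).equals(prepareKey("b", 12L)));
    }
}
```

With letter-only names such as the default "a" to "z" list, the boundary between name and identifier is always the first digit, so keys from different caches cannot overlap.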

Expected behavior

The subscription succeeds.

Actual behavior

The subscription fails with the errors shown above.

How to Reproduce?

Create a Quarkus application with version 2.11.1.Final and use the files given above.

Output of uname -a or ver

No response

Output of java -version

11

GraalVM version (if different from Java)

No response

Quarkus version or git rev

No response

Build tool (ie. output of mvnw --version or gradlew --version)

maven 3.8

Additional information

No response

Plawn avatar Aug 09 '22 16:08 Plawn

/cc @cescoffier, @gsmet, @machi1990

quarkus-bot[bot] avatar Aug 09 '22 16:08 quarkus-bot[bot]

Hello, after even more debugging I found out that the issue is having too many listeners, or at least the issue appears when I have 5 or more listeners.

Plawn avatar Aug 10 '22 15:08 Plawn

Better provide a full reproducer in the form of a small Maven project.

gsmet avatar Aug 10 '22 15:08 gsmet

yes will do 👍

Plawn avatar Aug 10 '22 16:08 Plawn

Hello, here is the reproducer:

https://github.com/Plawn/redis-quarkus-reproducer

Here is the main test file:

https://github.com/Plawn/redis-quarkus-reproducer/blob/master/src/main/java/org/acme/HttpCacheProvider.java

To test, just run mvn quarkus:dev.

With too many listeners, there is a timeout.

Moreover, the listener can't start without the @Startup annotation.

Plawn avatar Aug 10 '22 19:08 Plawn

This is kind of expected. Subscribers need to hold a connection to the Redis server (that's the Redis protocol). However, by default, the connection pool is set to have only six connections.

So you should restructure your application to reduce the number of connections (use a single subscriber listening to multiple keys) or increase the number of connections:

quarkus.redis.max-pool-size=....

cescoffier avatar Aug 17 '22 09:08 cescoffier
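
The "single subscriber" restructuring suggested above could look roughly like the following. This is only a sketch under stated assumptions, not the Quarkus API: NamespacedNotification and NotificationRouter are hypothetical names, and the notification is assumed to carry its namespace so that one shared channel can serve every cache. The router implements Consumer, so in a real application it could be the one callback handed to a single subscribe call, which would then hold one connection instead of one per cache.

```java
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class RouterSketch {

    // Hypothetical message type: it carries the namespace of the cache it belongs to.
    static final class NamespacedNotification {
        final String namespace;
        final long key;
        final Date value;

        NamespacedNotification(String namespace, long key, Date value) {
            this.namespace = namespace;
            this.key = key;
            this.value = value;
        }
    }

    // Single entry point for all notifications; dispatches to the handler of each namespace.
    static final class NotificationRouter implements Consumer<NamespacedNotification> {
        private final Map<String, Consumer<NamespacedNotification>> handlers = new ConcurrentHashMap<>();

        void register(String namespace, Consumer<NamespacedNotification> handler) {
            handlers.put(namespace, handler);
        }

        @Override
        public void accept(NamespacedNotification notification) {
            Consumer<NamespacedNotification> handler = handlers.get(notification.namespace);
            if (handler != null) { // notifications for unknown namespaces are ignored
                handler.accept(notification);
            }
        }
    }

    public static void main(String[] args) {
        NotificationRouter router = new NotificationRouter();
        Map<Long, Date> cacheA = new ConcurrentHashMap<>();
        Map<Long, Date> cacheB = new ConcurrentHashMap<>();
        router.register("a", n -> cacheA.put(n.key, n.value));
        router.register("b", n -> cacheB.put(n.key, n.value));

        // Simulates a message arriving on the shared channel for cache "a".
        router.accept(new NamespacedNotification("a", 42L, new Date(0L)));
        System.out.println("cache a updated: " + cacheA.containsKey(42L));
        System.out.println("cache b untouched: " + cacheB.isEmpty());
    }
}
```

Each cache registers its own handler once, so adding a cache adds a map entry rather than another pooled connection.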

OK, I see. The Redis documentation states that up to 10,000 listeners can listen to the same channel, so I did not expect this kind of issue. Thanks!

However, is it expected that listening can only start when the Quarkus app starts? If I'm not listening inside a bean annotated with @Startup, the subscription won't work because of a null context.

Edit: Changing the pool size solved my issue, but I still wonder why it only works at startup.

Plawn avatar Aug 17 '22 10:08 Plawn
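
The difference between the server-side limit (many listeners per channel) and the client-side limit (a small connection pool) can be pictured with a toy model. This is a simplified sketch, not how the Redis client is implemented: ToyPool is a hypothetical class in which each semaphore permit stands for one pooled connection, a subscriber keeps its permit for as long as it is subscribed, and a failed acquire stands in for the timeout seen in the issue.

```java
import java.util.concurrent.Semaphore;

final class PoolSketch {

    // Toy connection pool: each permit represents one connection.
    static final class ToyPool {
        private final Semaphore connections;

        ToyPool(int maxPoolSize) {
            this.connections = new Semaphore(maxPoolSize);
        }

        // A subscriber takes a connection and keeps it until unsubscribe() is called.
        boolean subscribe() {
            return connections.tryAcquire();
        }

        void unsubscribe() {
            connections.release();
        }

        // A regular command borrows a connection and gives it back immediately.
        boolean runCommand() {
            if (!connections.tryAcquire()) {
                return false; // a real client would wait here and eventually time out
            }
            connections.release();
            return true;
        }
    }

    public static void main(String[] args) {
        ToyPool pool = new ToyPool(6); // six connections, the default mentioned in the thread
        for (int i = 1; i <= 7; i++) {
            System.out.println("subscriber " + i + " connected: " + pool.subscribe());
        }
        // With every connection held by a subscriber, ordinary commands are starved too.
        System.out.println("command ran: " + pool.runCommand());
    }
}
```

In this model failures start once all six permits are held. The exact threshold in a real application depends on what else is holding connections at the time, which the thread does not spell out.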

Can you give more details about this startup issue? Do you mean you can only register subscribers during the startup of the application?

cescoffier avatar Aug 18 '22 07:08 cescoffier

Hello !

Yes, if I don't mark the bean creating the Redis subscriber with @Startup (as in the example), I get a Vert.x null context exception.

Plawn avatar Aug 18 '22 07:08 Plawn

Can you provide a reproducer? The Redis data source requires Vert.x but the Vert.x instance is created quite early in the process.

cescoffier avatar Aug 18 '22 08:08 cescoffier

I believe I reproduced it:

2022-08-18 11:25:20,674 WARN  [io.net.uti.con.AbstractEventExecutor] (vert.x-eventloop-thread-11) A task raised an exception. Task: io.vertx.core.impl.EventLoopContext$$Lambda$1042/0x00000008007fef90@1dac7cf9: java.lang.IllegalArgumentException: Parameter 'context' may not be null
        at io.smallrye.common.constraint.Assert.checkNotNullParamChecked(Assert.java:32)
        at io.smallrye.common.constraint.Assert.checkNotNullParam(Assert.java:26)
        at io.smallrye.common.vertx.VertxContext.isDuplicatedContext(VertxContext.java:114)
        at io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext(VertxContext.java:31)
        at io.quarkus.redis.runtime.datasource.ReactivePubSubCommandsImpl$AbstractRedisSubscriber.lambda$subscribe$2(ReactivePubSubCommandsImpl.java:149)

cescoffier avatar Aug 18 '22 09:08 cescoffier

(and it's likely a bug)

cescoffier avatar Aug 18 '22 09:08 cescoffier

Yes, I got exactly this error!

Plawn avatar Aug 18 '22 09:08 Plawn

@Plawn see https://github.com/quarkusio/quarkus/pull/27361.

Closing this issue, as our discussion is about another issue. Please use #27361 to discuss that other issue.

cescoffier avatar Aug 18 '22 14:08 cescoffier