quarkus
New Redis API: subscribe function not working when not used with @ApplicationScoped
Describe the bug
I'm writing a cache layer using Redis and the pub/sub module of the new Redis API (available since Quarkus 2.10).
When used as an @ApplicationScoped bean, it works fine, but when I try to produce multiple beans for multiple caches, I first get this error:
2022-08-09 16:10:23,609 WARN [io.net.uti.con.AbstractEventExecutor] (vert.x-eventloop-thread-1) A task raised an exception. Task: io.vertx.core.impl.EventLoopContext$$Lambda$3225/0x00000008418bec40@6cfa8b21: java.lang.IllegalArgumentException: Parameter 'context' may not be null
at io.smallrye.common.constraint.Assert.checkNotNullParamChecked(Assert.java:32)
at io.smallrye.common.constraint.Assert.checkNotNullParam(Assert.java:26)
at io.smallrye.common.vertx.VertxContext.isDuplicatedContext(VertxContext.java:114)
at io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext(VertxContext.java:31)
at io.quarkus.redis.runtime.datasource.ReactivePubSubCommandsImpl$AbstractRedisSubscriber.lambda$subscribe$2(ReactivePubSubCommandsImpl.java:149)
at io.smallrye.mutiny.vertx.DelegatingConsumerHandler.accept(DelegatingConsumerHandler.java:28)
at io.smallrye.mutiny.vertx.DelegatingConsumerHandler.handle(DelegatingConsumerHandler.java:23)
at io.smallrye.mutiny.vertx.DelegatingHandler.handle(DelegatingHandler.java:25)
at io.vertx.core.impl.EventLoopContext.lambda$execute$2(EventLoopContext.java:78)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
and then this error when trying to subscribe:
2022-08-09 16:10:28,577 ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (executor-thread-0) HTTP Request to /organization/4/current-histories failed, error id: 05bcc0d6-c6b2-4961-a3d8-e8e500af9ca1-1: io.smallrye.mutiny.TimeoutException
at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:64)
at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
at io.quarkus.redis.runtime.datasource.BlockingPubSubCommandsImpl.subscribe(BlockingPubSubCommandsImpl.java:51)
at io.quarkus.redis.runtime.datasource.BlockingPubSubCommandsImpl.subscribe(BlockingPubSubCommandsImpl.java:31)
at org.cnje.utils.cache.store.redis.CacheListener.<init>(CacheListener.java:21)
at org.cnje.utils.cache.store.redis.RedisHttpCache.<init>(RedisHttpCache.java:55)
at org.cnje.utils.cache.store.redis.RedisStoreProvider.getStore(RedisStoreProvider.java:63)
at org.cnje.utils.cache.store.redis.RedisStoreProvider_ClientProxy.getStore(Unknown Source)
at org.cnje.kiwi.config.HttpCacheProvider.makeHttpCacheHandler(HttpCacheProvider.java:37)
at org.cnje.kiwi.config.HttpCacheProvider.makeOrgUserHistoriesCache(HttpCacheProvider.java:67)
at org.cnje.kiwi.config.HttpCacheProvider_ProducerMethod_makeOrgUserHistoriesCache_ea377ede2549c2483e2e82280002ed6c592e8a31_Bean.create(Unknown Source)
at org.cnje.kiwi.config.HttpCacheProvider_ProducerMethod_makeOrgUserHistoriesCache_ea377ede2549c2483e2e82280002ed6c592e8a31_Bean.create(Unknown Source)
at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:111)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:32)
at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:32)
at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:19)
at org.cnje.utils.cache.HttpCacheProvider_ProducerMethod_makeOrgUserHistoriesCache_ea377ede2549c2483e2e82280002ed6c592e8a31_ClientProxy.arc$delegate(Unknown Source)
at org.cnje.utils.cache.HttpCacheProvider_ProducerMethod_makeOrgUserHistoriesCache_ea377ede2549c2483e2e82280002ed6c592e8a31_ClientProxy.handleResponseWithCache(Unknown Source)
Am I doing something wrong, or is this a bug?
I did not find anything in the documentation related to this.
My files:
package org.cnje.utils.cache.store.redis;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.enterprise.context.ApplicationScoped;
import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.RedisDataSource;
import io.vertx.mutiny.redis.client.RedisAPI;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
/**
* Provides support to inject the redis datasource in
* the storeBackends while automatically namespacing the data
*/
@RequiredArgsConstructor
@ApplicationScoped
public class RedisStoreProvider {
/**
* Means that you have run out of namespace names
*/
public static class InsufficientNamespaceNamesAvailable extends Exception {
}
/**
* The redis datasource bound for caching
*/
private final ReactiveRedisDataSource ds;
private final RedisAPI api;
/**
* Names which will be used for namespacing the caches
*/
@Setter
@Getter
private List<String> namespaceNames = List.of(
"a", "b", "c", "d", "e", "f", "g", "h", "i",
"j", "k", "l", "m", "n", "o", "p", "q",
"r", "s", "t", "u", "v", "w", "x", "y", "z");
/**
* Iterator to get letter for namespacing in redis
*/
private final Iterator<String> it = namespaceNames.iterator();
/**
* Will throw if no letters to use with namespace
*
* @return
*/
public RedisHttpCache getStore() throws InsufficientNamespaceNamesAvailable {
try {
// final var a = new CacheListener("notification", ds);
return new RedisHttpCache(it.next(), ds, api);
} catch (final NoSuchElementException e) {
throw new InsufficientNamespaceNamesAvailable();
}
}
}
package org.cnje.utils.cache.store.redis;
import java.time.Duration;
import java.util.Date;
import java.util.function.Consumer;
import javax.annotation.PreDestroy;
import org.cnje.utils.cache.StoreBackend;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands;
import io.quarkus.redis.datasource.string.ReactiveStringCommands;
import io.vertx.mutiny.redis.client.RedisAPI;
/**
* This class only implements a synchronized RedisStore for use with HttpCache
*/
public class RedisHttpCache implements StoreBackend<Long, Date>, Consumer<RedisCacheUpdateNotification> {
private final ReactivePubSubCommands<RedisCacheUpdateNotification> pub;
private final ReactiveStringCommands<String, Date> keys;
private final CacheListener listener;
private int initalCapacity = 100;
private int maximumSize = 100;
private final Duration timeout = Duration.ofMillis(1000);
/**
* Local cache, kept updated by the pub/sub notifications received in accept()
*/
private final Cache<Long, Date> cache;
/**
* To use for namespace, should only be non numeric character
*/
private final String name;
/**
*
* @param name To use for namespacing, should only contain non numeric characters
* @param ds
*/
public RedisHttpCache(final String name, final ReactiveRedisDataSource ds, final RedisAPI api) {
this.name = name;
this.cache = this.makeCache();
this.keys = ds.string(Date.class);
this.pub = ds.pubsub(RedisCacheUpdateNotification.class);
this.listener = new CacheListener(name, ds.getRedis(), api);
}
protected Cache<Long, Date> makeCache() {
return Caffeine.newBuilder()
.initialCapacity(initalCapacity)
.maximumSize(maximumSize)
.build();
}
// will update local values everywhere
@Override
public void accept(final RedisCacheUpdateNotification notification) {
// TODO: should filter if it's our own notification
this.cache.put(notification.key, notification.value);
}
private String prepareKey(final Long identifier) {
return this.name + identifier;
}
@Override
public Date get(final Long identifier) {
final var localGet = this.cache.getIfPresent(identifier);
if (localGet == null) {
return this.keys.get(prepareKey(identifier)).await().atMost(timeout); // this.name + identifier to namespace keys
}
return localGet;
}
@Override
public Date put(final Long identifier, final Date value) {
this.keys.set(prepareKey(identifier), value).await().atMost(timeout);
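// for tests: read the value back; the Uni must be awaited, otherwise the reference is never null
final var localGet = this.keys.get(prepareKey(identifier)).await().atMost(timeout);
if (localGet == null) {
throw new RuntimeException("failed to connect");
}
// notify the other instances; a Uni is lazy, so nothing is published until it is awaited or subscribed to
this.pub.publish(this.name, new RedisCacheUpdateNotification(identifier, value)).await().atMost(timeout);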
return null; // to be compliant with store definition
}
}
package org.cnje.utils.cache.store.redis;
import java.time.Duration;
import java.util.function.Consumer;
import javax.annotation.PreDestroy;
import io.quarkus.redis.datasource.pubsub.PubSubCommands;
import io.quarkus.redis.runtime.datasource.BlockingRedisDataSourceImpl;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
public class CacheListener implements Consumer<RedisCacheUpdateNotification> {
private final PubSubCommands<RedisCacheUpdateNotification> pub;
private final PubSubCommands.RedisSubscriber subscriber;
public CacheListener(String name, Redis redis, RedisAPI api) {
final var ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(5));
pub = ds.pubsub(RedisCacheUpdateNotification.class);
subscriber = pub.subscribe("notification", this);
}
@Override
public void accept(RedisCacheUpdateNotification notification) {
// Receive the notification
}
@PreDestroy
public void terminate() {
subscriber.unsubscribe(); // Unsubscribe from all subscribed channels
}
}
Expected behavior
We are subscribed
Actual behavior
Error
How to Reproduce?
Create a Quarkus app with version 2.11.1.Final.
Use the given files.
Output of uname -a or ver
No response
Output of java -version
11
GraalVM version (if different from Java)
No response
Quarkus version or git rev
No response
Build tool (ie. output of mvnw --version or gradlew --version)
maven 3.8
Additional information
No response
/cc @cescoffier, @gsmet, @machi1990
Hello, after even more debugging I found out that the issue is having too many listeners, or at least the issue appears when I have 5 or more listeners.
Better provide a full reproducer in the form of a small Maven project.
yes will do 👍
Hello, here is the reproducer:
https://github.com/Plawn/redis-quarkus-reproducer
Here is the main test file:
https://github.com/Plawn/redis-quarkus-reproducer/blob/master/src/main/java/org/acme/HttpCacheProvider.java
To test, just run mvn quarkus:dev
When there are too many listeners, there is a timeout.
Moreover, the listener can't start without the @Startup annotation.
This is kind of expected. Subscribers need to hold a connection to the Redis server (that's the Redis protocol). However, by default, the connection pool is set to have only six connections.
So you should restructure your application to reduce the number of connections (use a single subscriber listening to multiple keys) or increase the number of connections:
quarkus.redis.max-pool-size=....
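As a rough illustration of the "single subscriber" option, the sketch below registers one consumer and fans messages out to per-cache handlers, so only one pub/sub connection would be held regardless of the number of caches. `NamespacedNotification` and `NotificationDispatcher` are hypothetical names that do not exist in Quarkus or in the reproducer, and the sketch assumes each message carries the namespace of the cache it targets.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical message type: carries the cache namespace so one channel can serve every cache. */
final class NamespacedNotification {
    final String namespace;
    final long key;
    final String value;

    NamespacedNotification(String namespace, long key, String value) {
        this.namespace = namespace;
        this.key = key;
        this.value = value;
    }
}

/** Hypothetical single consumer: registered once with the pub/sub API, routes to per-namespace handlers. */
final class NotificationDispatcher implements Consumer<NamespacedNotification> {
    private final Map<String, Consumer<NamespacedNotification>> handlers = new ConcurrentHashMap<>();

    /** Each cache registers its own handler instead of opening its own subscription. */
    void register(String namespace, Consumer<NamespacedNotification> handler) {
        handlers.put(namespace, handler);
    }

    @Override
    public void accept(NamespacedNotification notification) {
        Consumer<NamespacedNotification> handler = handlers.get(notification.namespace);
        if (handler != null) {
            handler.accept(notification); // messages for unknown namespaces are silently dropped
        }
    }
}
```

The dispatcher would then be passed once to a call such as `pub.subscribe("notification", dispatcher)`, in the same way `CacheListener` passes `this`, rather than creating one `CacheListener` per cache.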
OK, I see. The Redis documentation states that up to 10 000 listeners can listen to the same channel, so I did not expect this kind of issue. Thanks! However, is it expected that I can only start listening at the start of the Quarkus app? If I'm not listening inside a bean annotated with @Startup, the subscription won't work because of a null context.
Edit: Changing the pool size solved my issue, but I still wonder why it only works at startup.
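To make the pool-size effect concrete, here is a toy model of a bounded connection pool in which every subscriber keeps its connection for as long as it is subscribed. `ToyConnectionPool` is a hypothetical class for illustration only and is not how the Vert.x Redis client is implemented; in particular, the real client makes the caller wait until a timeout, whereas this sketch fails immediately to stay short.

```java
import java.util.concurrent.Semaphore;

/** Toy model of a bounded connection pool; hypothetical, not the Vert.x Redis client. */
final class ToyConnectionPool {
    private final Semaphore connections;

    ToyConnectionPool(int maxPoolSize) {
        this.connections = new Semaphore(maxPoolSize);
    }

    /**
     * A subscriber takes one connection and holds it while subscribed.
     * Returns false when the pool is exhausted.
     */
    boolean subscribe() {
        return connections.tryAcquire();
    }

    /** Unsubscribing hands the connection back to the pool. */
    void unsubscribe() {
        connections.release();
    }
}
```

With a pool of six, the seventh `subscribe()` call fails until an earlier subscriber unsubscribes or the pool is enlarged, which matches the behaviour described above in spirit.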
Can you give more details about this startup issue? Do you mean you can only register subscribers during the startup of the application?
Hello !
Yes, if I don't mark the bean creating the Redis subscriber with @Startup (like in the example), I get a Vert.x null context exception.
Can you provide a reproducer? The Redis data source requires Vert.x but the Vert.x instance is created quite early in the process.
I believe I reproduced it:
2022-08-18 11:25:20,674 WARN [io.net.uti.con.AbstractEventExecutor] (vert.x-eventloop-thread-11) A task raised an exception. Task: io.vertx.core.impl.EventLoopContext$$Lambda$1042/0x00000008007fef90@1dac7cf9: java.lang.IllegalArgumentException: Parameter 'context' may not be null
at io.smallrye.common.constraint.Assert.checkNotNullParamChecked(Assert.java:32)
at io.smallrye.common.constraint.Assert.checkNotNullParam(Assert.java:26)
at io.smallrye.common.vertx.VertxContext.isDuplicatedContext(VertxContext.java:114)
at io.smallrye.common.vertx.VertxContext.getOrCreateDuplicatedContext(VertxContext.java:31)
at io.quarkus.redis.runtime.datasource.ReactivePubSubCommandsImpl$AbstractRedisSubscriber.lambda$subscribe$2(ReactivePubSubCommandsImpl.java:149)
(and it's likely a bug)
Yes, I got exactly this error!
@Plawn see https://github.com/quarkusio/quarkus/pull/27361.
Closing this issue, as our discussion is about another issue. Please use #27361 to discuss that other issue.