jedis
Force interrupt jedis.blpop
There is no way to stop or cancel a blocking command like blpop if the Jedis instance is borrowed from a pool.
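import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executors;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

JedisPool pool = new JedisPool();
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<?> future = service.submit(() -> {
    Jedis jedis = pool.getResource();
    // Keep polling until the worker thread is interrupted
    while (!Thread.currentThread().isInterrupted()) {
        System.out.println(jedis.blpop(30, "KEY"));
    }
});
future.addListener(() -> System.out.println("Done"), MoreExecutors.directExecutor());
Thread.sleep(1000);
future.cancel(true);
future.get();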
Even after I cancel the future, the thread is still running.
I have to use reflection to get the underlying client, then close its stream to cancel the blocking command.
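The effect described above can be reproduced without Jedis or Redis. The following is a minimal sketch using only the JDK: a loopback peer that never writes stands in for Redis during an empty blpop, and the class and method names (BlockingReadDemo, run) are made up for illustration. It assumes a regular platform thread and a plain java.net.Socket, which is not necessarily how every Jedis version manages its connection.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BlockingReadDemo {

    /** Records which action actually ended the blocked read. */
    public static final class Result {
        public final boolean endedByInterrupt;
        public final boolean endedByClose;

        Result(boolean endedByInterrupt, boolean endedByClose) {
            this.endedByInterrupt = endedByInterrupt;
            this.endedByClose = endedByClose;
        }
    }

    public static Result run() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept()) { // the peer never writes anything
            CountDownLatch finished = new CountDownLatch(1);
            Thread reader = new Thread(() -> {
                try {
                    client.getInputStream().read(); // blocks: no data will arrive
                } catch (IOException e) {
                    // thrown once the socket is closed underneath the read
                } finally {
                    finished.countDown();
                }
            });
            reader.start();
            Thread.sleep(200); // give the reader time to enter read()

            // Step 1: interrupt the thread and wait briefly for the read to end.
            reader.interrupt();
            boolean endedByInterrupt = finished.await(500, TimeUnit.MILLISECONDS);

            // Step 2: close the socket from this thread and wait again.
            client.close();
            boolean endedByClose = finished.await(2, TimeUnit.SECONDS);
            return new Result(endedByInterrupt, endedByClose);
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println("ended by interrupt: " + r.endedByInterrupt);
        System.out.println("ended by close: " + r.endedByClose);
    }
}
```

The interrupt only sets a flag on the thread, which the loop cannot check while it is stuck inside the read. Closing the socket makes the read throw, which is why the workaround goes after the connection itself.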
@wenerme Jedis interfaces don't match Futures yet, as they weren't designed with async in mind. How do you propose changing them to allow operations to be cancelled?
There is no graceful way to achieve this. Here is my idea:
- Track whether a blocking call is in progress
- When a blocking call needs to be force-cancelled:
  - Break the connection by closing the socket
  - Close the Jedis instance to recycle the broken connection
AtomicReference<Jedis> ref = new AtomicReference<>();
AtomicBoolean blocking = new AtomicBoolean();
CompletableFuture<Void> future = Holder.async("blsub", key,
    () -> {
        Thread thread = Thread.currentThread();
        accept(jedis -> {
            ref.set(jedis);
            while (!thread.isInterrupted()) {
                blocking.set(true); // Tracking
                List<String> list = jedis.blpop(30, k);
                blocking.set(false);
                if (list != null) {
                    consumer.accept(list.get(1));
                }
            }
        });
    });
future.whenComplete((v, ex) -> {
    if (blocking.get()) {
        try {
            // Broken connection
            AutoCloseable closeable = Reflect.on(ref.get()).get("client");
            closeable.close();
        } catch (Exception e) {
            // ignored
        }
        // Recycle jedis
        ref.get().close();
    }
});
Maybe Jedis should expose a method like interrupt to do this, and also provide a way to check whether a call is currently blocking.
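To make the proposal concrete, here is a rough sketch of what such an API could look like as a standalone helper. Everything in it is hypothetical: BlockingGuard, call, isBlocking and interrupt are illustrative names, not part of Jedis, and the Closeable stands for whatever handle gives access to the underlying connection (the thread above resorts to reflection to obtain it).

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical helper: tracks a blocking call and can break its connection. */
public final class BlockingGuard {
    private final Closeable connection; // assumed handle to the underlying socket/client
    private final AtomicBoolean blocking = new AtomicBoolean(false);

    public BlockingGuard(Closeable connection) {
        this.connection = connection;
    }

    /** Runs a blocking command and marks the guard as blocking for its duration. */
    public <T> T call(Supplier<T> blockingCommand) {
        blocking.set(true);
        try {
            return blockingCommand.get();
        } finally {
            blocking.set(false);
        }
    }

    /** True while a command passed to call() has not returned yet. */
    public boolean isBlocking() {
        return blocking.get();
    }

    /**
     * Closes the connection if a blocking command is in flight.
     * Returns true if a close was attempted. The check and the close are not
     * atomic, so the command may finish in between; a real implementation
     * would have to handle that race.
     */
    public boolean interrupt() {
        if (!blocking.get()) {
            return false;
        }
        try {
            connection.close();
        } catch (IOException ignored) {
            // the connection is being discarded anyway
        }
        return true;
    }
}
```

With Jedis, usage might look like guard.call(() -> jedis.blpop(30, key)) on the worker thread and guard.interrupt() from the cancelling thread, followed by closing the Jedis instance so the pool discards the broken connection.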
Any progress on this? I see it was opened a long time ago, and I can't find a way to interrupt an xread call, which would be very useful. Could anyone help with how to interrupt xread?
@Ciccios A new parameter, infiniteSoTimeout, has been available since Jedis 3.4.0. Does it work for you? Feel free to let us know your opinion.
@sazzad16 - I still cannot interrupt xReadGroup by interrupting the thread it is running on. I don't want that thread to re-issue xReadGroup again and again at each soTimeout, and it is not clear what the purpose of infiniteSoTimeout is. Could you please:
- explain the purpose of infiniteSoTimeout
- explain how it differs from soTimeout
Thank you
@Ciccios For blocking commands, Jedis (by default) sets the socket timeout to 0, so it waits as long as it takes (i.e. indefinitely) for a response from Redis. If infiniteSoTimeout is set, that value is used as the socket timeout instead of 0. This means Jedis waits at most infiniteSoTimeout for a response from Redis when running a blocking command.
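The socket-timeout semantics described here are those of the JDK's Socket.setSoTimeout, and can be seen in isolation. The sketch below uses only the JDK and a silent loopback peer. SoTimeoutDemo and blockedMillis are illustrative names, and the sketch does not show how Jedis itself applies infiniteSoTimeout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;

public class SoTimeoutDemo {

    /**
     * Reads from a peer that never writes, with the given socket timeout.
     * Returns how long the read blocked (in ms) before timing out, or -1 if
     * the read returned without a timeout. A timeout of 0 means "wait
     * forever", so calling this with 0 would never return.
     */
    public static long blockedMillis(int soTimeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept()) {
            client.setSoTimeout(soTimeoutMillis);
            long start = System.nanoTime();
            try {
                client.getInputStream().read();
                return -1; // data or end-of-stream arrived, no timeout
            } catch (SocketTimeoutException e) {
                // The socket is still usable after a timeout; only the read gave up.
                return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read gave up after ~" + blockedMillis(300) + " ms");
    }
}
```

A finite timeout bounds how long the calling thread can stay stuck, but it does not cancel the call on demand: the thread still has to notice the timeout and decide whether to retry or stop.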
This issue is marked stale. It will be closed in 30 days if it is not updated.