Force interrupt jedis.blpop

Open wenerme opened this issue 7 years ago • 7 comments

There is no way to stop or cancel a blocking command like blpop when the Jedis instance is borrowed from a pool.

        JedisPool pool = new JedisPool();
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
        ListenableFuture<?> future = service.submit(() -> {
            Jedis jedis = pool.getResource();
            // Keep polling until the thread is interrupted
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println(jedis.blpop(30, "KEY"));
            }
        });

        future.addListener(() -> System.out.println("Done"), MoreExecutors.directExecutor());
        Thread.sleep(1000);
        future.cancel(true);
        future.get();

Even after I cancel the future, the thread is still running.

I have to use reflection to get the underlying client and then close its stream to cancel the blocking command.
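The behaviour described above can be reproduced without Jedis. The following is a minimal sketch using only `java.net`, assuming a platform (non-virtual) thread; the class name `BlockingReadDemo` and the throwaway local server are illustrative and not part of Jedis. It shows that interrupting a thread blocked in a socket read does not unblock it, while closing the socket from another thread does.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class; stands in for a thread stuck in a blocking Redis command.
final class BlockingReadDemo {

    /** Returns {stillBlockedAfterInterrupt, finishedAfterClose}. */
    static boolean[] run() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {   // the server side never writes anything

            Thread reader = new Thread(() -> {
                try {
                    client.getInputStream().read();   // blocks, like waiting for a BLPOP reply
                } catch (IOException e) {
                    // expected once the socket is closed from another thread
                }
            });
            reader.setDaemon(true);
            reader.start();
            Thread.sleep(200);                        // give the reader time to block

            reader.interrupt();                       // what future.cancel(true) does
            reader.join(500);
            boolean stillBlocked = reader.isAlive();

            client.close();                           // break the connection instead
            reader.join(2000);
            boolean finished = !reader.isAlive();

            return new boolean[] {stillBlocked, finished};
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("still blocked after interrupt: " + result[0]);
        System.out.println("finished after close: " + result[1]);
    }
}
```

This is why closing the underlying stream works as a cancel mechanism, at the cost of leaving a broken connection that still has to be returned to the pool.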

wenerme, Jul 25 '16 09:07

@wenerme Jedis interfaces don't match Futures yet, as they weren't designed with async in mind. How do you propose to change them so that operations can be cancelled?

marcosnils, Jul 25 '16 15:07

There is no graceful way to achieve this. Here is my idea:

  • Track whether a blocking call is in progress
  • When a blocking call needs to be force-cancelled
    • Break the connection by closing the socket
    • Close the Jedis instance so the pool recycles the broken connection

        AtomicReference<Jedis> ref = new AtomicReference<>();
        AtomicBoolean blocking = new AtomicBoolean();
        CompletableFuture<Void> future = Holder.async("blsub", key,
            () -> {
                Thread thread = Thread.currentThread();
                accept(jedis -> {
                    ref.set(jedis);
                    while (!thread.isInterrupted()) {
                        blocking.set(true); // Tracking
                        List<String> list = jedis.blpop(30, k);
                        blocking.set(false);
                        if (list != null) {
                            consumer.accept(list.get(1));
                        }
                    }
                });
            });
        future.whenComplete((v, ex) -> {
            if (blocking.get()) {
                try {
                    // Broken connection
                    AutoCloseable closeable = Reflect.on(ref.get()).get("client");
                    closeable.close();
                } catch (Exception e) {
                    // ignored
                }
                // Recycle jedis
                ref.get().close();
            }
        });

Maybe Jedis should expose a method like interrupt to do this, and also provide a way to check whether a call is currently blocking.
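As a rough illustration of what such an API could look like, here is a sketch of a small wrapper that tracks the blocking state and breaks the connection on demand. Everything in it is hypothetical: `BlockingGuard`, `isBlocking` and `interrupt` are made-up names, not Jedis methods, and `connection` is assumed to be any `AutoCloseable` whose `close()` shuts the underlying socket.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical helper, not part of Jedis.
final class BlockingGuard {
    private final AutoCloseable connection;                  // assumed to close the underlying socket
    private final AtomicBoolean blocking = new AtomicBoolean();

    BlockingGuard(AutoCloseable connection) {
        this.connection = connection;
    }

    /** Runs a blocking command and keeps the flag set while it is in flight. */
    <T> T call(Supplier<T> blockingCommand) {
        blocking.set(true);
        try {
            return blockingCommand.get();
        } finally {
            blocking.set(false);
        }
    }

    /** True while a command started through call() has not returned yet. */
    boolean isBlocking() {
        return blocking.get();
    }

    /** Closes the connection only if a command is in flight; returns whether it did. */
    boolean interrupt() {
        if (!blocking.get()) {
            return false;
        }
        try {
            connection.close();
        } catch (Exception e) {
            // ignored: the connection is being discarded anyway
        }
        return true;
    }
}
```

The worker thread would wrap its call, for example `guard.call(() -> jedis.blpop(30, "KEY"))`, and the cancelling thread would call `guard.interrupt()`. Note that the check and the close are not atomic, so a command that returns in between could still have its connection closed; a real implementation would need to handle that race.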

wenerme, Jul 26 '16 16:07

Any progress on this? I see it was opened a long time ago, and I can't find a way to interrupt an xread call, which would be very useful. Could anyone explain how to interrupt xread?

Ciccios, Nov 22 '20 17:11

@Ciccios A new parameter, infiniteSoTimeout, was introduced in Jedis 3.4.0. Does it work for you? Feel free to let us know what you think.

sazzad16, Dec 21 '20 13:12

@sazzad16 I still cannot interrupt xReadGroup by interrupting the thread it is running on. I don't want that thread to call xReadGroup again and again at each soTimeout, and it is not clear what the purpose of infiniteSoTimeout is. Could you please:

  • explain the purpose of infiniteSoTimeout
  • explain how it differs from soTimeout

Thank you

Ciccios, Dec 25 '20 00:12

@Ciccios In case of blocking commands, Jedis (by default) sets the socket timeout to 0 to wait for all the time it takes (aka infinite time) to get a response from Redis. In such cases, if infiniteSoTimeout is set, that value would be used for socket timeout (instead of 0). Which means, Jedis is expected to wait at most infiniteSoTimeout to get response from Redis in case of blocking commands.
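To make the effect of a finite socket timeout concrete, here is a sketch at the plain `java.net` level rather than through Jedis. The names `BoundedReadDemo` and `pollUntilInterrupted` are illustrative, and how Jedis itself reports an expired timeout to the caller is not shown here. With a timeout of 0 the read would never return; with a finite value the read gives up periodically, which gives the loop a chance to check the interrupt flag.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo class using a throwaway local server that never sends data.
final class BoundedReadDemo {

    /** Returns how many read timeouts occurred before the reader noticed the interrupt. */
    static int pollUntilInterrupted(int soTimeoutMillis, int interruptAfterMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {

            client.setSoTimeout(soTimeoutMillis);     // finite timeout instead of 0 (infinite)
            InputStream in = client.getInputStream();
            int[] timeouts = {0};

            Thread reader = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        in.read();                    // waits at most soTimeoutMillis
                    } catch (SocketTimeoutException e) {
                        timeouts[0]++;                // timed out: loop re-checks the interrupt flag
                    } catch (IOException e) {
                        return;                       // connection broken: stop
                    }
                }
            });
            reader.setDaemon(true);
            reader.start();

            Thread.sleep(interruptAfterMillis);
            reader.interrupt();                       // only noticed after the current read times out
            reader.join(soTimeoutMillis * 5L);
            if (reader.isAlive()) {
                throw new IllegalStateException("reader did not stop");
            }
            return timeouts[0];
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The trade-off is the one raised earlier in the thread: the interrupt is honoured only at the next timeout, so the command has to be reissued after every expiry, and a shorter timeout means faster cancellation but more repeated calls.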

sazzad16, Dec 25 '20 10:12

This issue is marked stale. It will be closed in 30 days if it is not updated.

github-actions[bot], Feb 15 '24 00:02