
Blocking XGroupCreateMkStream does not interrupt on context cancellation

Open jgirtakovskis opened this issue 3 years ago • 18 comments

When XGroupCreateMkStream is called in blocking mode (Block = 0), the call does not get interrupted by cancelling the context. (As the reproduction below shows, the call that actually blocks is XReadGroup with Block: 0.)

Expected Behavior

The blocking call is interrupted when the context is cancelled.

Current Behavior

The call continues to block after the context is cancelled.

Possible Solution

Unsure yet

Steps to Reproduce

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/go-redis/redis/v9"
	"github.com/google/uuid"
)

func main() {
	rdb := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs:    []string{"localhost:6379"},
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	defer rdb.Close()

	ctx, cancelFn := context.WithCancel(context.Background())

	go func() {
		for idx := 0; idx < 5; idx++ {
			fmt.Printf("Waiting %v...\n", idx)
			time.Sleep(time.Second)
		}
		cancelFn()
		fmt.Printf("Cancelled context and now expect the blocking XReadGroup to be interrupted...\n")
	}()

	name := "blag"
	streamName := name
	groupName := name + "-blah"

	_, err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Result()
	fmt.Printf("%v\n", err)

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		objs, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: uuid.NewString(),
			Streams:  []string{streamName, ">"},
			Count:    100,
			Block:    0,
		}).Result()
		fmt.Printf("%v, %v\n", err, objs)
	}()

	wg.Wait()
	fmt.Printf("Done.\n")
}

Context (Environment)

I have two goroutines concurrently performing XREADGROUP and XADD in blocking mode. XADD is triggered by external events and is not guaranteed to add items to the stream at any particular cadence or pattern. Shutting down the reading goroutine is not possible because the blocking call does not get interrupted by context cancellation.

Detailed Description

Blocking calls should be interrupted when the context is cancelled and the connection is closed.

Possible Implementation

N/A

jgirtakovskis avatar Nov 05 '22 01:11 jgirtakovskis

Dapr maintainer here. I also need this fixed.

berndverst avatar Dec 02 '22 04:12 berndverst

I have the same problem. Version: v9.0.0-rc.2

Alger7w avatar Dec 07 '22 04:12 Alger7w

Same problem here; it's quite a blocker for me :(

latolukasz avatar Jan 04 '23 11:01 latolukasz

~~We've built a workaround for this using samber/lo, but agree that this should really be handled by the library level.~~

❗❗ Turns out this leaks a lot of goroutines and shouldn't be used. ❗❗

var cmd *redis.XStreamSliceCmd

select {
case cmd = <-lo.Async(func() *redis.XStreamSliceCmd {
	return c.Client.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    "group",
		Consumer: "consumerID",
		Streams:  []string{"stream", ">"},
		Count:    1,
	})
}):
case <-ctx.Done():
	return ctx.Err()
}

streams, err := cmd.Result()

brettmorien avatar Feb 09 '23 01:02 brettmorien

Note that the workaround above will leak goroutines if you run the code repeatedly, so it's really only viable for handling app shutdown.

armsnyder avatar Feb 09 '23 02:02 armsnyder

I was able to contribute test cases in #2432. However I'm less confident in providing a fix. The code would need to safely dispose of the connection if the context is canceled, such as removing it from the connection pool. It would also need to not interfere with the expected behavior of ContextTimeoutEnabled (#2243).

armsnyder avatar Feb 09 '23 04:02 armsnyder

This feels complicated, but net.Conn is hard to control from a ctx: net.Conn uses deadlines instead of a ctx...

var conn net.Conn // a go-redis pool.Conn
ctx := context.Background()

processBlockCmd := func() <-chan *redis.XStreamSliceCmd {
	ch := make(chan *redis.XStreamSliceCmd, 1)
	go func() {
		cmd := &redis.XStreamSliceCmd{}
		// write the command, then block on the reply...
		buf := make([]byte, 4096)
		if _, err := conn.Read(buf); err != nil {
			// A deadline-triggered timeout together with a cancelled ctx
			// means the read was interrupted deliberately.
			if errors.Is(err, os.ErrDeadlineExceeded) && errors.Is(ctx.Err(), context.Canceled) {
				cmd.SetErr(err)
			}
		}
		ch <- cmd
		close(ch)
	}()
	return ch
}

// Evaluate the channel once so both select cases drain the same goroutine.
ch := processBlockCmd()
select {
case cmd := <-ch:
	return cmd
case <-ctx.Done():
	conn.SetDeadline(time.Now()) // unblock the pending Read
	return <-ch
}

monkey92t avatar Feb 09 '23 14:02 monkey92t

@monkey92t Right, deadlines on net.Conn are best used when you know the timeout ahead of time. The code you shared makes sense, and it is very similar to the change I just proposed in #2433 at a library level. I believe this belongs in the library since I would prefer the redis client to manage the connection for me. If users would prefer not to cancel redis commands with a context, then they can pass context.Background() to redis commands.

EDIT: One change between your code and mine is that you used SetDeadline whereas I closed the connection. Is there a meaningful difference there?

armsnyder avatar Feb 10 '23 01:02 armsnyder

@armsnyder We still need to think more. If a goroutine is used every time a command is executed, it will have an impact on performance. I haven't thought of a good solution yet.

monkey92t avatar Feb 10 '23 05:02 monkey92t

Here's a benchstat comparing master with my PR.

https://gist.github.com/armsnyder/40aca6ea480bf53434d1e41c663e1550

We could optimize by running a goroutine per connection rather than per command. The connection goroutine would handle all I/O, with the command communicating to it over a channel.

armsnyder avatar Feb 10 '23 11:02 armsnyder

Hi folks (@monkey92t)!

I'm checking in to see if this is still on the radar. Not being able to cancel out of a blocking call is a deal breaker for gracefully shutting down our apps. We are currently operating on a July 2022 beta of this library until we can upgrade to a properly working release version.

brettmorien avatar Feb 21 '23 19:02 brettmorien

Thank you for your attention... I've done related tests and it's hard to choose:

  1. With the chan+goroutine method (such as @armsnyder's example), we pay a significant cost for every command executed.
  2. When <-ctx.Done() fires, we close the network connection, because we can't trust the state of that connection, which causes a chain reaction (Constantly Reestablishing Connections to AWS ElastiCache Redis in Cluster Mode #2046).

No matter how we do it, context handling will cause a lot of side effects. I haven't found a better solution. A similar approach is also used in the net package (*netFD).

I'm trying more solutions and benchmark tests, such as letting users choose whether to pay the cost of listening to ctx, like #2243.

monkey92t avatar Feb 22 '23 07:02 monkey92t

With respect to (1), I am concerned about goroutines being leaked, or GC overhead. This would be a deal breaker for our use in Dapr (Distributed Application Runtime - github.com/dapr/dapr). We are very performance conscious, as our project runs on a variety of targets, including embedded systems.

I can't speak in favor of (2), but my vote is against (1).

berndverst avatar Feb 27 '23 19:02 berndverst

@berndverst What do you think of #2455 ?

monkey92t avatar Feb 27 '23 20:02 monkey92t

@berndverst What do you think of #2455 ?

Let me loop in one of my co-maintainers - @italypaleale thoughts on https://github.com/redis/go-redis/pull/2455 for addressing the issue discussed here?

berndverst avatar Feb 27 '23 21:02 berndverst

Hi folks. Any movement on this issue?

brettmorien avatar Mar 15 '23 18:03 brettmorien

This is a blocker for us, could you please look at #2455? Thank you!

wk8 avatar Apr 27 '23 02:04 wk8

Really need this fix, any update?

kkkbird avatar May 18 '23 08:05 kkkbird