radix icon indicating copy to clipboard operation
radix copied to clipboard

v3: 2 separate connections for issuing regular commands and persistentpubsub ?

Open yash-nisar opened this issue 9 months ago • 1 comments

Hi, we have a use case where we're Redis to issue regular commands for rate limiting and listening to a channel for expired key events. Currently, we are creating 2 connections.

I wanted to ask if the same connection can be re-used ? What is the recommendation ?

I also saw that if the same connection is re-used, it will anyway create a new connection ?

p := &persistentPubSub{
  dial:    func() (Conn, error) { return opts.connFn(network, addr) },
  opts:    opts,
  subs:    chanSet{},
  psubs:   chanSet{},
  cmdCh:   make(chan pubSubCmd),
  closeCh: make(chan struct{}),
}

I'm asking because we're trying to use entra id authentication which will force us to send AUTH periodically before the token expires else the connection would be terminated by Azure Redis.

@mediocregopher Any thoughts ?

yash-nisar avatar May 13 '24 17:05 yash-nisar

Hi @yash-nisar !

I wanted to ask if the same connection can be re-used ? What is the recommendation ?

Unfortunately no, the connection within PubSubConns can't be extracted in any way.

I also saw that if the same connection is re-used, it will anyway create a new connection ?

I'm not sure what you mean by this... if you're referring to the dial field, that is only called if there's been an error in the underlying connection and persistent pub sub conn is creating a new connection.

send AUTH periodically before the token expires else the connection would be terminated by Azure Redis. There is certainly no nice way to do this. I can offer a possible hacky solution, which I haven't tested (and can't test, cause I don't have an Azure account). If it doesn't work then hopefully it at least displays the theory:

type hackyConn struct {
    radix.Conn
    l sync.Mutex
    stopCh chan struct{}
}

// newHackyConn is a radix.ConnFunc
func newHackyConn(network, addr string) (radix.Conn, error) {
    inner, err := radix.Dial(network, addr)
    if err != nil {
        return nil, err
    }

    conn := &hackyConn{
        Conn: inner,
        stopCh: make(chan struct{}),
    }

    go func() {
        t := time.NewTicker(1 * time.Minute)
        defer t.Stop()

        for {
            select {
            case <-t.C:
                err := conn.Encode(radix.Pipeline(
                    radix.Cmd(nil, "CLIENT", "REPLY", "SKIP"),
                    radix.Cmd(nil, "AUTH", "whatever"),
                ))
                if err != nil {
                    // TODO HANDLE ERROR, log or something
                }
            case <-conn.stopCh:
                return
            }
        }
    }()

    return conn, nil
}

func (c *hackyConn) Encode(m resp.Marshaler) error {
    c.l.Lock()
    defer c.l.Unlock()
    return c.inner.Encode(m)
}

func (c *hackyConn) Close() error {
    close(c.stopCh)
    return c.inner.Close()
}

You would then pass newHackyConn into the persistent pub sub conn via PersistentPubSubConnFunc.

The lock around Encode is necessary because the PubSubConn may also be trying to use Encode to write a Ping message at the same time, and that would cause a panic. Luckily PubSubConn never uses Do, or I don't think this solution would be possible.

The CLIENT REPLY SKIP might not be necessary, but it should at least not hurt anything.

Hopefully this helps! I'd be curious if it works, if not I might be able to help more but it's also possible this just can't be done.

mediocregopher avatar May 15 '24 17:05 mediocregopher

Thanks @mediocregopher !

yash-nisar avatar Aug 27 '24 20:08 yash-nisar