No messages from SUBSCRIBE __redis__:invalidate

Open boekkooi-impossiblecloud opened this issue 4 weeks ago • 3 comments

Good day,

At the moment I'm busy migrating to rueidis and I'm learning a lot about Redis in the process.

Now I'm trying to move away from keyevents and use client tracking instead, but sadly I hit a little snag.

I was trying to subscribe to __redis__:invalidate using CoreClient.Receive, but sadly this does not seem to be supported because pipe.handlePush intercepts the messages.

I can work around this by creating another Client with OnInvalidations set and then calling client.Receive(ctx, client.B().Subscribe().Channel("__redis__:invalidate").Build(), func(msg rueidis.PubSubMessage) {}), so it's not a major issue, more of a bummer.

Normally I would try to create a PR but I'm honestly not sure what the right solution would be and I'm not very familiar with this library.

Thanks for maintaining this library and checking out the issue! Cheers, Warnar

Example

func TestExample(t *testing.T) {
	const prefix = "key"
	var redisClient rueidis.Client
	redisClient, _ = createRedisClient(t)

	expectedChangeCount := 10
	go func() {
		changes := 0
		err := redisClient.Dedicated(func(client rueidis.DedicatedClient) error {
			clientTrackingCmd := client.B().ClientTracking().On().Prefix().Prefix(prefix).Bcast().Build()
			err := client.Do(t.Context(), clientTrackingCmd).Error()
			if err != nil {
				return err
			}

			subscribeCmd := client.B().Subscribe().Channel(`__redis__:invalidate`).Build()
			return client.Receive(t.Context(),
				subscribeCmd,
				func(msg rueidis.PubSubMessage) {
					changes++
					t.Log(msg.Message)
				},
			)
		})
		if err != nil && !errors.Is(err, context.Canceled) {
			t.Error(err)
		}
		if changes != expectedChangeCount {
			t.Errorf("expected changes to be %d, got %d", expectedChangeCount, changes)
		}
	}()
	time.Sleep(time.Millisecond * 100)

	for i := 0; i < expectedChangeCount; i++ {
		err := redisClient.Do(t.Context(), redisClient.B().Set().Key(prefix+"1").Value("v"+strconv.Itoa(i)).Build()).Error()
		if err != nil {
			t.Error(err)
		}
	}
	time.Sleep(time.Millisecond * 100)
}

Workaround

func TestWorkaround(t *testing.T) {
	const prefix = "key"
	var (
		redisClient        rueidis.Client
		redisClientOptions rueidis.ClientOption
	)
	redisClient, redisClientOptions = createRedisClient(t)

	expectedChangeCount := 10
	go func() {
		changes := 0
		redisClientOptions.OnInvalidations = func(messages []rueidis.RedisMessage) {
			for _, message := range messages {
				changes++
				t.Log(message.ToString())
			}
		}

		workaroundClient, err := rueidis.NewClient(redisClientOptions)
		if err != nil {
			t.Error(err)
			return
		}

		err = workaroundClient.Dedicated(func(client rueidis.DedicatedClient) error {
			clientTrackingCmd := client.B().ClientTracking().On().Prefix().Prefix(prefix).Bcast().Build()
			err := client.Do(t.Context(), clientTrackingCmd).Error()
			if err != nil {
				return err
			}

			subscribeCmd := client.B().Subscribe().Channel(`__redis__:invalidate`).Build()
			return client.Receive(t.Context(),
				subscribeCmd,
				func(msg rueidis.PubSubMessage) {
					// Using OnInvalidations
				},
			)
		})
		if err != nil && !errors.Is(err, context.Canceled) {
			t.Error(err)
		}
		if changes != expectedChangeCount {
			t.Errorf("expected changes to be %d, got %d", expectedChangeCount, changes)
		}
	}()
	time.Sleep(time.Millisecond * 100)

	for i := 0; i < expectedChangeCount; i++ {
		err := redisClient.Do(t.Context(), redisClient.B().Set().Key(prefix+"1").Value("v"+strconv.Itoa(i)).Build()).Error()
		if err != nil {
			t.Error(err)
		}
	}
	time.Sleep(time.Millisecond * 100)
}

Hi @boekkooi-impossiblecloud, thanks for the request.

I think your workaround is generally correct, although subscribing to __redis__:invalidate is not necessary and creating a new client looks awkward.

The __redis__:invalidate pubsub channel is only available in the Two connections mode, and it is not needed if you are using RESP3.
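To illustrate why a subscription callback never fires here: under RESP3, invalidations arrive as push frames whose first element is "invalidate", while regular pubsub deliveries are push frames whose first element is "message". The following is a toy model of that routing, not rueidis's actual pipe.handlePush; the push and dispatcher types and their fields are hypothetical names made up for this sketch.

```go
package main

import "fmt"

// push is a simplified, hypothetical stand-in for a decoded RESP3 push frame.
// Kind is the first element of the frame, e.g. "message" or "invalidate".
type push struct {
	Kind    string
	Channel string   // used by "message" frames
	Payload string   // used by "message" frames
	Keys    []string // used by "invalidate" frames
}

// dispatcher routes push frames to separate callbacks by kind.
type dispatcher struct {
	onMessage      func(channel, payload string)
	onInvalidation func(keys []string)
}

// handle delivers a frame to the callback registered for its kind.
// Frames without a registered callback are silently dropped.
func (d *dispatcher) handle(p push) {
	switch p.Kind {
	case "invalidate":
		if d.onInvalidation != nil {
			d.onInvalidation(p.Keys)
		}
	case "message":
		if d.onMessage != nil {
			d.onMessage(p.Channel, p.Payload)
		}
	}
}

func main() {
	seen := 0
	// Only a pubsub callback is registered, as in the failing example.
	d := &dispatcher{
		onMessage: func(channel, payload string) { seen++ },
	}
	// The invalidation is not a "message" frame, so the callback never runs.
	d.handle(push{Kind: "invalidate", Keys: []string{"key1"}})
	fmt.Println("pubsub callback calls:", seen)
}
```

In this model the only way to observe the invalidation is to register the dedicated invalidation callback, which mirrors the role OnInvalidations plays in the workaround.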

So, I think what we lack here is a SetOnInvalidations method on the DedicatedClient interface, similar to the existing SetPubSubHooks. In other words, if we have the SetOnInvalidations, we should be able to do the following easily:

	client, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress: []string{"127.0.0.1:6379"},
	})

	...

	invalidations := make(chan []rueidis.RedisMessage, 1000)

	...

	go func() {
		dc, cancel := client.Dedicate()
		defer cancel()
		ch := dc.SetOnInvalidations(func(messages []rueidis.RedisMessage) {
			invalidations <- messages
		})
		if err := dc.Do(context.Background(), dc.B().ClientTracking().On().Prefix().Prefix(prefix).Bcast().Build()).Error(); err != nil {
			panic(err)
		}
		<-ch
	}()

Would you like to try implementing this? I think it will require the following:

  1. Add SetOnInvalidations method on the DedicatedClient interface.
  2. Implement the method on all DedicatedClient implementations in a way similar to SetPubSubHooks.
  3. Unset the tracking state before the dedicated connection is returned to the pool.

That is actually a lot of work. I think a simpler workaround is to set OnInvalidations on the top-level client so that you don't need to create a new client in the goroutine.
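If the top-level callback is used, one possible way to hand invalidations to the rest of the program is a small adapter that forwards each batch to a buffered channel without ever blocking the caller. This is a minimal sketch under assumptions: invalidationSink and its methods are hypothetical names, the sketch assumes the callback should return quickly, and it is generic so that it runs without the library. With T set to rueidis.RedisMessage, sink.Handle would have the same func([]rueidis.RedisMessage) shape as the OnInvalidations field used in the workaround above.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// invalidationSink (hypothetical) adapts a callback-style hook to a channel.
type invalidationSink[T any] struct {
	ch      chan []T
	dropped atomic.Int64
}

// newInvalidationSink creates a sink whose channel holds up to buffer batches.
func newInvalidationSink[T any](buffer int) *invalidationSink[T] {
	return &invalidationSink[T]{ch: make(chan []T, buffer)}
}

// Handle forwards a batch without blocking. If the buffer is full the batch
// is dropped and counted, so the consumer can detect the loss.
func (s *invalidationSink[T]) Handle(batch []T) {
	select {
	case s.ch <- batch:
	default:
		s.dropped.Add(1)
	}
}

// C returns the receive side of the sink.
func (s *invalidationSink[T]) C() <-chan []T { return s.ch }

// Dropped reports how many batches were discarded because the buffer was full.
func (s *invalidationSink[T]) Dropped() int64 { return s.dropped.Load() }

func main() {
	sink := newInvalidationSink[string](1)
	sink.Handle([]string{"key1"}) // fits in the buffer
	sink.Handle([]string{"key2"}) // buffer full, counted as dropped
	fmt.Println(<-sink.C(), sink.Dropped())
}
```

A dropped batch means the consumer may have missed an invalidation, so a non-zero Dropped count is probably best treated as a signal to discard all locally cached state rather than as something to ignore.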

rueian, Dec 11 '25 18:12

Hi @rueian

Thanks a lot for the fast reply and all the info! I agree that using the client's OnInvalidations is preferred, but it is a bit limiting for my specific use case.

The idea of a SetOnInvalidations is good, but I do wonder whether extending PubSubHooks with an OnInvalidate function would be an easier and good-enough solution in this case. The disadvantage is that a user must manually turn off client tracking before returning the dedicated client to the pool, but as I expect this to be an edge case, that should be fine in my opinion.

What do you think/prefer?

Extending the PubSubHooks struct is a great idea. It is indeed easier to implement. However, I think the extension should be done privately, and we should still provide SetOnInvalidations as the public interface since we are not using the Two connections mode. In other words, the implementation should look like this:

type PubSubHooks struct {
    ...
    onInvalidations func([]rueidis.RedisMessage) // a private extension
}

func (c *dedicatedSingleClient) SetOnInvalidations(fn func([]rueidis.RedisMessage)) <-chan error {
    return c.SetPubSubHooks(PubSubHooks{onInvalidations: fn})
}

It would bring the disadvantage that a user must manually turn off client tracking before returning the dedicated client to the pool

I think we should automatically turn off client tracking. It can be done in the CleanSubscriptions.
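The idea of resetting tracking state when a dedicated connection goes back to the pool can be sketched with a toy model. Everything below is hypothetical and only illustrates the ordering (clean up first, then make the connection reusable); it is not how rueidis implements CleanSubscriptions, and the command strings are simplified.

```go
package main

import "fmt"

// conn is a toy stand-in for a pooled connection.
// sent records the commands issued on it, in order.
type conn struct {
	tracking        bool
	onInvalidations func(keys []string)
	sent            []string
}

func (c *conn) do(cmd string) { c.sent = append(c.sent, cmd) }

// enableTracking models turning tracking on and registering a hook.
func (c *conn) enableTracking(fn func(keys []string)) {
	c.do("CLIENT TRACKING ON")
	c.tracking = true
	c.onInvalidations = fn
}

// cleanup models the reset step that runs before the conn is reused:
// tracking is switched off only if it was on, and the hook is cleared.
func (c *conn) cleanup() {
	if c.tracking {
		c.do("CLIENT TRACKING OFF")
		c.tracking = false
	}
	c.onInvalidations = nil
}

// pool hands out connections and cleans them up on release.
type pool struct{ idle []*conn }

// dedicate returns a connection and a release function for it.
func (p *pool) dedicate() (*conn, func()) {
	var c *conn
	if n := len(p.idle); n > 0 {
		c, p.idle = p.idle[n-1], p.idle[:n-1]
	} else {
		c = &conn{}
	}
	release := func() {
		c.cleanup()
		p.idle = append(p.idle, c)
	}
	return c, release
}

func main() {
	p := &pool{}
	c, release := p.dedicate()
	c.enableTracking(func(keys []string) {})
	release()

	reused, _ := p.dedicate()
	fmt.Println(reused == c, reused.tracking, reused.sent)
}
```

Doing the reset inside the release path means the next user of the connection never inherits tracking state or a stale callback, which is the situation the manual-cleanup alternative would leave to the caller.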

rueian, Dec 12 '25 10:12

@rueian if it's open to external contributors, can I work on it?

AryanBagade, Dec 20 '25 01:12

@rueian if its open for external contributors, can i work on it!

Yes, we are always open to contributors. Thanks!

rueian, Dec 20 '25 02:12