components-contrib icon indicating copy to clipboard operation
components-contrib copied to clipboard

Synchronous replication for Redis pub/sub

Open SoTrx opened this issue 2 years ago • 0 comments

Describe the proposal

This is a proposal to add an option to use synchronous replication with Redis Streams used as a Pub/Sub backend.

Why

While not transforming Redis into a reliable collection, providing a way to lower the chance of data loss at the cost of speed can make sense for some business-oriented scenarios.

##How Conditionally using the WAIT command pipelined with the already used XADD command.

func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
	pipe := r.client.Pipeline()
	pipe.XAdd(r.ctx, &redis.XAddArgs{
		Stream:       req.Topic,
		MaxLenApprox: r.metadata.maxLenApprox,
		Values:       map[string]interface{}{"data": req.Data},
	})
	if r.metadata.syncReplicas > 0 {
		pipe.Do(r.ctx, "WAIT", r.metadata.syncReplicas, r.metadata.syncTimeout.Milliseconds())
	}

	_, err := pipe.Exec(r.ctx)
	if err != nil {
		return fmt.Errorf("redis streams: error from publish: %s", err)
	}

	return nil
}

The pipeline will prevent 2 network round trips, and two components metadata options could be added to control the WAIT behaviour:

  • one to control the number of synchronous replication required (syncReplicas in the example)
  • another to set a WAIT timeout (syncTimeout in the example)

The number of synchronous replication required would default to 0, so there would be no changes in behaviour for existing deployments.

As I'm already fiddling with a PoC, I'm able to work on a PR.

SoTrx avatar Aug 31 '22 13:08 SoTrx