components-contrib
Synchronous replication for Redis pub/sub
Describe the proposal
This is a proposal to add an option for synchronous replication when Redis Streams is used as a Pub/Sub backend.
Why
While this would not turn Redis into a reliable collection, providing a way to lower the chance of data loss at the cost of speed can make sense for some business-oriented scenarios.
How
Conditionally use the WAIT command, pipelined with the already used XADD command.
func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
	// Queue XADD and, optionally, WAIT in one pipeline so both are sent in a single round trip.
	pipe := r.client.Pipeline()
	pipe.XAdd(r.ctx, &redis.XAddArgs{
		Stream:       req.Topic,
		MaxLenApprox: r.metadata.maxLenApprox,
		Values:       map[string]interface{}{"data": req.Data},
	})
	if r.metadata.syncReplicas > 0 {
		// WAIT <numreplicas> <timeout in ms>: block until the replicas acknowledge or the timeout expires.
		pipe.Do(r.ctx, "WAIT", r.metadata.syncReplicas, r.metadata.syncTimeout.Milliseconds())
	}
	_, err := pipe.Exec(r.ctx)
	if err != nil {
		return fmt.Errorf("redis streams: error from publish: %w", err)
	}
	return nil
}
The pipeline avoids a second network round trip, and two component metadata options could be added to control the WAIT behaviour:
- one to set the number of replicas that must acknowledge the write (syncReplicas in the example)
- another to set a WAIT timeout (syncTimeout in the example)
The number of required synchronous replicas would default to 0, so behaviour would not change for existing deployments.
As I'm already fiddling with a PoC, I'm able to work on a PR.