redis-rb
redis-rb copied to clipboard
Poor's man multiplexing (deferred reading of replies)
Hello redis-rb hackers!
In this feature request I propose the introduction of an optional interface, not replacing the normal interface, but only as an additional interface, that disjoins the calls to send commands and read replies.
The main rationale for this feature is to query N redis servers simultaneously without resorting to explicit multiplexing or any proper non-blocking framework.
Feature description and assumptions
In networks where there are N Redis servers R0, R1, R2, ... Rn-1, assuming a comparable latency between a client C and any of the Redis servers, a deferred read allows to mount a pattern like the follo(r2, r3 are three Redis-rb objects
r1.deferred {
reply1 = r1.set('foo','bar')
}
r2.deferred {
reply2 = r2.set('foo','bar')
}
r3.deferred {
reply3 = r3.set('foo',"bar')
}
reply1.read()
reply2.read()
reply3.read()
Note: the interface is just an example
The commands sent to r1, r2, r3 will leave the client about at the same time (as a TCP packet sent to the server). Because of the assumption of similar latency between C and R0, R1, R2, the replies will be available in the three sockets about at the same time, and the last two .read() calls will likely take minimal time to execute not paying the round time trip that will be instead payed only the first time.
Applications
This feature will be useful every time a client needs to perform operations against multiple Redis servers in parallel. Two notable applications are:
- Client-side implementation of high availability systems where data must be written in multiple nodes.
- Optimizations of queries where data is stored across multiple nodes. For instance an interface for Redis Cluster could implement a special interface to perform queries in parallel if possible, in case data is stored in different nodes.
Thanks, Salvatore
Hi, @antirez! I wrote a draft code for yor proposal as follows, but I do not know if I could understand your purpose correctly. So, I would appriciate it if you could give me any comments. If this meets your requirement, I would prepare for a pull request. Otherwise, I would be pleased if you kindly took me in the right direction.
#!/usr/bin/env ruby
# encoding: UTF-8
require 'redis'
require 'benchmark'
class Redis
def deferred(tag=nil, &block)
synchronize do |client|
Deferred.new(client, block, tag)
end
end
class Deferred
def initialize(client, block, tag)
Deferred.callbacks[self] = {
client: client,
block: block,
tag: tag
}
end
def read
Deferred.resolve(self.tag) unless Deferred.callbacks.empty?
reply = Deferred.replies.delete(self)
raise reply if reply.kind_of?(::RuntimeError)
reply
end
def tag
deferred = Deferred.callbacks[self]
return nil if deferred.nil?
deferred[:tag]
end
def self.callbacks
@callbacks ||= {}
end
def self.replies
@replies ||= {}
end
def self.resolve(tag)
threads = []
Deferred.callbacks.each do |deferred, callback|
if deferred.tag == tag
threads << Thread.new(callback) do |cb|
reply = begin
cb[:block].call(cb[:client])
rescue RuntimeError => err
err
end
Deferred.replies[deferred] = reply
end
end
end
threads.each {|thread| thread.join}
Deferred.callbacks.delete_if {|deferred| deferred.tag == tag}
end
end
end
# Redis#deferred is used as follows.
ports = [6379, 6380, 6381]
r = ports.map {|port| Redis.new(port: port)}
args = {}
100000.times do |i|
args["foo#{i}"] = "foo#{i}"
args["bar#{i}"] = "bar#{i}"
args["baz#{i}"] = "baz#{i}"
end
@not_deferred_replies = nil
@deferred_replies = nil
Benchmark.bmbm do |benchmark|
benchmark.report("Not deferred: ") do
reply0 = r[0].pipelined do
r[0].mapped_mset(args)
r[0].del(args.keys)
end
reply1 = r[1].pipelined do
r[1].mapped_mset(args)
r[1].del(args.keys)
end
reply2 = r[2].pipelined do
r[2].mapped_mset(args)
r[2].del(args.keys)
end
@not_deferred_replies = [reply0, reply1, reply2]
end
benchmark.report("Deferred: ") do
deferred0 = r[0].deferred do
r[0].pipelined do
r[0].mapped_mset(args)
r[0].del(args.keys)
end
end
deferred1 = r[1].deferred do
r[1].pipelined do
r[1].mapped_mset(args)
r[1].del(args.keys)
end
end
deferred2 = r[2].deferred do
r[2].pipelined do
r[2].mapped_mset(args)
r[2].del(args.keys)
end
end
reply0 = deferred0.read()
reply1 = deferred1.read()
reply2 = deferred2.read()
@deferred_replies = [reply0, reply1, reply2]
end
end
puts "Not deferred: #{@not_deferred_replies.inspect}"
puts "Deferred: #{@deferred_replies.inspect}"
This is way too old, and apologies for not addressing before.
Wouldn't this use case be more easily handled by threads?