redis-rb icon indicating copy to clipboard operation
redis-rb copied to clipboard

Poor's man multiplexing (deferred reading of replies)

Open antirez opened this issue 13 years ago • 2 comments

Hello redis-rb hackers!

In this feature request I propose the introduction of an optional interface, not replacing the normal interface, but only as an additional interface, that disjoins the calls to send commands and read replies.

The main rationale for this feature is to query N redis servers simultaneously without resorting to explicit multiplexing or any proper non-blocking framework.

Feature description and assumptions

In networks where there are N Redis servers R0, R1, R2, ... Rn-1, assuming a comparable latency between a client C and any of the Redis servers, a deferred read allows to mount a pattern like the follo(r2, r3 are three Redis-rb objects

r1.deferred {
    reply1 = r1.set('foo','bar')
}
r2.deferred {
    reply2 = r2.set('foo','bar')
}
r3.deferred {
    reply3 = r3.set('foo',"bar')
}

reply1.read()
reply2.read()
reply3.read()

Note: the interface is just an example

The commands sent to r1, r2, r3 will leave the client about at the same time (as a TCP packet sent to the server). Because of the assumption of similar latency between C and R0, R1, R2, the replies will be available in the three sockets about at the same time, and the last two .read() calls will likely take minimal time to execute not paying the round time trip that will be instead payed only the first time.

Applications

This feature will be useful every time a client needs to perform operations against multiple Redis servers in parallel. Two notable applications are:

  • Client-side implementation of high availability systems where data must be written in multiple nodes.
  • Optimizations of queries where data is stored across multiple nodes. For instance an interface for Redis Cluster could implement a special interface to perform queries in parallel if possible, in case data is stored in different nodes.

Thanks, Salvatore

antirez avatar Nov 03 '12 18:11 antirez

Hi, @antirez! I wrote a draft code for yor proposal as follows, but I do not know if I could understand your purpose correctly. So, I would appriciate it if you could give me any comments. If this meets your requirement, I would prepare for a pull request. Otherwise, I would be pleased if you kindly took me in the right direction.

#!/usr/bin/env ruby
# encoding: UTF-8
require 'redis'
require 'benchmark'

class Redis
  def deferred(tag=nil, &block)
    synchronize do |client|
      Deferred.new(client, block, tag)
    end
  end

  class Deferred
    def initialize(client, block, tag)
      Deferred.callbacks[self] = {
        client: client,
        block: block,
        tag: tag
      }
    end

    def read
      Deferred.resolve(self.tag) unless Deferred.callbacks.empty?
      reply = Deferred.replies.delete(self)
      raise reply if reply.kind_of?(::RuntimeError)
      reply
    end

    def tag
      deferred = Deferred.callbacks[self]
      return nil if deferred.nil?
      deferred[:tag]
    end

    def self.callbacks
      @callbacks ||= {}
    end

    def self.replies
      @replies ||= {}
    end

    def self.resolve(tag)
      threads = []
      Deferred.callbacks.each do |deferred, callback|
        if deferred.tag == tag
          threads << Thread.new(callback) do |cb|
            reply = begin
                      cb[:block].call(cb[:client])
                    rescue RuntimeError => err
                      err
                    end
            Deferred.replies[deferred] = reply
          end
        end
      end
      threads.each {|thread| thread.join}
      Deferred.callbacks.delete_if {|deferred| deferred.tag == tag}
    end
  end
end

# Redis#deferred is used as follows.

ports = [6379, 6380, 6381]
r = ports.map {|port| Redis.new(port: port)}

args = {}
100000.times do |i|
  args["foo#{i}"] = "foo#{i}"
  args["bar#{i}"] = "bar#{i}"
  args["baz#{i}"] = "baz#{i}"
end

@not_deferred_replies = nil
@deferred_replies = nil

Benchmark.bmbm do |benchmark|
  benchmark.report("Not deferred: ") do
    reply0 = r[0].pipelined do
      r[0].mapped_mset(args)
      r[0].del(args.keys)
    end
    reply1 = r[1].pipelined do
      r[1].mapped_mset(args)
      r[1].del(args.keys)
    end
    reply2 = r[2].pipelined do
      r[2].mapped_mset(args)
      r[2].del(args.keys)
    end
    @not_deferred_replies = [reply0, reply1, reply2]
  end

  benchmark.report("Deferred: ") do
    deferred0 = r[0].deferred do
      r[0].pipelined do
        r[0].mapped_mset(args)
        r[0].del(args.keys)
      end
    end
    deferred1 = r[1].deferred do
      r[1].pipelined do
        r[1].mapped_mset(args)
        r[1].del(args.keys)
      end
    end
    deferred2 = r[2].deferred do
      r[2].pipelined do
        r[2].mapped_mset(args)
        r[2].del(args.keys)
      end
    end
    reply0 = deferred0.read()
    reply1 = deferred1.read()
    reply2 = deferred2.read()
    @deferred_replies = [reply0, reply1, reply2]
  end
end

puts "Not deferred:    #{@not_deferred_replies.inspect}"
puts "Deferred:        #{@deferred_replies.inspect}"

bitterb avatar Dec 05 '12 15:12 bitterb

This is way too old, and apologies for not addressing before.

Wouldn't this use case be more easily handled by threads?

djanowski avatar Nov 18 '13 16:11 djanowski