em-synchrony
em-synchrony copied to clipboard
AMQP Subscribe
Presently the AMQP patch does not start a new fiber each time a message is received. This means that it is impossible to use em-synchrony inside a subscription callback. For example, suppose that I receive message A on fiber Z, and attempt to open a Redis connection. Well then em-synchrony will yield Z, waiting for Redis to open up its connection. But if we receive another message from AMQP, then AMQP will resume fiber Z with the new message, completely screwing up the Redis callback. I have a monkey patch for subscribe that seems to be working:
module EventMachine::Synchrony::AMQP
class Queue
def subscribe &block
asubscribe { |headers, payload| Fiber.new { block.call(headers, payload) }.resume }
end
end
end
The description is confusing me, but the code makes sense. :-)
@calj how does that look to you? any unexpected behaviors that we would need to guard for?
It's true it's not obvious to use the consumer part of the lib, we should probably improve it. I don't like the approach to create a new Fiber each time we receive a message, it sounds performance killer, we should use Fiber pools instead.
Let me show you the code I use in my consumers:
I consume messages in one Fiber only, and push them in a EM::Queue:
q = EM::Queue.new
consumer = EM::Synchrony::AMQP::Consumer.new(channel, queue, nil, false, true)
consumer.on_delivery do |meta, msg|
q.push(msg) if msg
end.consume
Then I create a Fiber poll that pop messages from the queue: (set the consumers_count as the parallelism level you need.)
(1..consumers_count).each do |n|
f = Fiber.new do
loop do
q.pop { |*args| EM.next_tick { f.resume *args } }
msg = Fiber.yield
# Do what you want with the message here
end
end
f.resume
end
Finally for the redis connections I use the EM:Synchrony::ConnectionPool to ensure there is not problem accessing them from different fibers:
EventMachine::Synchrony::ConnectionPool.new(size: 4) do
EM::Hiredis::Client.connect('localhost', 6379)
end
What do you think about this mechanism? We can include it in the library.
@calj I've tested the Fiber pool theory in the past, and to my own surprise, it didn't come out any faster.. I was playing with this within the core of Goliath, and if anything it came out a bit slower. Just FYI.. If we're going to recommend using fiber pools, we should benchmark the use case first.
The above is for the performance side strictly.. There are many other good reasons why you would use a fiber-pool...
I definitely agree with the benchmark approach. I will try to run some bench this week to get the answer.