em-synchrony
FiberIterator not passing iterator to block, "call 'next' on nil" exception
I'm using a FiberIterator to iterate over a list of Redis clients, collecting the hashes they return with inject:
EM::Synchrony::FiberIterator.new(client_ops(fields), @concurrency).inject({}) do |rh, co, iter|
  iter.return rh.merge!(co[:client].mapped_hmget(@key, *co[:ops]))
end
Iterating with each works great. However, iterating with inject raises the following exception:
[...]/gems/eventmachine-1.0.0/lib/em/iterator.rb:197:in `block (2 levels) in inject': undefined method `next' for nil:NilClass (NoMethodError)
It seems that FiberIterator's each method is not passing an iterator into the foreach/blk proc. The implementation of inject in EventMachine expects a non-nil iterator on which it can call next.
If I knew what the intended behavior was, I would have submitted a patch. A temporary, ugly and wrong fix is the following code:
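The failure mode can be reproduced without EventMachine. The sketch below is a simplified model, not EventMachine's or em-synchrony's actual source: TinyIterator and DroppingIterator are hypothetical names, and the model only assumes what is described above, namely that inject wraps the block in a proc taking (item, iter) and later calls iter.next, while the subclass's each forwards only the item.

```ruby
# Simplified model of the reported bug (hypothetical classes, not library code).
class TinyIterator
  def initialize(list)
    @list = list
  end

  # Hands each item to foreach together with the iterator itself.
  def each(foreach)
    @list.each { |item| foreach.call(item, self) }
  end

  # A real iterator would schedule the next item here; the model does nothing.
  def next
  end

  # inject wraps the user's block and calls iter.next once a value is returned.
  def inject(memo, &blk)
    each(proc { |item, iter|
      on_done = proc { |res| memo = res; iter.next }
      blk.call(memo, item, on_done)
    })
    memo
  end
end

# Mirrors the behavior described in the report: only the item is forwarded,
# so the proc built by inject receives nil for iter.
class DroppingIterator < TinyIterator
  def each(foreach)
    super(proc { |item, _iter| foreach.call(item) })
  end
end

sum = TinyIterator.new([1, 2, 3]).inject(0) { |m, x, done| done.call(m + x) }

error = begin
  DroppingIterator.new([1, 2, 3]).inject(0) { |m, x, done| done.call(m + x) }
  nil
rescue NoMethodError => e
  e
end
```

In this model the base class sums normally, while the subclass raises NoMethodError as soon as the first value is returned, because iter is nil at that point.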
module EventMachine
  module Synchrony
    # No-op stand-in: gives inject's callback a non-nil object to call #next on.
    class NextConsumer
      def next
      end
    end
    class FiberIterator < EM::Synchrony::Iterator
      # execute each iterator block within its own fiber
      # and auto-advance the iterator after each call
      def each(foreach=nil, after=nil, &blk)
        fe = Proc.new do |obj, iter|
          Fiber.new { (foreach || blk).call(obj, NextConsumer.new); iter.next }.resume
        end
        super(fe, after)
      end
    end
  end
end
                                    
                                    
                                    
                                
Ah, yes.. I don't think I ever got to inject, hence the commented out spec: https://github.com/igrigorik/em-synchrony/blob/master/spec/fiber_iterator_spec.rb#L22
I'd start with getting a failed spec, and then we can iterate from there!
Got it. I'll investigate a version of inject that will play nicely against that spec...
But I'm having some difficulty with the spec as defined. We're going to add the numbers 1 to 5 in order, two fibers at a time. Each of the two fibers will start with the current tally, add their number, and call return on the iterator with the sum, updating the tally. Only one of the fibers will win this race condition, unless we introduce sequencing within return. I expect to see something like:
[:sync, 0, 1]
[:sync, 0, 2]
[:sync, 2, 3]
[:sync, 2, 4]
[:sync, 6, 5]
Failures:
  1) EventMachine::Synchrony::FiberIterator should sum values within the iterator
     Failure/Error: res.should == data.inject(:+)
       expected: 15
            got: 11 (using ==)
If I modify FiberIterator.each to not auto-advance and run the spec as given, this is indeed the result.
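The lost update can be modeled in plain Ruby. This is a rough sketch under a simplifying assumption: workers run in strict batches of the concurrency size, and every worker in a batch starts from the tally as it stood when the batch began. racy_inject is a hypothetical helper, not part of EventMachine or em-synchrony.

```ruby
# Hypothetical model of the race: each worker in a batch reads the same
# starting tally, and the last one to return overwrites its siblings.
def racy_inject(items, concurrency, initial)
  tally = initial
  trace = []
  items.each_slice(concurrency) do |batch|
    snapshot = tally            # every worker in this batch sees this value
    batch.each do |n|
      trace << [snapshot, n]
      tally = snapshot + n      # overwrites whatever a sibling already wrote
    end
  end
  [tally, trace]
end

total, trace = racy_inject((1..5).to_a, 2, 0)
```

Under that assumption the trace pairs are (0, 1), (0, 2), (2, 3), (2, 4), (6, 5) and the total is 11, matching the spec failure above.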
To take this from another perspective, if I introduce some persistent object that's carried through the process and accessible to all fibers, I get a natural ordering of updates. For example, here's the same spec with a hash instead of an integer:
it "should sum values within the iterator" do
  EM.synchrony do
    data = (1..5).to_a
    res = EM::Synchrony::FiberIterator.new(data, 2).inject({:total => 0}) do |total_hash, num, iter|
      EM::Synchrony.sleep(0.1)
      p [:sync, total_hash[:total], num]
      total_hash[:total] += num
      iter.return(total_hash)
    end
    res[:total].should == data.inject(:+)
    EventMachine.stop
  end
end
And the output is as expected:
[:sync, 0, 1]
[:sync, 1, 2]
[:sync, 3, 3]
[:sync, 6, 4]
[:sync, 10, 5]
I'm interested to know what you think and in which direction I should proceed. It seems like it's not possible to carry the semantics of inject over to this concurrent world without some tricks to avoid race conditions. Now, I'm very new to this world of fibers and EventMachine, so I could be wrong. If I seem so certain, forgive me. I'm really not. :)
Humm.. yeah, this is a tricky one:
require 'eventmachine'
EM.run do
  EM::Iterator.new((1..5), 2).inject(0, proc{ |total,num,iter|
    EM.next_tick {
      p [total, num]
      total += num
      iter.return(total)
    }
  }, proc{ |results|
    p results
    EM.stop
  })
end
EM.run do
  total = []
  EM::Iterator.new((1..5), 2).inject(total, proc{ |total,num,iter|
    EM.next_tick {
      p [total, num]
      total.push num
      iter.return(total)
    }
  }, proc{ |results|
    p results.inject(:+)
    EM.stop
  })
end
Yields
[0, 1]
[1, 2]
[1, 3]
[4, 4]
[4, 5]
9
[[], 1]
[[1], 2]
[[1, 2], 3]
[[1, 2, 3], 4]
[[1, 2, 3, 4], 5]
15
So the behavior is consistent with EM's iterator implementation. I'm not really sure there is a clean solution here..
@tmm1 any suggestions?
Very interesting. I'll probably go with a map-reduce style implementation, as you give in your second example. Thanks!
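A map-reduce style version of the original use case might look like the sketch below. It uses plain Ruby fibers rather than EventMachine, and fetch and collect_concurrently are hypothetical stand-ins (fetch plays the role of a per-client call such as mapped_hmget). Each worker only appends its own result to a shared array, and the merge happens once at the end.

```ruby
# Hypothetical stand-in for a per-client fetch that returns a hash.
def fetch(op)
  { op[:field] => op[:value] }
end

# Runs ops in batches of `concurrency` fibers and collects one hash per op.
def collect_concurrently(ops, concurrency)
  partials = []
  ops.each_slice(concurrency) do |batch|
    fibers = batch.map do |op|
      Fiber.new do
        Fiber.yield             # simulate waiting on I/O
        partials << fetch(op)   # append only; never overwrite a sibling's result
      end
    end
    fibers.each(&:resume)       # start every worker; each pauses at Fiber.yield
    fibers.each(&:resume)       # let every worker finish
  end
  partials
end

ops = [
  { field: 'a', value: 1 },
  { field: 'b', value: 2 },
  { field: 'c', value: 3 }
]

# Reduce step: merge the collected hashes after all workers are done.
merged = collect_concurrently(ops, 2).reduce({}) { |acc, h| acc.merge(h) }
```

Because no worker depends on a tally that another worker may have replaced, the order in which workers finish does not change the merged result here, as long as the keys do not collide.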
We still need to fix the .next bug, but other than that.. Probably worth checking in the spec + some docs on the gotchas of using it between multiple fibers. :)
Any new love to be spread here? :) I was trying something with "map" and got a similar error after trying to return with "iter.return".