concurrent-ruby

Concurrent::Promises::Channel deadlock in example code

Open • ilyacherevkov opened this issue 5 years ago • 1 comment

I'm running the example from the Promises::Channel docs:

```ruby
require 'concurrent-edge' # Promises::Channel ships in the concurrent-ruby-edge gem

channel = Concurrent::Promises::Channel.new 2
log     = Concurrent::Array.new          # => []

def produce(channel, log, producer, i)
  log.push format "producer %d pushing %d", producer, i
  channel.push_op([producer, i]).then do
    i + 1 < 4 ? produce(channel, log, producer, i + 1) : :done
  end
end                                      # => :produce

def consume(channel, log, consumer, i)
  channel.pop_op.then(consumer, i) do |(from, message), consumer, i|
    log.push format "consumer %d got %d. payload %d from producer %d",
                    consumer, i, message, from
    do_stuff # helper defined elsewhere in the docs, not in this snippet
    i + 1 < 2 ? consume(channel, log, consumer, i + 1) : :done
  end
end                                      # => :consume

producers = Array.new 2 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| produce(*args, 0) }.run
end

consumers = Array.new 4 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| consume(*args, 0) }.run
end

producers.map(&:value!)                  # => [:done, :done]
consumers.map(&:value!)                  # => [:done, :done, :done, :done]
log
```

It fails with the following error:

```
<home>/.rbenv/versions/2.7.0/lib/ruby/gems/2.7.0/gems/concurrent-ruby-1.1.6/lib/concurrent-ruby/concurrent/promises.rb:775:in 'sleep': No live threads left. Deadlock? (fatal)
```
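The fatal error is raised by MRI's built-in deadlock check, which fires when every Ruby thread is blocked with no timeout. A minimal sketch, using the stdlib `SizedQueue` as a stand-in for a full bounded channel (this is not concurrent-ruby itself), reproduces the same message with a child Ruby process whose only thread pushes into a full queue. Running it in a separate process keeps the caller's own threads out of the check; `reproduce_deadlock` is a hypothetical helper name.

```ruby
require "open3"
require "rbconfig"

# Hypothetical helper: runs a tiny script in a separate Ruby process, so only
# one Ruby thread exists there, and returns its combined stdout/stderr and status.
def reproduce_deadlock
  script = <<~RUBY
    queue = SizedQueue.new(2)   # bounded buffer with room for 2 items
    queue.push(:a)
    queue.push(:b)              # the queue is now full
    queue.push(:c)              # blocks forever: no other thread will ever pop
  RUBY
  Open3.capture2e(RbConfig.ruby, "-e", script)
end

output, status = reproduce_deadlock
puts output
puts "exited successfully? #{status.success?}"
```

The exact backtrace format differs between Ruby versions, but the message text is the same one reported above.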

* Operating system:                mac
* Ruby implementation:             Ruby MRI 2.7.0
* `concurrent-ruby` version:       1.1.6
* `concurrent-ruby-ext` installed: yes
* `concurrent-ruby-edge` used:     yes

ilyacherevkov, Mar 19 '20 09:03

It starts working when the channel capacity is increased to 4. Nonetheless, it is still unclear to me why it has to be 4, or, more fundamentally, what the underlying Channel behavior is.
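One way to reason about the capacity question, assuming Channel's push and pop behave like Ruby's stdlib `SizedQueue` (a push waits while the buffer is full, a pop waits while it is empty), is to rebuild the example with plain threads. This sketch does not explain concurrent-ruby's own behavior; it only shows that with the docs' counts (2 producers × 4 messages = 8 pushes, 4 consumers × 2 messages = 8 pops) a capacity of 2 is enough when each waiting party runs on its own thread. `run_pipeline` and its keyword names are hypothetical.

```ruby
# Sketch only: stdlib threads + SizedQueue standing in for futures + Channel.
def run_pipeline(capacity:, producers: 2, messages_per_producer: 4,
                 consumers: 4, messages_per_consumer: 2)
  channel = SizedQueue.new(capacity) # push waits while full, pop waits while empty
  log     = Queue.new                # thread-safe record of received payloads

  producer_threads = Array.new(producers) do |producer|
    Thread.new do
      messages_per_producer.times { |i| channel.push([producer, i]) }
      :done
    end
  end

  consumer_threads = Array.new(consumers) do |consumer|
    Thread.new do
      messages_per_consumer.times do
        from, message = channel.pop
        log.push([consumer, from, message])
      end
      :done
    end
  end

  # Thread#value joins each thread and returns its block's result.
  results  = (producer_threads + consumer_threads).map(&:value)
  received = Array.new(log.size) { log.pop } # all threads finished; drain the log
  [results, received]
end

results, received = run_pipeline(capacity: 2)
p results       # => [:done, :done, :done, :done, :done, :done]
p received.size # => 8
```

If the counts did not balance (for example, a consumer expecting a message no producer sends), every thread would end up blocked forever and MRI would report the same "No live threads left" error.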

Edit: After spending several days on concurrency topics, I think the deadlock is caused by an unfortunate coincidence. The error says "No live threads left", which means that all of the threads are sleeping. The default `do_stuff` implementation calls `sleep` internally, which pauses the consumers. Meanwhile, when the channel is full, the pending futures are also waiting to be processed, so no thread is active and the deadlock occurs. My current workaround is to replace the last line (`log`) with:

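```ruby
# Print the log every 0.2 s: its current length, then each entry
Thread.new do
  loop do
    sleep 0.2
    puts [log.length, *log].join("\n") # builds a new array; log itself is not modified
  end
end
```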

There are of course other solutions.
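Related to the workaround above: MRI's check only counts threads that are blocked with no timeout, so a thread in a timed `sleep` is still considered live. That is consistent with the explanation that the error appears only when no thread is active. A minimal stdlib sketch (`unblock_after` is a hypothetical helper name): the main thread blocks on a full `SizedQueue` while a helper thread is in a timed sleep; no deadlock is reported, and the push completes once the helper pops.

```ruby
# Hypothetical helper: the main thread blocks on a full queue while a helper
# thread is in a *timed* sleep, which MRI does not treat as blocked forever.
def unblock_after(seconds)
  queue = SizedQueue.new(1)
  queue.push(:first)      # the queue is now full

  helper = Thread.new do
    sleep seconds         # timed sleep: this thread still counts as live
    queue.pop             # frees the slot and wakes the main thread
  end

  queue.push(:second)     # waits here until the helper has popped :first
  helper.join
  queue.pop               # the value pushed after the slot was freed
end

p unblock_after(0.2) # => :second
```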

blelump, Apr 18 '20 20:04