concurrent-ruby
Concurrent::Promises::Channel deadlock in example code
I'm running the example from the `Promises::Channel` docs:
```ruby
require 'concurrent-edge' # Promises::Channel ships with concurrent-ruby-edge

# Assumed stand-in for the docs' do_stuff helper, which sleeps briefly
def do_stuff
  sleep 0.01
  :stuff
end

channel = Concurrent::Promises::Channel.new 2
log     = Concurrent::Array.new # => []

# Each producer pushes 4 messages; the next push is chained onto the future
# returned by push_op, which resolves once the message is in the channel.
def produce(channel, log, producer, i)
  log.push format("producer %d pushing %d", producer, i)
  channel.push_op([producer, i]).then do
    i + 1 < 4 ? produce(channel, log, producer, i + 1) : :done
  end
end # => :produce

# Each consumer pops 2 messages; pop_op resolves with the next message.
def consume(channel, log, consumer, i)
  channel.pop_op.then(consumer, i) do |(from, message), consumer, i|
    log.push format("consumer %d got %d. payload %d from producer %d",
                    consumer, i, message, from)
    do_stuff
    i + 1 < 2 ? consume(channel, log, consumer, i + 1) : :done
  end
end # => :consume

producers = Array.new 2 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| produce(*args, 0) }.run
end
consumers = Array.new 4 do |i|
  Concurrent::Promises.future(channel, log, i) { |*args| consume(*args, 0) }.run
end

producers.map(&:value!) # => [:done, :done]
consumers.map(&:value!) # => [:done, :done, :done, :done]
log
```
It fails with the following error:

```
<home>/.rbenv/versions/2.7.0/lib/ruby/gems/2.7.0/gems/concurrent-ruby-1.1.6/lib/concurrent-ruby/concurrent/promises.rb:775:in 'sleep': No live threads left. Deadlock? (fatal)
```
* Operating system: macOS
* Ruby implementation: Ruby MRI 2.7.0
* `concurrent-ruby` version: 1.1.6
* `concurrent-ruby-ext` installed: yes
* `concurrent-ruby-edge` used: yes
It works once the channel capacity is increased to 4. Still, it is unclear to me why it has to be 4, or, more fundamentally, what the underlying Channel behavior is.
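As a rough mental model (an assumption, not a description of how `Promises::Channel` is implemented), a channel of capacity N acts like a bounded buffer: a push waits while N messages are buffered, and a pop waits while it is empty. `push_op`/`pop_op` return futures instead of blocking a thread, so the sketch below only approximates this with plain threads and Ruby's `Thread::SizedQueue`, replaying the same 2 producers × 4 messages and 4 consumers × 2 messages. The `sleep 0.01` is a hypothetical stand-in for `do_stuff`.

```ruby
# Bounded-buffer analogue of the Channel example using only Ruby's stdlib.
# Thread::SizedQueue is NOT Concurrent::Promises::Channel: its push/pop block
# the calling thread instead of returning futures.
channel = Thread::SizedQueue.new(2) # capacity 2, like Channel.new 2
log     = Queue.new                 # thread-safe log

producers = Array.new(2) do |producer|
  Thread.new do
    4.times do |i|
      log << format("producer %d pushing %d", producer, i)
      channel.push([producer, i]) # waits while 2 messages are already buffered
    end
    :done
  end
end

consumers = Array.new(4) do |consumer|
  Thread.new do
    2.times do |i|
      from, message = channel.pop # waits while the buffer is empty
      log << format("consumer %d got %d. payload %d from producer %d",
                    consumer, i, message, from)
      sleep 0.01 # hypothetical stand-in for do_stuff
    end
    :done
  end
end

# Thread#value joins the thread and returns the result of its block.
producer_results = producers.map(&:value) # 8 pushes in total...
consumer_results = consumers.map(&:value) # ...matched by 8 pops in total

entries = []
entries << log.pop until log.empty?
puts entries
```

Because the 8 pushes are matched by 8 pops, this blocking version finishes for any capacity of 1 or more; it does not reproduce the capacity-2 deadlock.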
Edit:
After spending several days on concurrency topics, I think the deadlock is caused by an unfortunate coincidence. "No live threads left" means that all threads are sleeping. The default `do_stuff` implementation calls `sleep` internally, which pauses the consumers. Meanwhile, when the channel is full, the pending futures also wait to be processed, so at some point no thread is active and the deadlock is reported. My current workaround is to replace the last line with:
```ruby
Thread.new do
  loop do
    sleep 0.2
    puts log.unshift(log.length).join("\n")
  end
end
```
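Why a keepalive thread silences the error: as far as I understand MRI, the "No live threads left. Deadlock?" check fires only when every living thread is blocked indefinitely (an untimed `Queue#pop`, `Thread#join`, `sleep`, and so on); a thread inside a timed `sleep 0.2` does not count as stuck. The sketch below shows this with two tiny child scripts, using only the standard library (`Open3`, `RbConfig`); the names `DEADLOCKING`, `KEPT_ALIVE`, `run_child` and the time limits are arbitrary choices for illustration. It also shows that a keepalive thread only hides the detector: if the work is genuinely stuck, the process hangs instead of aborting.

```ruby
require "open3"
require "rbconfig"

# Child script 1: the only thread waits forever on an empty queue, so MRI's
# deadlock detector aborts it with "No live threads left. Deadlock? (fatal)".
DEADLOCKING = "Queue.new.pop"

# Child script 2: the same wait, plus a keepalive thread that only does timed
# sleeps. That thread never counts as blocked forever, so the detector stays
# quiet and the process just hangs until it is killed.
KEPT_ALIVE = "Thread.new { loop { sleep 0.2 } }; Queue.new.pop"

# Runs a Ruby one-liner in a child process and reports whether it exited on
# its own within `limit` seconds (returning its stderr) or had to be killed.
def run_child(script, limit)
  Open3.popen3(RbConfig.ruby, "-e", script) do |_stdin, _stdout, stderr, waiter|
    if waiter.join(limit) # nil means the child is still running
      [:exited, stderr.read]
    else
      Process.kill("KILL", waiter.pid)
      waiter.join
      [:still_running, ""]
    end
  end
end

deadlocked = run_child(DEADLOCKING, 10)
kept_alive = run_child(KEPT_ALIVE, 1)
p deadlocked # stderr mentions "No live threads left. Deadlock?"
p kept_alive # => [:still_running, ""]
```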
There are of course other solutions.
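One such alternative, sketched here with plain threads and not tested against the Channel case: let the waiting thread itself use timed waits, so it is never blocked indefinitely and no extra keepalive thread is needed. `Thread#join(timeout)` returns `nil` while the thread is still running; `join_polling` is a hypothetical helper name. concurrent-ruby futures also accept a timeout (`wait(timeout)`, `value!(timeout)`), so the same loop could in principle be written around them.

```ruby
# Hypothetical helper: wait for `thread` using timed joins only, so the
# waiting thread is never blocked indefinitely.
def join_polling(thread, interval = 0.2)
  # Thread#join(timeout) returns nil if `thread` is still alive after
  # `interval` seconds, and the thread itself once it has finished.
  until thread.join(interval)
    # still running: a progress report (e.g. printing the log) could go here
  end
  thread.value
end

worker = Thread.new { sleep 0.5; :done }
result = join_polling(worker)
p result # => :done
```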