Thread pools behaving oddly with `max_queue` and `remaining_capacity`
We need a thread pool with a maximum number of parallel threads, no queue, and that rejects new work units once all threads are busy. We also need to be able to detect how many threads are "free" at a given time.
We've tried almost all of the `ThreadPool` and `ThreadPoolExecutor` classes but can't get this to work:
require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 0,
  max_threads: 5,
  max_queue: 0,
  fallback_policy: :abort,
  auto_terminate: false
)

6.times do
  pool.post { sleep 5 }
  puts pool.remaining_capacity
end

pool.wait_for_termination
If we interpret the documentation right, this should create a thread pool that starts with 0 threads and can grow to up to 5 threads running in parallel. If all 5 threads are occupied, posting new work units should result in an exception. There should be no additional queue. We'd also expect `remaining_capacity` to drop by 1 each time something is posted while the previously posted work units are still busy.
The output of the above script:
-1
-1
-1
-1
-1
-1
Expected would be:
4
3
2
1
0
=> EXCEPTION
Did we interpret the documentation wrong, and how can we achieve what we need? We also need to be able to detect the number of idle threads so that we can post the right number of work units to the pool.
On a side-note: The above example stops with a number of exceptions like the following:
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:43:in `sleep': No live threads left. Deadlock? (fatal)
6 threads, 6 sleeps current:0x00007fa6936e27d0 main thread:0x00007fa6937044c0
* #<Thread:0x00007fa69487f0b8 sleep_forever>
rb_thread_t:0x00007fa6937044c0 native:0x00007fff7b53e000 int:1
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:43:in `sleep'
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:43:in `wait'
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:43:in `ns_wait'
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/abstract_lockable_object.rb:43:in `ns_wait_until'
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/atomic/event.rb:87:in `block in wait'
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:38:in `block in synchronize'
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:38:in `synchronize'
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:38:in `synchronize'
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/atomic/event.rb:84:in `wait'
/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/executor/ruby_executor_service.rb:49:in `wait_for_termination'
concurrent_test.rb:16:in `<main>'
I currently have no idea what this is about, but this second problem does not happen in our application and is not a dealbreaker.
Thanks a bunch for having a look at the first question though - this would help us a lot. Keep up your wonderful work!
- Operating system: mac
- concurrent-ruby version: 1.0.5
- concurrent-ruby-ext installed: no
- concurrent-ruby-edge used: no
May I bump this question again? We'd really need a solution for this and are kind of stuck. Thanks a lot for having a quick look at this and sorry for bothering you again.
Sorry, I did not have any free time left and could not attend to concurrent-ruby for a while. Unfortunately, the thread pool uses `max_queue: 0` to make the queue unbounded. `remaining_capacity` returns -1 to indicate that the capacity of the queue is unbounded; it is not related to the number of free threads. The pool can be queried for the number of threads, but not for the number of idle threads.
What's your use case for this requirement, if you don't mind me asking? I may be able to suggest an alternative approach.
Thanks a lot for your answer. The reason for this requirement is the workhorse gem. It has a (concurrent-ruby) queue of jobs and regularly polls the database for new jobs, only fetching as many jobs as the queue can still handle. For instance, if we have a queue size of 5 and 3 jobs are active, the poller only fetches 2 jobs from the database.
To work around this issue, we've now created a wrapper around the `ThreadPoolExecutor` that does the job (a rough sketch of the idea follows below). If you feel it would make sense to include this in your gem, though, we'd be more than happy to switch over to it.
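For illustration, something along these lines (not our actual implementation; the `CountingPool` class and its `idle_count` / `post` methods are purely illustrative names): the wrapper tracks in-flight jobs itself, so the number of idle workers can be derived without touching `ThreadPoolExecutor` internals.

require 'concurrent'

class CountingPool
  def initialize(size)
    @size   = size
    @active = Concurrent::AtomicFixnum.new(0) # number of in-flight jobs
    @pool   = Concurrent::ThreadPoolExecutor.new(
      min_threads: 0,
      max_threads: size,
      fallback_policy: :abort
    )
  end

  # Number of workers not currently busy with a job.
  def idle_count
    @size - @active.value
  end

  def post(&block)
    # Reserve a slot up front; reject if the pool is already fully busy.
    if @active.increment > @size
      @active.decrement
      raise Concurrent::RejectedExecutionError, 'all workers are busy'
    end
    @pool.post do
      begin
        block.call
      ensure
        @active.decrement # release the slot once the job has finished
      end
    end
  end
end

The poller can then read `idle_count` to decide how many jobs to fetch from the database.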
@remofritzsche If you are looking to determine how many threads are available in the pool, you can apply the following patch to concurrent-ruby:
module Concurrent
  class RubyThreadPoolExecutor < RubyExecutorService
    def ready_worker_count
      synchronize do
        c1 = @max_length - @pool.length # Number of workers still to be created
        c2 = @ready.length              # Workers created but currently waiting
        return c1 + c2
      end
    end
  end
end
Then just replace `pool.remaining_capacity` with `pool.ready_worker_count` in your example (and set `max_queue` to -1 instead of 0) and you will achieve the desired behaviour.
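With that patch loaded, the original example would look something along these lines (a sketch, using the `max_queue: -1` setting suggested above):

pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 0,
  max_threads: 5,
  max_queue: -1,            # -1 instead of 0, as suggested above
  fallback_policy: :abort
)

begin
  6.times do
    pool.post { sleep 5 }
    puts pool.ready_worker_count  # counts down as workers become busy
  end
rescue Concurrent::RejectedExecutionError
  puts '=> EXCEPTION'
end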
Great, thank you. Are there any plans to add this to the official source? As we're developing a gem ourselves, we do not want to patch other gems if we can work around it. Thanks :)
I think the correct way to have no queue (i.e. run the fallback if all threads are active) is to use `max_queue: 0, synchronous: true`:
Concurrent::ThreadPoolExecutor.new(max_queue: 0, synchronous: true)
You can see this behavior is asserted in the tests:
https://github.com/ruby-concurrency/concurrent-ruby/blob/9f40827be9a8a192a6993a8d157bd3ed0662ada0/spec/concurrent/executor/thread_pool_executor_shared.rb#L165
And this is the implementation in Ruby:
https://github.com/ruby-concurrency/concurrent-ruby/blob/9f40827be9a8a192a6993a8d157bd3ed0662ada0/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb#L208-L213
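Putting it together, a minimal sketch of the behaviour originally asked for (assuming a concurrent-ruby version that supports the `synchronous` option):

require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 0,
  max_threads: 5,
  max_queue: 0,
  synchronous: true,        # no queue at all; overflow goes to the fallback policy
  fallback_policy: :abort   # overflow raises Concurrent::RejectedExecutionError
)

5.times { pool.post { sleep 5 } }

begin
  pool.post { sleep 5 }     # sixth task: all workers are busy, nothing to queue on
rescue Concurrent::RejectedExecutionError
  puts '=> EXCEPTION'
end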