concurrent-ruby
Thread pool exception handling
I've read the code in `{lib,spec}/concurrent/executor` a bit. It looks like people love the Java way: the abstraction layer is ideal, the documentation is great, the tests are good, and the implementation is poor.
Here we have the implementation:
```ruby
def run_task(pool, task, args)
  task.call(*args)
  pool.worker_task_completed
rescue => ex
  # let it fail
  log DEBUG, ex
rescue Exception => ex
  log ERROR, ex
  pool.worker_died(self)
  throw :stop
end
```
We shouldn't catch `Exception`; we should work only with `StandardError`. A system error should be able to kill this worker and the whole pool infrastructure. For example, the pool shouldn't be able to recreate workers if `malloc` failed in any worker, and the user should be able to interrupt a Ruby program with SIGINT without the pool stopping them.
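A small stand-alone illustration of this point in plain Ruby (no concurrent-ruby involved; the two helper methods are made up for the demo): a bare `rescue => e` only catches `StandardError`, while `rescue Exception` also swallows `Interrupt` (what SIGINT raises) and `NoMemoryError`.

```ruby
# Hypothetical helpers that only contrast the two rescue styles.
def run_with_bare_rescue
  yield
  "completed"
rescue => e # same as `rescue StandardError => e`
  "swallowed #{e.class}"
end

def run_with_exception_rescue
  yield
  "completed"
rescue Exception => e # also catches Interrupt, NoMemoryError, SystemExit, ...
  "swallowed #{e.class}"
end

p Interrupt.ancestors.include?(StandardError)     # => false
p NoMemoryError.ancestors.include?(StandardError) # => false

# An ordinary StandardError is swallowed by the bare rescue.
bare_std = run_with_bare_rescue { raise ArgumentError }
p bare_std # => "swallowed ArgumentError"

# Interrupt (what Ctrl-C / SIGINT raises) escapes the bare rescue...
bare_int = begin
  run_with_bare_rescue { raise Interrupt }
rescue Interrupt
  "escaped"
end
p bare_int # => "escaped"

# ...but is silently eaten by `rescue Exception`.
exc_int = run_with_exception_rescue { raise Interrupt }
p exc_int # => "swallowed Interrupt"
```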
We could ignore a user's `StandardError` raised from `task.call(*args)`, but we can't ignore errors raised from `pool.worker_task_completed`. `pool.worker_task_completed` should be moved out of this trap.
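One way to move the bookkeeping call out of the trap, sketched stand-alone with a hypothetical `SketchPool` in place of the real executor (this is not concurrent-ruby's code): only `task.call(*args)` sits inside `rescue StandardError`, so an error from `worker_task_completed` propagates instead of being treated like a failed task.

```ruby
# SketchPool is a hypothetical stand-in for the executor.
class SketchPool
  attr_reader :completed, :logged

  def initialize
    @completed = 0
    @logged = []
  end

  def worker_task_completed
    @completed += 1
  end

  def log(error)
    @logged << error
  end
end

def run_task(pool, task, args)
  begin
    task.call(*args)
  rescue StandardError => e
    pool.log(e) # only the user's task is trapped
  end
  # Outside the trap: an error raised here reaches the worker.
  # (In this sketch a failed task still counts as completed.)
  pool.worker_task_completed
end

pool = SketchPool.new
run_task(pool, ->(x) { x * 2 }, [21])
run_task(pool, ->(_x) { raise ArgumentError, "bad input" }, [nil])
p pool.completed             # => 2
p pool.logged.map(&:message) # => ["bad input"]

# A bug in the pool's own bookkeeping is no longer swallowed.
broken = SketchPool.new
def broken.worker_task_completed
  raise "pool bookkeeping bug"
end
bookkeeping_error = begin
  run_task(broken, -> { :ok }, [])
  nil
rescue RuntimeError => e
  e.message
end
p bookkeeping_error # => "pool bookkeeping bug"
```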
I saw issue #616:
> You'll notice that our API is exactly the same (in fact, on JRuby our TPE is just a thin wrapper around Java's). The reason that neither Java's thread pools nor ours have exception handling within the thread pool is because that's not the appropriate place to put it.
@jdantonio, this planet does not revolve around Java. The old Java spec is not a holy thing that can't be changed.
> If you absolutely insist on posting jobs directly to a thread pool rather than using our high-level abstractions (which I strongly discourage) then just create a job wrapper. You can find examples of job wrappers in all our high-level abstractions, Rails ActiveJob, Sucker Punch, and other libraries which use our thread pools.
>
> All of the high-level abstractions in this library (Promise, Actor, etc.) all post jobs to the global thread pool and all provide exception handling. Simply pick the abstraction that best fits your use case and use it.
People wanted to change the default behaviour of the thread pool. They didn't want to use your special high-level job wrappers.
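For comparison, a job wrapper of the kind suggested in the quote might look like this. `wrap_job` and its `on_error:` callback are hypothetical names, and a plain `Thread` stands in for `pool.post` so the sketch runs without the gem.

```ruby
# Hypothetical helper: forwards any StandardError from the job to a callback
# instead of letting it disappear inside the pool.
def wrap_job(on_error:, &job)
  proc do |*args|
    begin
      job.call(*args)
    rescue StandardError => e
      on_error.call(e)
    end
  end
end

errors = Queue.new
job = wrap_job(on_error: ->(e) { errors << e }) { raise IOError, "disk full" }

# With concurrent-ruby this would be `pool.post(&job)`; a Thread stands in here.
Thread.new(&job).join
error = errors.pop
puts "#{error.class}: #{error.message}" # => IOError: disk full
```

The wrapper works, but every caller has to remember to apply it, which is the ergonomic cost the thread is arguing about.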
Ruby's `Thread` had the same issue, but people simply fixed it, without panic, 12 to 15 years ago.
Take `Thread.abort_on_exception`: it is `false` by default, and then an unhandled error in a child thread only terminates that thread; it resurfaces only when the thread is joined. If it is `true`, the child's error is re-raised in the main thread.
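A plain-Ruby sketch of the two `Thread` behaviours just described. `ChildError` is made up for the demo, and `report_on_exception` is switched off only to keep stderr quiet.

```ruby
class ChildError < StandardError; end

# 1) Default (abort_on_exception == false): the error stays in the child
#    thread and only resurfaces when that thread is joined.
quiet = Thread.new do
  Thread.current.report_on_exception = false # skip the stderr report (Ruby >= 2.4)
  raise ChildError, "from join"
end

joined_message = begin
  quiet.join
  nil
rescue ChildError => e
  e.message
end
puts joined_message # => from join

# 2) abort_on_exception == true: the error is re-raised in the main thread
#    as soon as the child dies, even though nobody joins it.
aborted_message = begin
  Thread.new do
    Thread.current.report_on_exception = false
    Thread.current.abort_on_exception = true
    raise ChildError, "from abort_on_exception"
  end
  sleep 1 # the main thread is interrupted here by the child's error
  nil
rescue ChildError => e
  e.message
end
puts aborted_message # => from abort_on_exception
```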
Users would still be able to create their own job wrappers. Some people (me included) love this option because it makes the API cleaner and the code more readable. We can add an `:ABORT_ON_EXCEPTION => true/false` option to the thread pool, and this issue will be fixed properly.
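To make the proposal concrete, here is a sketch of what such an option could do. `TinyPool` and its `abort_on_exception:` keyword are hypothetical and not part of concurrent-ruby. With the flag on, a task's `StandardError` is re-raised in the thread that created the pool, mirroring how `Thread#abort_on_exception` re-raises in the main thread; with the flag off, errors are swallowed as in the current pool.

```ruby
# Hypothetical pool, for illustration only.
class TinyPool
  def initialize(size, abort_on_exception: false)
    @owner = Thread.current
    @abort_on_exception = abort_on_exception
    @queue = Queue.new
    @workers = Array.new(size) { Thread.new { work_loop } }
  end

  def post(&task)
    @queue << task
  end

  def shutdown
    @workers.size.times { @queue << :stop }
    @workers.each(&:join)
  end

  private

  def work_loop
    while (task = @queue.pop) != :stop
      begin
        task.call
      rescue StandardError => e # Interrupt, NoMemoryError etc. are not caught
        # Flag off: swallow, like the current pool. Flag on: hand it to the owner.
        @owner.raise(e) if @abort_on_exception
      end
    end
  end
end

class MyError < StandardError; end

pool = TinyPool.new(2, abort_on_exception: true)
received = begin
  pool.post { raise MyError, "task failed" }
  sleep 1 # the owner thread is interrupted here
  nil
rescue MyError => e
  e.message
end
pool.shutdown
puts received # => task failed

quiet_pool = TinyPool.new(1) # default: the error is swallowed
quiet_pool.post { raise MyError, "ignored" }
quiet_pool.shutdown
```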
@andrew-aladev Thank you for your thoughtful feedback. I've made my opinion clear and my opinion has not changed. However, the decision is no longer mine to make. @pitr-ch is now the maintainer. It is up to him. I will support whatever decision he makes and I will respectfully decline to comment further.
We can make an abortable worker without any problems:
```ruby
require "concurrent"

Concurrent::RubyThreadPoolExecutor.module_eval do
  # Drop the failed worker, then fall back to the pool's own kill logic.
  def ns_abort_execution aborted_worker
    @pool.delete aborted_worker
    ns_kill_execution
  end

  def abort_worker worker
    synchronize { ns_abort_execution worker }
  end

  # Replacement worker: abort_on_exception is enabled on its thread, so the
  # re-raised task error propagates to the main thread.
  class AbortableWorker < self.const_get :Worker
    def initialize pool
      super
      @thread.abort_on_exception = true
    end

    def run_task pool, task, args
      begin
        task.call(*args)
      rescue StandardError => error
        pool.abort_worker self
        raise error
      end
      pool.worker_task_completed
      nil
    end
  end

  self.send :remove_const, :Worker
  self.const_set :Worker, AbortableWorker
end

class MyError < StandardError; end

pool = Concurrent::FixedThreadPool.new 5
begin
  pool.post do
    sleep 1
    puts "we shouldn't read this message"
  end
  pool.post do
    puts "raising my error"
    raise MyError
  end
  pool.shutdown
  pool.wait_for_termination
rescue MyError
  puts "received my error"
end
sleep 2
```
```ruby
require "concurrent"

Concurrent::RubyThreadPoolExecutor.class_eval do
  # Inspired by "ns_kill_execution".
  def ns_abort_execution aborted_worker
    @pool.each do |worker|
      next if worker == aborted_worker
      worker.kill
    end
    @pool = [aborted_worker]
    @ready.clear
    stopped_event.set
    nil
  end

  def abort_worker worker
    synchronize do
      ns_abort_execution worker
    end
    nil
  end

  def join
    shutdown
    # Wait for the stopped event without a timeout.
    stopped_event.wait nil
    @pool.each do |aborted_worker|
      # Rubinius delivers the aborted thread's error only through "join";
      # MRI raises it anyway, with or without "join".
      # The order of the worker's "raise" and our "join" does not matter:
      # the target error is received either way.
      aborted_worker.join
    end
    @pool.clear
    nil
  end

  class AbortableWorker < self.const_get :Worker
    def initialize pool
      super
      @thread.abort_on_exception = true
    end

    def run_task pool, task, args
      begin
        task.call(*args)
      rescue StandardError => error
        pool.abort_worker self
        raise error
      end
      pool.worker_task_completed
      nil
    end

    def join
      @thread.join
      nil
    end
  end

  self.send :remove_const, :Worker
  self.const_set :Worker, AbortableWorker
end

class MyError < StandardError; end

pool = Concurrent::FixedThreadPool.new 5
begin
  pool.post do
    sleep 1
    puts "we shouldn't receive this message"
  end
  pool.post do
    puts "raising my error"
    raise MyError
  end
  pool.join
rescue MyError => error
  puts "received my error, trace: \n#{error.backtrace.join("\n")}"
end
sleep 2
```
This patch works fine on any version of MRI Ruby and Rubinius. I am not interested in JRuby support; it should be easy to fix the JRuby executor in the same way. Please fix it if you want.
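Both patches work by replacing the executor's nested `Worker` constant with a subclass. A minimal plain-Ruby illustration of that technique, with made-up `Executor` and `Worker` classes standing in for concurrent-ruby's. It assumes, as the patches do, that the executor resolves `Worker` at the moment it creates a worker, so every worker built after the swap uses the subclass.

```ruby
# Executor and Worker are stand-ins, not concurrent-ruby classes.
class Executor
  class Worker
    def describe
      "stock worker"
    end
  end

  def create_worker
    Worker.new # resolved at call time, so a swapped constant is picked up
  end
end

before_swap = Executor.new.create_worker.describe

Executor.class_eval do
  abortable = Class.new(const_get(:Worker)) do
    def describe
      "abortable #{super}"
    end
  end
  send(:remove_const, :Worker) # remove_const is private
  const_set(:Worker, abortable)
end

after_swap = Executor.new.create_worker.describe
puts before_swap # => stock worker
puts after_swap  # => abortable stock worker
```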
The current implementation indeed swallows the exception, which is not good. Unfortunately I cannot accept your contribution as is. Could you open a pull request? It would make the change to the code perfectly clear. Please include a simple reproducer for the problem you are describing, or even better, tests. If you are not interested in JRuby that's fine; I can evolve your pull request and make the necessary changes there. I cannot accept anything that changes only the MRI implementation.
Sure, I will provide a pull request. I am very busy this week, so it will come later.
Thanks, that's fine.
Postponing to 1.2: this is not easily fixable because the JRuby thread pool does not use Ruby threads. The tasks probably need to be wrapped and the error raised on the main thread, or something similar.
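A sketch of the task-wrapping idea under stated assumptions: `ErrorTrackingWrapper` is a hypothetical helper, and plain `Thread`s stand in for an executor. Because the error handling lives inside each task rather than in the worker, it should not matter what kind of thread runs the task (on JRuby, a Java pool thread); the submitting thread re-raises the first recorded error once the work is done.

```ruby
# Hypothetical helper, not concurrent-ruby API.
class ErrorTrackingWrapper
  def initialize
    @errors = Queue.new
  end

  # Returns a proc that can be handed to any executor's `post`.
  def wrap(&task)
    proc do |*args|
      begin
        task.call(*args)
      rescue StandardError => e
        @errors << e
      end
    end
  end

  # Call after the executor has finished; re-raises the first recorded error.
  def raise_first_error!
    raise @errors.pop unless @errors.empty?
  end
end

class TaskError < StandardError; end

tracker = ErrorTrackingWrapper.new
ok_job = tracker.wrap { :ok }
bad_job = tracker.wrap { raise TaskError, "worker failed" }

# Plain threads stand in for the executor; joining them stands in for
# pool.shutdown followed by pool.wait_for_termination.
[Thread.new(&ok_job), Thread.new(&bad_job)].each(&:join)

message = begin
  tracker.raise_first_error!
  nil
rescue TaskError => e
  e.message
end
puts message # => worker failed
```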
Another way is:
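```ruby
require "concurrent"

errors = Concurrent::Array.new
pool = Concurrent::FixedThreadPool.new 5
# `configs` and `run_process` come from the surrounding application.
# Note: Promises.future runs on the global :io executor; use
# Concurrent::Promises.future_on(pool) { ... } to run the tasks on `pool`.
futures = configs.map do |c|
  Concurrent::Promises.future { run_process(c) }.rescue { |e| errors << e }
end
Concurrent::Promises.zip_futures_on(pool, *futures).value
errors.each { |e| STDERR.puts e.message }
```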
@merqlove yeah, that works, thanks for the comment. The array should be a `Concurrent::Array` instance, though. I've updated your comment.
@pitr-ch did you end up with something? With the latest version, the thread pool still swallows exceptions.
@route I've put this on my list to revisit. Thanks for your interest.
Yes, when I use concurrent-ruby I found that it doesn't work, even after adding `Thread.abort_on_exception = true`, because no exception is actually raised out of the worker thread.
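Why the global flag has no effect, sketched with a plain `Thread` standing in for a pool worker: the worker rescues the task's error itself, so the thread never dies with an exception and there is nothing for `abort_on_exception` to re-raise.

```ruby
Thread.abort_on_exception = true

swallowed = Queue.new
worker = Thread.new do
  begin
    raise "task failed"
  rescue StandardError => e
    swallowed << e # roughly what the pool does: log it and carry on
  end
  :worker_still_alive
end

result = worker.value
Thread.abort_on_exception = false # restore the default

swallowed_error = swallowed.pop
puts result                  # => worker_still_alive
puts swallowed_error.message # => task failed
```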
@pitr-ch I can work on this if you are short of hands and agree to merge it in one shape or another.
@route thanks a lot! Please have a look and open a PR with a prototype where we can discuss the desired change. Things to keep in mind:
- The final change (not the prototype) should be backward compatible: we'll probably be forced to keep the current behavior by default and introduce the fixed one behind an option. However, we can print a warning when the default is used, so that users switch manually to the fixed, non-swallowing behavior of the pool.
- The behavior should be the same on MRI and JRuby.
I've sent you an invitation to become a collaborator; if you accept, I can assign you the issue.
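A sketch of that rollout, with an invented option name: `CompatPool` and `swallow_errors:` are hypothetical, not concurrent-ruby API. Omitting the option keeps today's swallowing behaviour but prints a warning; passing it explicitly opts in silently.

```ruby
require "stringio"

# Hypothetical class and option name, for illustration only.
class CompatPool
  UNSET = Object.new.freeze

  attr_reader :swallow_errors

  def initialize(swallow_errors: UNSET)
    if swallow_errors.equal?(UNSET)
      # Backward compatible default, plus a nudge towards the fixed behaviour.
      warn "CompatPool: task errors are swallowed by default; " \
           "pass swallow_errors: false to have them re-raised"
      swallow_errors = true
    end
    @swallow_errors = swallow_errors
  end
end

captured = StringIO.new
original_stderr = $stderr
begin
  $stderr = captured
  legacy = CompatPool.new                       # warns, keeps swallowing
  fixed = CompatPool.new(swallow_errors: false) # explicit opt-in, no warning
ensure
  $stderr = original_stderr
end

p legacy.swallow_errors # => true
p fixed.swallow_errors  # => false
```

A sentinel object is used instead of a `nil` default so that "option not given" can be told apart from any value a caller might pass.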
Swallowing an exception is really problematic. Are there any plans to tackle that? There even seems to be a ready-to-use solution, at least for the 2017 code...
Regarding the `Concurrent::Promises` workaround posted above: it forces you to create all tasks ahead of time.
It would be good to have a strategy for handling exceptions, for example a `:fail_fast` strategy for cases where execution should be shut down on the first error that occurs.
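A minimal sketch of what a `:fail_fast` strategy could mean, built from plain threads and a `Queue` rather than concurrent-ruby (`run_fail_fast` is a hypothetical helper): after the first `StandardError`, workers stop taking new tasks, the rest of the queue is skipped, and the error is raised in the caller once the workers have stopped.

```ruby
# Hypothetical :fail_fast runner, not concurrent-ruby API.
def run_fail_fast(tasks, workers: 4)
  queue = Queue.new
  tasks.each { |task| queue << task }
  queue.close # once drained, pop returns nil and the workers exit

  first_error = nil
  lock = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      while (task = queue.pop)
        # Fail fast: after any failure, skip whatever is still queued.
        break if lock.synchronize { first_error }
        begin
          task.call
        rescue StandardError => e # non-StandardErrors are not caught here
          lock.synchronize { first_error ||= e }
        end
      end
    end
  end

  threads.each(&:join)
  raise first_error if first_error
  :all_succeeded
end

executed = []
tasks = (0...10).map do |i|
  lambda do
    raise ArgumentError, "task #{i} failed" if i == 2
    executed << i
  end
end

# One worker keeps the order deterministic for the demo.
message = begin
  run_fail_fast(tasks, workers: 1)
rescue ArgumentError => e
  e.message
end
puts message # => task 2 failed
p executed   # => [0, 1]
```

With several workers, tasks already in flight when the first error occurs still finish; only tasks not yet started are skipped.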