concurrent-ruby
No good tools for long streams of tasks
A couple of times now I've been speaking to someone and they have a problem like this:
```ruby
pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count)

1.upto 2000000 do |i|
  pool.post do
    # work here
  end
end
```
or
```ruby
File.open('foo').each do |line|
  pool.post do
    # work here
  end
end
```
In both cases we're creating a huge number of tasks, and in the latter case we may not know how many tasks there will be in advance.
The problem in both cases is that people create millions of tasks, each of which takes up memory for its proc and closure.
I feel like we're missing two abstractions here.
The first is a basic parallel `#each` on an `Enumerable` with a length. We don't have that, do we? It would need chunking (run `n` tasks in each task), and perhaps automatic chunking based on profiling (run one task, see how long it takes, think about how many tasks there are, and set `n` based on that).
The second is something similar that works on an `Enumerator`, which doesn't have a length. Here chunking is harder, as we don't know how many tasks there will be in advance. We may need some kind of work stealing here.
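The profiling-based heuristic could be sketched as pure arithmetic; `chunk_size_for` below is a hypothetical helper, not an existing API, and its target duration is an arbitrary assumption:

```ruby
# Hypothetical heuristic: time a single task, then pick a chunk size so each
# chunk runs for roughly a target duration (assumed here to be 1 ms of work),
# capped so we still produce enough chunks to keep every thread busy.
def chunk_size_for(total_tasks, seconds_per_task, threads,
                   target_chunk_seconds: 0.001)
  by_time  = (target_chunk_seconds / seconds_per_task).round
  by_count = (total_tasks / (threads * 4.0)).ceil # aim for >= ~4 chunks per thread
  [[by_time, by_count].min, 1].max
end

# 2,000,000 cheap tasks (about a microsecond each) on 8 threads:
chunk_size_for(2_000_000, 0.000001, 8) # => 1000
```

With expensive tasks the count-based cap takes over, so chunk size collapses towards 1 and every thread still gets work.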
I'd like to write the two examples above as:
```ruby
1.upto(2000000).parallel_each(pool) do
  # work here
end
```
or
```ruby
File.open('foo').each.parallel_each(pool) do |line|
  # work here
end
```
In both cases we'd only create as many tasks at a time as was reasonable (if the pool has `n` threads it may be `kn` tasks for some small constant `k`).
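Neither `parallel_each` exists yet; as a rough illustration of the bounded behaviour described above, here is a stdlib-only sketch (the function name is hypothetical, and no concurrent-ruby is required) that uses a `SizedQueue` of `k * n` slots so the producing thread blocks rather than materialising millions of pending tasks:

```ruby
require 'etc'

# Hypothetical sketch: a bounded parallel each built only on the standard
# library. The SizedQueue holds at most k * n items, so the producing thread
# blocks on push instead of queueing millions of closures.
def bounded_parallel_each(enum, threads: Etc.nprocessors, k: 10)
  queue = SizedQueue.new(k * threads)   # back-pressure: push blocks when full
  workers = Array.new(threads) do
    Thread.new do
      while (item = queue.pop) != :done # assumes :done never appears in enum
        yield item
      end
    end
  end
  enum.each { |item| queue.push(item) } # blocks while k * n items are pending
  threads.times { queue.push(:done) }   # one stop sentinel per worker
  workers.each(&:join)
end

results = Queue.new
bounded_parallel_each(1..100, threads: 4) { |i| results.push(i * 2) }
# results now holds 2, 4, ..., 200 in some order
```

A real implementation would also want chunking and error propagation, but the memory bound comes entirely from the fixed-capacity queue.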
A workaround in the meantime, to stop so many tasks being created and memory being blown, may be to do this (ping @digininja, this is relevant to you):
```ruby
pool = Concurrent::FixedThreadPool.new(
  Concurrent.processor_count,
  max_queue: 10 * Concurrent.processor_count,
  fallback_policy: :caller_runs)

1.upto 2000000 do |i|
  pool.post do
    # work here
  end
end
```
This will only queue up to 10 times as many tasks as you have cores; any further tasks are run immediately by the calling thread instead of being added to the pool. So if you already have, say, 40 tasks in the pool, instead of creating a new task the main thread runs it itself, and by the time the loop gets around to finishing that, the pool may be ready for new tasks; if not, the main thread runs the next task as well.
Oh, when you are doing this you may want to set the thread pool to size `Concurrent.processor_count - 1`, as the main thread will also be running tasks; otherwise you would have `Concurrent.processor_count + 1` tasks running and being switched on and off cores, ruining cache.
Sounds like a good option, I'll give it a try tonight.
This exists in many gems as `peach`/`pmap`, but I think it would make total sense in c-r. It sounds like Scala's `.par` (http://docs.scala-lang.org/overviews/parallel-collections/overview.html) to me; that might be an interesting direction to explore. It could indeed be nice to have parallel enumerators in addition to eager and lazy ones :)
Have you got good links that describe the basics of how all this works, i.e. what the different types of enumerators are?
:+1: to parallel enumerable.
I'm heading out the door and only have time for a quick note. This topic has been discussed at length before and some spike code was written. Please see #222, #229, and #231.
// @SebastianEdwards
Ah, I didn't know about those. We need a parallel `Enumerator` as well as `Enumerable` though, for streams of unknown length. I didn't see that discussed yet.
For my real code, and my sample that @chrisseaton reproduced above, the change to the pool initialisation worked perfectly. Memory doesn't go above 2.1% on my 16G machine, so I've pushed the queue size multiplier up to 90 and it's still only at 2.3% max.
Here is an example with actors, which also has configurable parallelism and does not require managing the pool directly.
```ruby
require 'concurrent-edge'

def work(i)
  sleep rand(0.01)
  puts "(#{i})"
  i * 2
end

parallelism = 10
all_inputs = Array.new(100) { |i| i }

class Worker < Concurrent::Actor::Context
  def on_message(i)
    work i
  end

  def default_executor
    Concurrent.global_io_executor
  end
end

POOL = Concurrent::Actor::Utils::Pool.spawn('pool', parallelism) do |index|
  Worker.spawn(name: "worker-#{index}")
end

# zip all futures into one
all_done = Concurrent.zip(*all_inputs.map do |i|
  # ask the pool, returns a future
  POOL.ask i
end)

# block the main thread here until all is done
p all_done.value! # => [0, 2, 4, 6, 8, ... , 198]
```
@pitr-ch the problem here is that creating a closure (whether that's for a pool or a future) for every single work item just takes up too much memory.
I would assume that lumping together all the responses to the futures could also take up a large chunk of memory.
If it helps anyone, this is an approximation to the code I'm working on:
https://gist.github.com/digininja/d68f3c272778ec9a3299
I expect the word list to be around a million words and the individual tasks to be fairly quick and lightweight.
Ah, then Channels would work (cc @jdantonio) since they have back-pressure: a reading thread becomes blocked if it sends jobs too fast to a job-handling channel (which should have a limited size). Actors would also work, but a bounded mailbox would have to be implemented. This use case definitely points at some holes we have in our APIs. Thanks @digininja.
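As a rough illustration of that back-pressure idea (the `run_bounded` helper and its names are hypothetical, and Ruby's stdlib `SizedQueue` stands in for a bounded channel):

```ruby
# Hypothetical sketch of a bounded "channel" using the stdlib SizedQueue.
# A reader feeds the channel from a (potentially huge) word list; pushes
# block whenever all JOBS slots are full, so memory stays bounded no matter
# how large the input is.
JOBS = 4

def run_bounded(words, workers: 2)
  channel = SizedQueue.new(JOBS)        # bounded: push blocks when full
  results = Queue.new
  consumers = Array.new(workers) do
    Thread.new do
      while (word = channel.pop)        # a nil sentinel ends the loop
        results.push(word.upcase)       # stand-in for the real task
      end
    end
  end
  words.each { |w| channel.push(w) }    # blocks instead of buffering everything
  workers.times { channel.push(nil) }   # one stop sentinel per consumer
  consumers.each(&:join)
  Array.new(results.size) { results.pop }
end

run_bounded(%w[a b c d e f]).sort # => ["A", "B", "C", "D", "E", "F"]
```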
Excellent feedback and suggestions. Thank you everyone!
Very good idea IMO, I would be pleased to have a `peach`-like method in concurrent-ruby. The `peach` gem is unmaintained, btw. As I would like to stick with concurrent-ruby and not add another dependency, I wrote a simple `peach` implementation based on a fixed thread pool:
```ruby
module Enumerable
  def peach(threads = nil)
    pool = Concurrent::FixedThreadPool.new(threads || (ENV['THREADS'] || '1').to_i)
    each do |item|
      pool.post do
        begin
          yield item
        rescue => e
          puts "EXCEPTION: #{e.inspect}"
          puts e.backtrace
        end
      end
    end
    pool.shutdown
    pool.wait_for_termination
  end
end
```
Having a block/wait fallback policy for `ThreadPoolExecutor` would also help with this issue.
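In the meantime, a block/wait policy can be approximated in plain Ruby by gating `post` with a counting semaphore (a `SizedQueue` used as one); this `BlockingPool` is a hypothetical sketch, not part of concurrent-ruby:

```ruby
# Hypothetical sketch of a "blocking post": a SizedQueue used as a counting
# semaphore caps how many tasks may be queued or running at once, so the
# producing thread waits instead of growing the queue without bound.
class BlockingPool
  def initialize(threads, max_pending: threads * 10)
    @tickets = SizedQueue.new(max_pending) # acquire = push, release = pop
    @jobs = Queue.new
    @workers = Array.new(threads) do
      Thread.new do
        while (job = @jobs.pop)            # a nil job shuts the worker down
          begin
            job.call
          rescue => e
            warn "job failed: #{e.inspect}"
          ensure
            @tickets.pop                   # release the ticket either way
          end
        end
      end
    end
  end

  def post(&job)
    @tickets.push(true)                    # blocks once max_pending is reached
    @jobs.push(job)
  end

  def shutdown
    @workers.size.times { @jobs.push(nil) }
    @workers.each(&:join)
  end
end

pool = BlockingPool.new(4)
out = Queue.new
1.upto(100) { |i| pool.post { out.push(i) } }
pool.shutdown
# out now contains 1..100 in some order
```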