concurrent-ruby

No good tools for long streams of tasks

Open chrisseaton opened this issue 9 years ago • 15 comments

A couple of times now I've been speaking to someone and they have a problem like this.

pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count)

1.upto 2000000 do |i|
  pool.post do
    # work here
  end
end

or

File.open('foo').each do |line|
  pool.post do
    # work here
  end
end

In both cases we're creating a huge number of tasks, and in the latter case we may not know how many tasks in advance.

The problem in both cases is that people create millions of tasks, each holding a proc and its closure, which can take up a lot of memory.

I feel like we're missing two abstractions here.

The first is a basic parallel #each on an Enumerable with a length. We don't have that, do we? It would need chunking (run n items in each task), and perhaps automatic chunking based on profiling (run one item, see how long it takes, consider how many items there are, and set n based on that).

The second is something similar that works on an Enumerator, which doesn't have a length. Here chunking is harder as we don't know how many tasks there will be in advance. We may need some kind of work stealing here.

I'd like to write the two examples above as:

1.upto(2000000).parallel_each(pool) do
  # work here
end

or

File.open('foo').each.parallel_each(pool) do |line|
  # work here
end

In both cases we'd only create as many tasks at a time as was reasonable (if the pool has n threads it may be kn tasks for some small constant k).

A workaround in the meantime, just to stop so many tasks being created and memory being blown, may be to do this (ping @digininja, this is relevant to you):

pool = Concurrent::FixedThreadPool.new(
  Concurrent.processor_count,
  max_queue: 10 * Concurrent.processor_count,
  fallback_policy: :caller_runs)

1.upto 2000000 do |i|
  pool.post do
    # work here
  end
end

This will only queue up to 10 times as many tasks as you have cores; once the queue is full, any further task is run immediately on the calling thread instead of being added to the pool. So if the queue is already full (say 40 tasks on a 4-core machine), the next task is run by the main thread rather than queued. By the time the main thread finishes it and the loop comes around again, the pool may have room for new tasks; if not, the main thread runs the next task as well.

chrisseaton, Jan 09 '16 12:01

Oh when you are doing this you may want to set the thread pool to size Concurrent.processor_count - 1, as the main thread will also be running tasks, so you would have Concurrent.processor_count + 1 tasks running and being switched on and off cores, ruining cache.

chrisseaton, Jan 09 '16 12:01

Sounds like a good option, I'll give it a try tonight.

digininja, Jan 09 '16 13:01

This exists in many gems as peach/pmap, but I think it would make total sense in c-r. It sounds like Scala's .par (http://docs.scala-lang.org/overviews/parallel-collections/overview.html) to me, that might be an interesting direction to explore. It could indeed be nice to have parallel enumerators in addition to eager and lazy ones :)

eregon, Jan 09 '16 15:01

Have you got good links to describe the basics of how all this works, i.e. what are the different types of enumerators?

digininja, Jan 09 '16 15:01

:+1: to parallel enumerable.

I'm heading out the door and only have time for a quick note. This topic has been discussed at length before and some spike code was written. Please see #222, #229, and #231.

// @SebastianEdwards

jdantonio, Jan 09 '16 15:01

Ah I didn't know about those. We need a parallel Enumerator as well as Enumerable though, for streams of unknown length. I didn't see that discussed yet.

chrisseaton, Jan 09 '16 15:01

For my real code, and my sample that @chrisseaton reproduced above, the change to the pool initialisation worked perfectly. Memory doesn't go above 2.1% on my 16G machine, so I've pushed the queue size up to 90 times the core count and it still only reaches 2.3% max.

digininja, Jan 09 '16 21:01

Here is an example with actors, which also has configurable parallelism and does not require managing a pool directly.

require 'concurrent-edge'

def work(i)
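  # Kernel#rand truncates a float argument, so rand(0.01) would return a value in [0, 1);
  # a range keeps the sleep at or below 10 ms.
  sleep rand(0.0..0.01)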
  puts "(#{i})"
  i * 2
end

parallelism = 10
all_inputs  = Array.new(100) { |i| i }

class Worker < Concurrent::Actor::Context
  def on_message(i)
    work i
  end

  def default_executor
    Concurrent.global_io_executor
  end
end

POOL = Concurrent::Actor::Utils::Pool.spawn('pool', parallelism) do |index|
  Worker.spawn(name: "worker-#{index}")
end

# zip all futures into one
all_done = Concurrent.zip(*all_inputs.map do |i|
                            # ask the pool, returns future
                            POOL.ask i
                          end)

# block the main thread here until all is done
p all_done.value! # => [0, 2, 4, 6, 8, ... , 198]

pitr-ch, Jan 11 '16 10:01

@pitr-ch the problem here is that creating a closure (whether that's for a pool or for a future) for every single work item just takes up too much memory.

chrisseaton, Jan 11 '16 11:01

I would assume that lumping together all the responses to the futures could also take up a large chunk of memory.

digininja, Jan 11 '16 11:01

If it helps anyone, this is an approximation to the code I'm working on:

https://gist.github.com/digininja/d68f3c272778ec9a3299

I expect the word list to be around a million words and the individual tasks to be fairly quick and lightweight.

digininja, Jan 11 '16 11:01

Ah, then Channels would work (cc @jdantonio) since they have back-pressure: a reading thread would become blocked if it sent jobs too fast to a job-handling channel (which should have a limited size). Actors would also work, but a bounded message box would have to be implemented. This use case definitely points at some holes we have in our APIs. Thanks @digininja.
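The back-pressure idea can be sketched without Channels, using only the standard library. `SizedQueue` is a stand-in here for a bounded job channel, not the Channel API itself, and the queue size, worker count, and `upcase` "work" are placeholders:

```ruby
jobs      = SizedQueue.new(8) # push blocks once 8 jobs are waiting
processed = 0
lock      = Mutex.new

workers = Array.new(4) do
  Thread.new do
    # pop returns nil once the queue is closed and drained
    while (word = jobs.pop)
      word.upcase # placeholder for the real, lightweight per-word task
      lock.synchronize { processed += 1 }
    end
  end
end

# The reading thread: it stalls in << whenever the workers fall behind,
# so at most 8 words are ever waiting in memory.
1.upto(1_000) { |i| jobs << "word#{i}" }

jobs.close
workers.each(&:join)
puts processed # => 1000
```

Only the words themselves are queued, not a closure per word, and results are counted rather than collected so they do not pile up in memory either.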

pitr-ch, Jan 14 '16 16:01

Excellent feedback and suggestions. Thank you everyone!

jdantonio, Jan 15 '16 06:01

Very good idea IMO, I would be pleased to have a peach-like method in concurrent-ruby.

The peach gem is unmaintained, btw. As I would like to stick with concurrent-ruby and not add another dependency, I wrote a simple peach implementation based on a fixed thread pool:

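require 'concurrent'

module Enumerable

  # Runs the block for each item on a fixed-size thread pool.
  # Pool size comes from the argument, else the THREADS environment variable, else 1.
  def peach(threads = nil)
    pool = Concurrent::FixedThreadPool.new(threads || (ENV['THREADS'] || '1').to_i)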

    each do |item|
      pool.post do
        begin
          yield item
        rescue => e
          puts "EXCEPTION: #{e.inspect}"
          puts e.backtrace
        end
      end
    end

    pool.shutdown
    pool.wait_for_termination
  end

end

brauliobo, Dec 22 '16 09:12

Having a block/wait fallback policy for ThreadPoolExecutor would also help with the issue.

pitr-ch, Jul 06 '18 16:07