
Usage: Strange timing behaviour when setting up threads once and sending functions to be run

Open mmiller-max opened this issue 4 years ago • 8 comments

I'm using CheapThreads to set up all threads except thread 1 with a function that listens to a Channel and then runs another function according to what is in the channel. I'm not seeing the speedup that I expect over just running it on one thread.

Here is an MWE of the code I'm using (sorry, it's not the prettiest):

using CheapThreads
using ThreadingUtilities
using VectorizationBase
using StrideArraysCore: object_and_preserve
using BenchmarkTools

struct BatchClosure{F, A, B}
  f::F
end
function (b::BatchClosure{F,A,B})(p::Ptr{UInt}) where {F,A,B}
  (offset, args) = ThreadingUtilities.load(p, A, 2*sizeof(UInt))
  b.f(args)
  B && CheapThreads.free_local_threads!()
  nothing
end

@inline function batch_closure(f::F, args::A, ::Val{B}) where {F,A,B}
  bc = BatchClosure{F,A,B}(f)
  @cfunction($bc, Cvoid, (Ptr{UInt},))
end

@inline function setup_batch!(p::Ptr{UInt}, fptr::Ptr{Cvoid}, argtup)
  offset = ThreadingUtilities.store!(p, fptr, sizeof(UInt))
  offset = ThreadingUtilities.store!(p, argtup, offset)
  nothing
end

@generated function _launch_working_threads(
  f!::F, threadmask, nthread, args::Vararg{Any,K}
) where {F,K}
  q = quote
      threads = CheapThreads.UnsignedIteratorEarlyStop(threadmask, nthread)
  end
  block = quote
      i = 0x00000000
      tid = 0x00000000
      tm = CheapThreads.mask(threads)
      while true
          VectorizationBase.assume(tm ≠ zero(tm))
          tz = trailing_zeros(tm) % UInt32
          i += 0x00000001
          tz += 0x00000001
          tid += tz
          tm >>>= tz
          launch_batched_thread!(cfunc, tid, argtup)
          i == nthread && break
      end
      nothing
  end
  gcpr = Expr(:gc_preserve, block, :cfunc)
  argt = Expr(:tuple)
  for k ∈ 1:K
      CheapThreads.add_var!(q, argt, gcpr, args[k], :args, :gcp, k)
  end
  push!(q.args, :(argtup = $argt), :(cfunc = batch_closure(f!, argtup, Val{false}())), gcpr)
  push!(q.args, nothing)
  q
end

@inline function launch_batched_thread!(cfunc, tid, argtup)
  fptr = Base.unsafe_convert(Ptr{Cvoid}, cfunc)
  ThreadingUtilities.launch(tid, fptr, argtup) do p, fptr, argtup
      setup_batch!(p, fptr, argtup)
  end
end

function launch_threads(p, inchannels, outchannels)
  nthreads_to_launch = p.nthreads - 1
  torelease_vec = zeros(UInt8, nthreads_to_launch)
  threadmask_vec = zeros(UInt8, nthreads_to_launch)
  for i in 1:nthreads_to_launch
      nthread=1
      threads, torelease = CheapThreads.request_threads(Base.Threads.threadid(), nthread)
      threadmask = CheapThreads.mask(threads)
      _launch_working_threads(thread_workers,
                              threadmask,
                              nthread,
                              p,
                              inchannels,
                              outchannels)
      torelease_vec[i] = torelease
      threadmask_vec[i] = threadmask
  end
  return torelease_vec, threadmask_vec
end

@generated function _stop_working_threads(threadmask, nthread, torelease)
  q = quote
      threads = CheapThreads.UnsignedIteratorEarlyStop(threadmask, nthread)
  end
  block = quote
      tm = CheapThreads.mask(threads)
      tid = 0x00000000
      while true
          VectorizationBase.assume(tm ≠ zero(tm))
          tz = trailing_zeros(tm) % UInt32
          tz += 0x00000001
          tm >>>= tz
          tid += tz
          ThreadingUtilities.wait(tid)
          iszero(tm) && break
      end
      CheapThreads.free_threads!(torelease)
      nothing
  end
  push!(q.args, block)
  push!(q.args, nothing)
  q
end

function stop_workers(inchannels, outchannels, torelease_vec, threadmask_vec)
  for channel in inchannels
    put!(channel, :stop_workers)
  end
  for channel in outchannels
    take!(channel)
  end
  for i in eachindex(torelease_vec)
    _stop_working_threads(threadmask_vec[i], 1, torelease_vec[i])
  end
end

function f1!(p, start, stop)
  for i in start:stop
    p.vec1[i] = p.vec2[i] * occursin("1", string(i))
  end
end

thread_workers(args::Tuple) = thread_workers(args...)
function thread_workers(p, inchannels, outchannels)
    # Set up thread specific variables
    tid = Threads.threadid()
    start = p.start[tid]
    stop = p.stop[tid]
    inchannel = inchannels[tid-1] # calling thread doesn't have a channel
    outchannel = outchannels[tid-1] # calling thread doesn't have a channel
    
    # Work
    spin_down = false
    while !spin_down
        fname = take!(inchannel)
        if fname == :f1!
          f1!(p, start, stop)
        elseif fname == :stop_workers
            spin_down = true
        end
        put!(outchannel, spin_down)
    end
end

function send_to_channels(fname, inchannels, n_channnels)
    for channel in @view(inchannels[1:n_channnels])
      put!(channel, fname)
    end
end

function wait_for_channels(outchannels, n_channnels)
  for channel in @view(outchannels[1:n_channnels])
    take!(channel)
  end
end

function do_local(fname, p) 
  if fname == :f1!
    f1!(p, p.start[1], p.stop[1]) # run the main thread's own chunk
  end
end

# n_channnels allows testing on a subset of threads (useful for debugging)
function send_to_workers(fname::Symbol, p, inchannels, outchannels, n_channnels=num_threads()-1)
  send_to_channels(fname, inchannels, n_channnels)
  do_local(fname, p)
  wait_for_channels(outchannels, n_channnels)
end

struct P1
  vec1::Vector{Int64}
  vec2::Vector{Int64}
  start::Vector{Int64}
  stop::Vector{Int64}
  nthreads::Int64
end

function get_start_and_stop_indices(total_length, nbatches)
	fld_val = fld(total_length, nbatches)
	start_indices = collect(1:fld_val:fld_val*nbatches)
	stop_indices = vcat(collect(fld_val:fld_val:total_length-fld_val), total_length)
	return start_indices, stop_indices
end

len = 8000
p = P1(zeros(Int64, len), ones(Int64, len), get_start_and_stop_indices(len, num_threads())..., num_threads())
inchannels = [Channel{Symbol}() for i in 1:num_threads()-1]
outchannels = [Channel{Bool}() for i in 1:num_threads()-1]

# Launch threads
torelease_vec, threadmask_vec = launch_threads(p, inchannels, outchannels)

# Run code
send_to_workers(:f1!, p, inchannels, outchannels)

And here are the strange benchmarks with 8 threads (note that the last argument of send_to_workers controls how many threads are used, but not how the range is split):

julia> @btime f1!(p, 1, len)
  452.311 μs (16000 allocations: 750.00 KiB)
julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,0) # no workers, only running 1/8th on main thread
  55.999 μs (2002 allocations: 93.84 KiB)
julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,1) # 1 worker running
  58.974 μs (4003 allocations: 187.63 KiB) # approx 1/8 of full running, expected
julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,2)
  63.432 μs (6005 allocations: 281.44 KiB)
julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,3)
  68.613 μs (8008 allocations: 375.28 KiB)
julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,4)
  116.588 μs (10011 allocations: 469.13 KiB)
julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,7) # All workers running
  150.549 μs (16019 allocations: 750.63 KiB)
# Stop threads - run this after any benchmarks to stop the functions on the threads
stop_workers(inchannels, outchannels, torelease_vec, threadmask_vec)
I expect the allocations to increase, but I expect each thread to finish in approx 60 μs regardless of how many threads are being used. I'd be grateful for some help with what is going on here!

mmiller-max avatar Mar 25 '21 09:03 mmiller-max

Are you by any chance on a 4 core/8 thread laptop? The jump is quite large here:

julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,3)
  68.613 μs (8008 allocations: 375.28 KiB)
julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,4)
  116.588 μs (10011 allocations: 469.13 KiB)

from 4 to 5 total threads busy, which would be consistent with there being only 4 physical cores.

Laptops are also likely to boost a lot less as you increase the number of threads used, which would also explain performance degradation. I'd also suggest showing @benchmark; I'd be interested in seeing the variability in timings. For threaded code, the median is better than the minimum, IMO.

I was going to try this myself, but @btime send_to_workers(:f1!, p, inchannels, outchannels,1) seemed to deadlock.

You may also want to consider lower-level primitives, e.g. atomics + pause. But this depends heavily on the application; I'd only do it if I expected there to be at most a handful of pauses (e.g. threads doing the same amount of work needing to communicate periodically). I'm nervous about calling yield or wait, as the channels do. I haven't really looked into that, but I suspect it risks deadlocks.
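
To make the atomics + pause idea concrete, here is a minimal generic sketch using Base's Threads.Atomic. The names and structure are illustrative only, not CheapThreads code; it assumes Threads.nthreads() > 1, and spin-down handling is omitted:

using Base.Threads: Atomic, @spawn

const request  = Atomic{Int}(0)  # epoch under which work is published
const response = Atomic{Int}(0)  # epoch the worker has completed

worker = @spawn begin
    seen = 0
    while true
        while request[] == seen end  # spin until a new epoch is published
        seen = request[]
        # ... run the work item for this epoch ...
        response[] = seen            # signal completion (atomic store)
    end
end

request[] = 1              # publish work item 1
while response[] != 1 end  # spin until the worker finishes

Neither side ever calls yield or wait, so nothing can deadlock on the scheduler; the cost is that a spinning thread burns a core while it waits.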

EDIT:

julia> send_to_workers(:f1!, p, inchannels, outchannels, 1)
^CERROR: InterruptException:
Stacktrace:
 [1] poptask(W::Base.InvasiveLinkedListSynchronized{Task})
   @ Base ./task.jl:755
 [2] wait
   @ ./task.jl:763 [inlined]
 [3] wait(c::Base.GenericCondition{ReentrantLock})
   @ Base ./condition.jl:113
 [4] put_unbuffered(c::Channel{Symbol}, v::Symbol)
   @ Base ./channels.jl:341
 [5] put!
   @ ./channels.jl:316 [inlined]
 [6] send_to_channels(fname::Symbol, inchannels::Vector{Channel{Symbol}}, n_channnels::Int64)
   @ Main ./REPL[18]:3
 [7] send_to_workers(fname::Symbol, p::P1, inchannels::Vector{Channel{Symbol}}, outchannels::Vector{Channel{Bool}}, n_channnels::Int64)
   @ Main ./REPL[21]:3
 [8] top-level scope
   @ REPL[48]:1

It hangs on the wait on the channel.

chriselrod avatar Mar 25 '21 09:03 chriselrod

No reason for this to be @generated:

function _stop_working_threads(threadmask, nthread, torelease)
    threads = CheapThreads.UnsignedIteratorEarlyStop(threadmask, nthread)
    tm = CheapThreads.mask(threads)
    tid = 0x00000000
    while true
        VectorizationBase.assume(tm ≠ zero(tm))
        tz = trailing_zeros(tm) % UInt32
        tz += 0x00000001
        tm >>>= tz
        tid += tz
        ThreadingUtilities.wait(tid)
        iszero(tm) && break
    end
    CheapThreads.free_threads!(torelease)
    nothing
end

You should also be able to do for tid in threads, but the asm looked a bit worse.
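
For reference, that version would look something like this (a sketch, assuming iterating the UnsignedIteratorEarlyStop yields the worker thread ids directly):

function _stop_working_threads(threadmask, nthread, torelease)
    threads = CheapThreads.UnsignedIteratorEarlyStop(threadmask, nthread)
    for tid in threads
        ThreadingUtilities.wait(tid)
    end
    CheapThreads.free_threads!(torelease)
    nothing
end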

chriselrod avatar Mar 25 '21 09:03 chriselrod

Are you by any chance on a 4 core/8 thread laptop?

Yes I am! That does make sense. I haven't seen this issue with Threads.@threads; is the behaviour different here because this is more low-level?

You may also want to consider lower-level primitives, e.g. atomics + pause. But this depends heavily on the application

I'm trying to set up the threads ahead of an ODE solve with DifferentialEquations. So each call to the model will send 5-10 functions to the threads, and there will be hundreds of calls to the model. But all threads should do the same amount of work. Do you think using atomics/pause would be better for this case?

It hangs on the wait on the channel.

I've tried to ensure the channels don't lock up with this - it does hang there if for some reason the workers haven't started or have crashed. I've just tested again and it seems to work for me; if you've got time, please could you retry?

I'd also suggest showing @benchmark

The median is quite different, and still has a significant jump from 4 to 5 threads.

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,0)
BenchmarkTools.Trial: 
  memory estimate:  93.84 KiB
  allocs estimate:  2002
  --------------
  minimum time:     55.742 μs (0.00% GC)
  median time:      59.722 μs (0.00% GC)
  mean time:        68.600 μs (9.49% GC)
  maximum time:     5.012 ms (97.52% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,1)
BenchmarkTools.Trial: 
  memory estimate:  187.63 KiB
  allocs estimate:  4003
  --------------
  minimum time:     60.061 μs (0.00% GC)
  median time:      67.749 μs (0.00% GC)
  mean time:        98.385 μs (16.90% GC)
  maximum time:     12.060 ms (97.80% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,2)
BenchmarkTools.Trial: 
  memory estimate:  281.44 KiB
  allocs estimate:  6005
  --------------
  minimum time:     63.220 μs (0.00% GC)
  median time:      71.126 μs (0.00% GC)
  mean time:        106.142 μs (27.15% GC)
  maximum time:     15.476 ms (98.08% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,3)
BenchmarkTools.Trial: 
  memory estimate:  375.28 KiB
  allocs estimate:  8008
  --------------
  minimum time:     68.208 μs (0.00% GC)
  median time:      79.677 μs (0.00% GC)
  mean time:        132.897 μs (33.11% GC)
  maximum time:     24.718 ms (98.68% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,4)
BenchmarkTools.Trial: 
  memory estimate:  469.09 KiB
  allocs estimate:  10010
  --------------
  minimum time:     118.910 μs (0.00% GC)
  median time:      151.100 μs (0.00% GC)
  mean time:        248.998 μs (26.50% GC)
  maximum time:     35.166 ms (99.01% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,7)
BenchmarkTools.Trial: 
  memory estimate:  750.63 KiB
  allocs estimate:  16019
  --------------
  minimum time:     148.322 μs (0.00% GC)
  median time:      163.124 μs (0.00% GC)
  mean time:        338.311 μs (31.36% GC)
  maximum time:     61.900 ms (97.73% GC)
  --------------
  samples:          10000
  evals/sample:     1

No reason for this to be @generated:

Ah I don't quite understand when things need to use @generated - why does _launch_working_threads need it?

mmiller-max avatar Mar 25 '21 10:03 mmiller-max

Ah I don't quite understand when things need to use @generated - why does _launch_working_threads need it?

They need to be @generated when you want to change the function's body based on type information. Multiple dispatch can effectively do that too (i.e., different functions getting called has the same effect), so @generated is rarely needed.

But in _launch_working_threads:

  for k ∈ 1:K
      CheapThreads.add_var!(q, argt, gcpr, args[k], :args, :gcp, k)
  end

@generated is used to build GC.@preserve arg1 arg2 arg3... code_during_which_args_are_preserved. I.e., it unpacks the tuple and splats the elements into the macro call. AFAIK you can't do that without expression manipulation.
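
A standalone toy version of that pattern (not CheapThreads' actual code; preserve_all is a made-up name) looks like this:

@generated function preserve_all(f::F, args::Vararg{Any,K}) where {F,K}
    names  = [Symbol(:arg_, k) for k in 1:K]               # arg_1, arg_2, ...
    unpack = [:($(names[k]) = args[$k]) for k in 1:K]      # unpack the tuple
    quote
        $(unpack...)
        GC.@preserve $(names...) f($(names...))
    end
end

The generated body spells out arg_1 arg_2 ... as actual variable names, which is what GC.@preserve requires; a runtime loop over a tuple can't produce that.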

Yes I am! That does make sense. I haven't seen this issue with Threads.@threads; is the behaviour different here because this is more low-level?

Although you must've noticed a big jump from 4->5 threads? I don't know how computationally intense occursin is, but anything able to make good use of your CPU's execution resources will probably peak, performance-wise, with 1 thread/physical core. E.g., BLAS is best with 4 threads on a 4 core/8 thread computer. Increasing it to 8 threads will slow down each thread by more than 2x, hurting performance.

If you want to eliminate downclocking, you could temporarily disable turbo (note this will likely make your laptop much slower until you re-enable it). E.g., see https://julialinearalgebra.github.io/BLASBenchmarksCPU.jl/dev/turbo/.

I'm trying to set up the threads ahead of an ODE solve with DifferentialEquations. So each call to the model will send 5-10 functions to the threads, and there will be hundreds of calls to the model. But all threads should do the same amount of work. Do you think using atomics/pause would be better for this case?

Is that necessarily the case? Don't step sizes/number of iterations often vary, for example? Probably better to stick with channels, for now.

I've tried to ensure the channels don't lock up with this - it does hang there if for some reason the workers haven't started or have crashed. I've just tested again and it seems to work for me; if you've got time, please could you retry?

Yes. The problem is that you're creating channels with 0 capacity. The lockup has nothing to do with threading.

julia> inchannels
7-element Vector{Channel{Symbol}}:
 Channel{Symbol}(0) (empty)
 Channel{Symbol}(0) (empty)
 Channel{Symbol}(0) (empty)
 Channel{Symbol}(0) (empty)
 Channel{Symbol}(0) (empty)
 Channel{Symbol}(0) (empty)
 Channel{Symbol}(0) (empty)

julia> put!(inchannels[1], :f1!) # hangs until Ctrl-C
^CERROR: InterruptException:
Stacktrace:
 [1] poptask(W::Base.InvasiveLinkedListSynchronized{Task})
   @ Base ./task.jl:755
 [2] wait
   @ ./task.jl:763 [inlined]
 [3] wait(c::Base.GenericCondition{ReentrantLock})
   @ Base ./condition.jl:113
 [4] put_unbuffered(c::Channel{Symbol}, v::Symbol)
   @ Base ./channels.jl:341
 [5] put!(c::Channel{Symbol}, v::Symbol)
   @ Base ./channels.jl:316
 [6] top-level scope
   @ REPL[33]:1
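
For contrast, a channel constructed with a nonzero capacity buffers put!s up to that capacity without needing a concurrent take! (standard Channel behaviour):

julia> ch = Channel{Symbol}(1); # capacity 1

julia> put!(ch, :f1!) # returns immediately; the value is buffered
:f1!

julia> take!(ch)
:f1!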

EDIT: Never mind - it still locks up when setting the capacity to 1. This code has never worked for me. Just to confirm, are you able to copy and paste the code from your issue and run it? If so, I'd have to take a closer look at a later time to try and debug.

The median is quite different

Thanks. I'm also wondering if an averaging out of sorts is going on. The time for a collection of N threads is the maximum time across all of them. So if there's a lot of variability, an increase in time would also be expected for that reason. But I don't think there's enough variability from a single thread running in isolation (i.e., send_to_workers(:f1!, p, inchannels, outchannels,0)) to account for your times.

chriselrod avatar Mar 25 '21 11:03 chriselrod

Thanks for the @generated explanation, makes sense.

Although you must've noticed a big jump from 4->5 threads?

Oh yeah - I just ran a test using a version of f1! above with Threads.@threads, and I see a very similar speed pattern to the example above.

Is that necessarily the case? Don't step sizes/number of iterations often vary, for example?

Sorry, I wasn't clear. Rather than a different solution on each thread, there is only one call to solve, and for each step of the solver it runs a model which sends functions to the threads; these should be approximately equal in computation time.

This code has never worked for me. Just to confirm, are you able to copy and paste the code from your issue and run it?

Yes, a copy and paste works, except stop_workers should not be run before the benchmarks; otherwise it does hang (as the functions on the threads have stopped, so nothing is take!-ing from the channels). I'll edit the MWE to make that clear (and also change the name of the variable length, as that's bad).

mmiller-max avatar Mar 25 '21 21:03 mmiller-max

Yes, a copy and paste works, except stop_workers should not be run before the benchmarks

Ha ha. Just to confirm, I was definitely running stop_workers before running the benchmarks.

As an aside:

function launch_threads(p, inchannels, outchannels)
  nthreads_to_launch = p.nthreads - 1
  torelease_vec = zeros(UInt8, nthreads_to_launch)
  threadmask_vec = zeros(UInt8, nthreads_to_launch)

Instead of UInt8, this should be

julia> CheapThreads.worker_type()
UInt32

which varies based on the number of threads (i.e., it's roughly an unsigned integer with nextpow2(min(Sys.CPU_THREADS, Threads.nthreads())) bits).
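
So the two vectors would be allocated along these lines:

T = CheapThreads.worker_type()
torelease_vec  = zeros(T, nthreads_to_launch)
threadmask_vec = zeros(T, nthreads_to_launch)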

Some benchmarks on a machine with >8 physical cores:

julia> @btime f1!(p, 1, len)
  297.019 μs (16000 allocations: 750.00 KiB)

julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,0) # no workers, only running 1/8th on main thread
  36.724 μs (2002 allocations: 93.84 KiB)

julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,1) # 1 worker running
  42.116 μs (4003 allocations: 187.62 KiB)

julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,2)
  45.120 μs (6005 allocations: 281.44 KiB)

julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,3)
  48.047 μs (8008 allocations: 375.28 KiB)

julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,4)
  52.100 μs (10010 allocations: 469.09 KiB)

julia> @btime send_to_workers(:f1!, p, inchannels, outchannels,7) # All workers running
  72.026 μs (16018 allocations: 750.59 KiB)

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,0)
BenchmarkTools.Trial:
  memory estimate:  93.84 KiB
  allocs estimate:  2002
  --------------
  minimum time:     37.155 μs (0.00% GC)
  median time:      39.123 μs (0.00% GC)
  mean time:        43.219 μs (7.46% GC)
  maximum time:     2.110 ms (96.01% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,1)
BenchmarkTools.Trial:
  memory estimate:  187.66 KiB
  allocs estimate:  4004
  --------------
  minimum time:     43.003 μs (0.00% GC)
  median time:      49.592 μs (0.00% GC)
  mean time:        68.675 μs (17.56% GC)
  maximum time:     6.396 ms (98.35% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,2)
BenchmarkTools.Trial:
  memory estimate:  281.44 KiB
  allocs estimate:  6005
  --------------
  minimum time:     45.724 μs (0.00% GC)
  median time:      76.082 μs (0.00% GC)
  mean time:        95.620 μs (28.44% GC)
  maximum time:     13.857 ms (99.09% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,3)
BenchmarkTools.Trial:
  memory estimate:  375.28 KiB
  allocs estimate:  8008
  --------------
  minimum time:     47.570 μs (0.00% GC)
  median time:      83.480 μs (0.00% GC)
  mean time:        121.751 μs (36.68% GC)
  maximum time:     22.570 ms (99.37% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,4)
BenchmarkTools.Trial:
  memory estimate:  469.09 KiB
  allocs estimate:  10010
  --------------
  minimum time:     53.423 μs (0.00% GC)
  median time:      89.912 μs (0.00% GC)
  mean time:        146.520 μs (41.81% GC)
  maximum time:     31.063 ms (99.53% GC)
  --------------
  samples:          10000
  evals/sample:     1

julia> @benchmark send_to_workers(:f1!, p, inchannels, outchannels,7)
BenchmarkTools.Trial:
  memory estimate:  750.59 KiB
  allocs estimate:  16018
  --------------
  minimum time:     74.943 μs (0.00% GC)
  median time:      124.173 μs (0.00% GC)
  mean time:        254.166 μs (51.93% GC)
  maximum time:     63.720 ms (99.67% GC)
  --------------
  samples:          10000
  evals/sample:     1

Here, no slowdown is expected going from 4->5 threads used (i.e., 3->4 spawned tasks). It's a little surprising that there is one, but the change in median time isn't greater than the earlier increases, and is much smaller than what you get on a 4-core machine.

Overall though, this is drastically more slowdown than I'd like to see. To look at it more, you could collect the actual times of each sample from the BenchmarkTools.Trial object for send_to_workers(:f1!, p, inchannels, outchannels,0), and then resample pairs of these times randomly, taking the max. Then, compare the distribution of these maxes with the distribution of times in send_to_workers(:f1!, p, inchannels, outchannels,1).

E.g.

julia> br0 = @benchmark send_to_workers(:f1!, $p, $inchannels, $outchannels, 0);

julia> br0maxes2 = max.(rand(br0.times, 10_000), rand(br0.times, 10_000));

julia> br1 = @benchmark send_to_workers(:f1!, $p, $inchannels, $outchannels, 1);

julia> br0maxes4 = max.(ntuple(_ -> rand(br0.times, 10_000), Val(4))...);

julia> br1maxes4 = max.(rand(br1.times, 10_000), rand(br1.times, 10_000));

julia> br3 = @benchmark send_to_workers(:f1!, $p, $inchannels, $outchannels, 3);

Now we can compare br0maxes2 with br1.times (summarystats is from StatsBase):

julia> summarystats(br0maxes2)
Summary Stats:
Length:         10000
Missing Count:  0
Mean:           47024.956200
Minimum:        37046.000000
1st Quartile:   38173.000000
Median:         38553.000000
3rd Quartile:   39745.000000
Maximum:        2218152.000000


julia> summarystats(br1.times)
Summary Stats:
Length:         10000
Missing Count:  0
Mean:           68496.718600
Minimum:        42140.000000
1st Quartile:   47651.750000
Median:         49171.000000
3rd Quartile:   70634.000000
Maximum:        6493284.000000

br1 indeed looks a lot slower than just evaluating br0 twice and waiting for the last of the two to finish.

Now let's simulate sets of 4 runs, waiting for the slowest of the 4 to finish:

julia> summarystats(br0maxes4)
Summary Stats:
Length:         10000
Missing Count:  0
Mean:           57865.660600
Minimum:        37551.000000
1st Quartile:   38550.000000
Median:         39375.000000
3rd Quartile:   40963.000000
Maximum:        2218152.000000


julia> summarystats(br1maxes4)
Summary Stats:
Length:         10000
Missing Count:  0
Mean:           82753.390400
Minimum:        43412.000000
1st Quartile:   49214.750000
Median:         69490.000000
3rd Quartile:   72685.000000
Maximum:        6493284.000000


julia> summarystats(br3.times)
Summary Stats:
Length:         10000
Missing Count:  0
Mean:           119948.098800
Minimum:        47989.000000
1st Quartile:   59810.000000
Median:         83067.500000
3rd Quartile:   85706.250000
Maximum:        21982682.000000

br3.times (the actual sets of 4) was the slowest, and br1maxes4 (actual sets of 2, picking the slowest of them) was slower than br0maxes4 (actual sets of 1, picking the slowest of them). Thus, it seems like there is a lot of overhead in threading here compared to the serial runs. =/

Sorry, I wasn't clear. Rather than a different solution on each thread, there is only one call to solve, and for each step of the solver it runs a model which sends functions to the threads; these should be approximately equal in computation time.

In that case, I think pause + atomics would be worth a try.

chriselrod avatar Mar 25 '21 22:03 chriselrod

Ha ha. Just to confirm, I was definitely running stop_workers before running the benchmarks.

Haha glad we found it!

Instead of UInt8, this should be

Ah makes sense, thanks.

Thus, it seems like there is a lot of overhead in threading here compared to the serial runs. =/

That was a great analysis - a good way of checking it. But yeah, it still seems to be a fair bit slower than I'd like! I wonder how much of this is the Channels.

In that case, I think pause + atomics would be worth a try.

I can't find anything about pauses anywhere - do you know of an example I could look at for this?

mmiller-max avatar Mar 26 '21 04:03 mmiller-max

I can't find anything about pauses anywhere - do you know of an example I could look at for this?

This is probably not the best way to do this, but:

_mv = _atomic_add!(myp, one(UInt))
sync_iters += one(UInt)
let atomp = atomicp
    for _ ∈ CloseOpen(total_ids)
        atomp += cache_linesize()
        atomp == myp && continue
        while _atomic_load(atomp) != sync_iters
            pause()
        end
    end
end

Every thread has a counter for which iteration it's on, stored at atomicp and offset by cache_linesize() so that the counters are in separate cache lines. Each thread increments its own counter with _atomic_add!(myp, one(UInt)). I realize now that the atomic add should be unnecessary, because each slot is only mutated by the thread that owns it. It should instead be

sync_iters += one(UInt)
_atomic_store!(myp, sync_iters) # is the atomic store even necessary? Or would `unsafe_store!` suffice?

I'll make and test that change later.

Then it checks every other thread's counter, to see if they've hit the same sync_iters yet. Only once every other thread has incremented the value at its respective offset will the thread move on. Because all threads are doing this, it synchronizes them.

You can have threads store information somewhere beforehand. In the example I linked to, the threads want to share a big block of memory, so the synchronization means that each thread finished doing their respective chunk of the work, meaning it's safe to proceed. So you could signal that one thread finished storing data somewhere in this manner, and that it's thus safe for (an arbitrary number of) others to proceed and use this data. E.g., on my 18 core CPU this let each of the 18 cores communicate to the other 17 that the data they copied is ready for them to use, and keep them in sync (this data changes on every sync_iters update).
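
A self-contained sketch of the same barrier idea using Base's atomics instead of raw pointers (all names are illustrative, and this ignores the cache-line padding the real code uses to avoid false sharing):

using Base.Threads: Atomic, nthreads, threadid

const counters = [Atomic{UInt}(0) for _ in 1:nthreads()] # one epoch counter per thread

function barrier!(sync_iters::UInt)
    counters[threadid()][] = sync_iters # publish my progress (atomic store)
    for (t, c) in enumerate(counters)
        t == threadid() && continue
        while c[] < sync_iters          # spin until thread t reaches this epoch
            # pause hint goes here (see below)
        end
    end
end

(Using < rather than != means a thread that has already raced ahead to the next epoch doesn't trap the stragglers.)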

pause is just a CPU instruction that hints to the CPU that you're in a spin-wait loop (helping it avoid memory order violations), and makes spinning more energy efficient.
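
For reference, the hint can be emitted from Julia with something like the following; I'm assuming here that the runtime's jl_cpu_pause is ccall-able, which is an implementation detail rather than a documented API:

cpu_pause() = ccall(:jl_cpu_pause, Cvoid, ()) # lowers to the `pause` instruction on x86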

chriselrod avatar Mar 26 '21 04:03 chriselrod