ProgressMeter.jl
Feature request: allow updating the progress bar from functions launched on separate workers.
Consider the following MWE, which runs but not as intended.
using Distributed
addprocs(2)
@everywhere using ProgressMeter
p = Progress(1000, barglyphs=BarGlyphs("[=> ]"))
@everywhere function simulation(s)
    for i = 1:100
        sleep(0.01)
    end
end
progress_pmap(x -> simulation(x), 1:10, progress=p, channel_bufflen=1000)
The intention: suppose simulation is a long-running function that can report its progress from its internal for loop, and suppose simulation is launched independently on multiple processes by pmap. The goal is a more granular progress bar that is updated at every iteration of the for loop in each simulation. By a simple calculation the progress bar has a total length of 1000, and each unit of it is contributed by one iteration of the for loop in simulation. In other words, 10 simulation processes all advance the progress bar through their internal for loops, one unit at a time. This allows monitoring the progress of long-running computations.
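To make the intent concrete, the per-iteration update would look roughly like this (a hypothetical sketch only: as written, next!(p) on a worker cannot reach the Progress object owned by process 1, which is exactly the problem being asked about):

@everywhere function simulation(s)
    for i = 1:100
        sleep(0.01)   # stands in for one unit of real work
        next!(p)      # intended: advance the shared 1000-unit bar by one
    end
end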
PmapProgressMeters.jl did this by passing simulation a callback function that updated the Progress bar on the main process.
However, I am trying to think of a solution involving Channel and RemoteChannel, although I don't yet have much expertise in how they work.
I tried
@everywhere function main(s)
    for i = 1:100
        remotecall(put!(channel, true), 1)
        sleep(0.01)
    end
end
but I get
On worker 3:
UndefVarError: channel not defined
which doesn't make sense to me. According to the source code,
vals = mapfun(other_args...; kwargs...) do x...
    val = f(x...)
    put!(channel, true)
    return val
end
the line put!(channel, true) is indeed part of a function that is launched on separate workers. Am I seeing a scoping issue here?
The reason for the error is that channel is not a defined symbol on the other worker processes: @everywhere simply executes the code on every process, and channel does not exist there. Also note that
@everywhere channel = RemoteChannel(()->Channel{Bool}(10), 1)
will not work because it will create a new channel for each process.
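A setup that should work instead is to create the channel once, owned by process 1, and have a single task on process 1 drain it into the progress bar; put! on a RemoteChannel is valid from any process that holds a reference to it. A minimal sketch, assuming the channel is then made visible to the workers (for example by passing it as an argument, as in the workarounds below):

using Distributed
addprocs(2)
@everywhere using ProgressMeter

p = Progress(1000)
# One RemoteChannel, backed by a Channel living on process 1.
channel = RemoteChannel(() -> Channel{Bool}(1000), 1)

# Listener on process 1: each `true` advances the bar, a `false` stops the task.
@async while take!(channel)
    next!(p)
end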
I haven't been able to find documentation on this, but apparently anonymous functions, like those created with the do or -> syntax, behave differently with respect to serialization and availability on multiple processes.
addprocs(2)
f(x) = x^2
pmap(f, 1:10)
will throw an error, but
addprocs(2)
f = x -> x^2
pmap(f, 1:10)
will work fine.
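For completeness, the named-function version does work once the definition is broadcast to the workers (a quick sketch):

addprocs(2)
@everywhere f(x) = x^2
pmap(f, 1:10)   # works: f is now defined on every process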
Anyway, workarounds are to (1) pass the channel as an argument to the function (sketched after the snippet below), or (2) use
main = s -> begin
    for i = 1:100
        put!(channel, true)   # channel (a RemoteChannel) is captured by the closure
        sleep(0.01)
    end
end
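Workaround (1), applied to the earlier example, would look roughly like this (a sketch, assuming the single channel and the listener task from above are set up on process 1):

@everywhere function main(s, channel)
    for i = 1:100
        put!(channel, true)   # put! on the RemoteChannel works from any worker
        sleep(0.01)
    end
end

pmap(s -> main(s, channel), 1:10)
put!(channel, false)          # tell the listener task to stop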
Sorry, I don't know more about why anonymous functions behave differently.
https://discourse.julialang.org/t/documentation-about-using-anonymous-and-named-functions-with-pmap/19337
This might be a bug... let's see if someone replies on Discourse. You should clarify in your post that both functions work fine when @everywhere'd, but the anonymous one will work on all processes even if it's not @everywhere'd.
I'll see if I can figure this out over the next few days. This feature would be great for my workflow.
I have the same feature request. The code in the readme is basically perfect; it would just be nice if all that boilerplate could be hidden inside a parallel progress bar type, like
p = DistributedProgress(100)
pmap(1:10) do i
    for j = 1:10
        ...
        next!(p)
    end
end
finish!(p)
and the whole RemoteChannel machinery would be set up for you (and, ideally, it would also work with showvalues).
Actually, I hacked together the following, which allows next! and update! from any worker. Is this approach something that you'd be interested in having as a PR?
using Distributed, ProgressMeter

struct DistributedProgress <: ProgressMeter.AbstractProgress
    channel::RemoteChannel{Channel{Any}}
    pbar::Progress
end

function DistributedProgress(args...; kwargs...)
    pbar = Progress(args...; kwargs...)
    channel = RemoteChannel(() -> Channel(), 1)
    # Task on process 1 that applies the forwarded calls to the local Progress;
    # a `nothing` on the channel signals completion.
    @async begin
        while (x = take!(channel)) != nothing
            func, args, kwargs = x
            func(pbar, args...; kwargs...)
        end
        finish!(pbar)
    end
    DistributedProgress(channel, pbar)
end

ProgressMeter.next!(pbar::DistributedProgress, args...; kwargs...) = put!(pbar.channel, (ProgressMeter.next!, args, kwargs))
ProgressMeter.update!(pbar::DistributedProgress, args...; kwargs...) = put!(pbar.channel, (ProgressMeter.update!, args, kwargs))
ProgressMeter.finish!(pbar::DistributedProgress, args...; kwargs...) = (put!(pbar.channel, nothing); close(pbar.channel))
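Usage on the original example would then look roughly like the following (a sketch; it assumes the definitions above are also available on the workers, e.g. via @everywhere or a package loaded everywhere, since the DistributedProgress object is serialized to the workers and next! must dispatch there):

using Distributed
addprocs(2)
@everywhere using ProgressMeter
# ... DistributedProgress definitions from above, available on all processes ...

p = DistributedProgress(1000)
pmap(1:10) do s
    for i = 1:100
        sleep(0.01)
        next!(p)   # forwarded over the RemoteChannel to the task on process 1
    end
end
finish!(p)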
Absolutely. Please PR and add tests if you can.