ProgressLogging.jl icon indicating copy to clipboard operation
ProgressLogging.jl copied to clipboard

map, mapreduce, and reduce support

Open mileslucas opened this issue 4 years ago • 3 comments

It would be nice if map, mapreduce, and reduce were supported with @progress, for example

squares = @progress map(i -> i^2, 1:10)
prod = @progress map((x,y) -> x * y, 1:10, -5:5)
sum_squares = @progress mapreduce(+, 1:10) do x
	x^2
end
sum_ = @progress reduce(+, 1:10)

I've looked through the source code and at first glance it seems like the _progress function will not support this without a small rewrite.

I would propose refactoring _progress such that it first tees the input expression to the appropriate use-case (for loop vs. comprehension vs. mapping function), after which we can have _forprogress, _mapprogress, etc. This is similar to the way ProgressMeter parses their @showprogress macro.

I took a shot at implementing this myself and came up with the following

function _mapprogress(name, thresh, ex)
    L = length(eval(ex.args[3])) # get first collection sent to map
    body = ex.args[2]
    @gensym lastfrac frac val it count_var
    m = @__MODULE__
    quote
        $m.@withprogress name = $name begin
            $lastfrac = 0.0
            $count_var = Thrads.Atomic{Int}(0)
            $(ex.args[1])($(ex.args[3:end]...)) do $it
                $val = $body($it...)
                Threads.atomic_add!($count_var, 1)
                $frac = $count_var[] / $L
                if $frac - $lastfrac > $thresh
                    $m.@logprogress $frac
                    $lastfrac = $frac
                end
                $val
            end
        end
    end
end

using the above I was able to get progress to work for the case of map(x -> x^2, 1:10). It did not work with multiple ranges (eg map((x,y) -> ..., 1:10, -5:5)) and I didn't try writing up the reduce versions. The choice of using the atomic iterator was to make sure the call is thread-safe.

mileslucas avatar Jan 19 '21 21:01 mileslucas

I'm wondering if one wants to support map, reduce etc. would it be possible to also support ThreadsX by @tkf ?

Roger-luo avatar Aug 21 '21 06:08 Roger-luo

There's already Transducers.withprogress which uses ProgressLogging. So, something like this already "works":

using ThreadsX
using Transducers: withprogress
ThreadsX.sum(x -> gcd(x, 42), withprogress(1:2^30); basesize = 2^20)

It's not very accurate though.

tkf avatar Aug 22 '21 00:08 tkf

In the meantime, you could do this:

function progressmap(f, itr)
	l = length(itr)
	id = gensym()
	map(enumerate(itr)) do (i,x)
		Logging.@logmsg(ProgressLogging.ProgressLevel, "", progress=i/l, id=id)
		f(x)
	end
end

progressmap(sqrt, 1:10)

But I'm sure that we can do something more optimized :)

fonsp avatar May 19 '22 15:05 fonsp