
`mapCube` in Distributed mode is many times slower


Serial: 8 seconds

```julia
using YAXArrays, Statistics, Zarr
using DimensionalData
using Dates

axlist = (
    Dim{:time}(Date("2022-01-01"):Day(1):Date("2022-12-31")),
    Dim{:lon}(range(1, 10, length=1000)),
    Dim{:lat}(range(1, 5, length=1500)),
    Dim{:Variable}(["var1", "var2"])
)

data = rand(axlist...)
ds = YAXArray(axlist, data)
c = ds[Variable = At("var1")]
c2 = setchunks(c, (365, 100, 150))

function mymean(output, xin)
    # @show "doing a mean"
    output[:] .= mean(xin)
end

indims = InDims("time")
outdims = OutDims()
@time resultcube = mapCube(mymean, c2; indims, outdims)
```


Parallel: 660 seconds

```julia
using Distributed
addprocs(4)
# @everywhere using Pkg
# @everywhere Pkg.activate(".")
@everywhere begin
    # using NetCDF
    using YAXArrays
    using Statistics
    using Zarr
end

@everywhere function mymean(output, xin)
    # @show "doing a mean"
    output[:] .= mean(xin)
end

indims = InDims("time")
outdims = OutDims()

# turns out to be slower? why?
@time resultcube2 = mapCube(mymean, c2; indims, outdims)
```


kongdd · Oct 01 '23

Welcome and thanks a lot for the bug report.

I am quite certain that your system is swapping to disk here in the parallel case because the computation is using too much memory. In general, YAXArrays is optimised to work on data stored on disk, where data is loaded for processing only when needed.

Your test input array is quite large, and currently, when using multiple workers, the whole input array is copied to every worker, which is why we run out of memory here. A better MWE for comparing single-process and multi-process performance would be the following, where we save the generated data to disk before processing:

```julia
using YAXArrays, Statistics, Zarr, NetCDF
using DimensionalData
using Dates

axlist = (
    Dim{:time}(Date("2022-01-01"):Day(1):Date("2022-12-31")),
    Dim{:lon}(range(1, 10, length=1000)),
    Dim{:lat}(range(1, 5, length=1500)),
    Dim{:Variable}(["var1", "var2"])
)

data = rand(axlist...)
ds = YAXArray(axlist, data)

# Save the cube to zarr for parallel processing
dschunked = setchunks(ds, (time=365, lon=100, lat=150))
p = tempname()
savecube(dschunked, p, backend=:zarr)

c = Cube(p)[Variable = At("var1")]

function mymean(output, xin)
    # @show "doing a mean"
    output[:] .= mean(xin)
end

indims = InDims("time")
outdims = OutDims()
@time resultcube = mapCube(mymean, c; indims, outdims)

using Distributed
addprocs(4)
# @everywhere using Pkg
# @everywhere Pkg.activate(".")
@everywhere begin
    # using NetCDF
    using YAXArrays
    using Statistics
    using Zarr
end

@everywhere function mymean(output, xin)
    # @show "doing a mean"
    output[:] .= mean(xin)
end

indims = InDims("time")
outdims = OutDims()

# parallel run on the disk-backed cube
@time resultcube2 = mapCube(mymean, c; indims, outdims)
```

This way, only the path to the input dataset is shared with the workers, so there is no duplicated memory usage.
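For intuition, here is a quick check (a sketch, not part of the original benchmark; it assumes the `p` and `Cube` from the script above) that opening a zarr store only creates a lazy, disk-backed handle, which is why sharing the path with workers is cheap:

```julia
using YAXArrays, Zarr

c = Cube(p)      # reads only metadata from the zarr store at path p
typeof(c.data)   # a chunked, disk-backed array type, not a dense Array
size(c)          # shape and axes are known without reading any chunk values
```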

I will think about ways to avoid this behavior in the future, to make sure no entire arrays are copied to workers, but for now I think it is best to use this approach.

meggart · Oct 02 '23

Thanks for your quick reply and solution.

My PC has 256 GB of memory, so I also can't figure out why. I was using Windows 10 WSL yesterday. I will try another PC and your solution to see whether this issue persists.

kongdd · Oct 02 '23

> My PC has 256 GB of memory

Ok, I was wondering why this was running at all, because the script quickly crashed my PC. However, did you test with the modifications I sent you?

meggart · Oct 02 '23

Thank you @meggart. It is faster; the performance improved. But the improvement from parallel mode is small.

Parallel

```julia
using Distributed
addprocs(4)
# @everywhere using Pkg
# @everywhere Pkg.activate(".")
@everywhere begin
    # using NetCDF
    using YAXArrays
    using Statistics
    using Zarr
end

p = "data/temp.zarr"
c = Cube(p)[Variable = At("var1")]
# mymean, indims/outdims and the @time mapCube call are the same as above
```

[screenshot: parallel timing output]

Serial

```julia
using YAXArrays
using Statistics
using Zarr

p = "data/temp.zarr"
c = Cube(p)[Variable = At("var1")]
```

[screenshot: serial timing output]

kongdd · Oct 02 '23

Ok, in my tests the parallel version was faster on the second run, after compilation, but that might depend on your hard-disk properties. This example is 100% IO-limited because the actual computation is so simple. For more compute-intensive jobs or larger arrays you should see some speed-up.
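For illustration, here is a sketch of a heavier per-pixel function (`myquantiles` is an invented example, not from this thread) that moves the bottleneck from IO towards CPU, where the extra workers can actually help:

```julia
# A more compute-intensive reduction: the 0.9 quantile of each time series.
# quantile sorts internally, so this costs far more than a running mean.
@everywhere using Statistics
@everywhere function myquantiles(output, xin)
    v = collect(skipmissing(xin))                 # drop missing values first
    output[:] .= isempty(v) ? NaN : quantile(v, 0.9)
end

@time mapCube(myquantiles, c; indims=InDims("time"), outdims=OutDims())
```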

meggart · Oct 02 '23

Thanks for your explanation.

Actually, I am trying to apply a simple and fast smoothing algorithm to global MODIS LAI data with dimensions (86400, 43200, 500). It is a for loop over the global MODIS grid (86400 × 43200); each iteration costs about 0.001 s.
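Roughly, the per-pixel loop I have in mind looks like this (just a sketch; `smooth_ts!` is a placeholder for the actual smoothing algorithm, here a 3-point moving average):

```julia
using YAXArrays, Statistics

# Placeholder smoother: 3-point moving average over one pixel's time series.
# Assumes each window contains at least one non-missing value.
function smooth_ts!(output, xin)
    n = length(xin)
    for i in 1:n
        lo, hi = max(1, i - 1), min(n, i + 1)
        output[i] = mean(skipmissing(view(xin, lo:hi)))
    end
end

indims = InDims("time")
outdims = OutDims("time")   # one smoothed series of the same length per pixel
@time smoothed = mapCube(smooth_ts!, c; indims, outdims)
```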

Do you think my task is suited to parallel mode? My previous test was also trying to answer this question.

kongdd · Oct 02 '23