DistributedArrays.jl

Write DArray to HDF5/JLD without first converting to Array

pearlzli opened this issue on Oct 04 '16 · 3 comments

It would be useful to have a way to write DArrays to file without first collecting all of the data on the local process. I found a pretty hacky way of doing this:

using Distributed, DistributedArrays, JLD

function write_darray(filepath::AbstractString, darr::DArray{T}) where {T<:AbstractFloat}
    # Runs on each worker: append that worker's indices and local part
    function write_localpart(pid::Int)
        jldopen(filepath, "r+") do file
            write(file, "inds$pid", collect(localindices(darr)))
            write(file, "arr$pid", localpart(darr))
        end
    end

    # Write DArray metadata
    jldopen(filepath, "w") do file
        write(file, "dims", darr.dims)
        write(file, "pids", collect(darr.pids))
    end

    # For each process, write the local indices and local part
    for pid in darr.pids
        remotecall_wait(write_localpart, pid, pid)
        sleep(0.001) # This is for some reason necessary, or else HDF5 complains that the new object already exists
    end
end

function read_darray(filepath::AbstractString)
    file = jldopen(filepath, "r")
    dims = read(file, "dims")
    pids = read(file, "pids")
    out = zeros(dims...)

    # Reassemble local parts
    for pid in pids
        inds = read(file, "inds$pid")
        out[inds...] = read(file, "arr$pid")
    end

    close(file)
    return out
end

However, there must be a better way than this, especially one that would create a single variable in the file at filepath (rather than the 2n+2 variables I create here, where n is the number of processes the DArray is stored on).
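
Something along these lines is roughly what I have in mind: an untested sketch, assuming HDF5.jl (h5open, create_dataset, and hyperslab-style indexed writes) and a filesystem that all workers can see. The workers still write one at a time, but everything lands in a single dataset:

using Distributed, DistributedArrays, HDF5

function write_darray_single(filepath::AbstractString, darr::DArray{T}) where {T}
    # Create one dataset sized to the full array
    h5open(filepath, "w") do file
        create_dataset(file, "darr", datatype(T), dataspace(size(darr)))
    end

    # Each worker reopens the file in turn and writes its block into that dataset
    for pid in procs(darr)
        remotecall_wait(pid) do
            h5open(filepath, "r+") do file
                file["darr"][localindices(darr)...] = localpart(darr)
            end
        end
    end
end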

pearlzli · Oct 04 '16

This functionality would definitely be useful, but it is also my impression that parallel I/O can be tricky. @amitmurthy, do you have any idea how difficult it would be to get something like this working?

andreasnoack · Feb 01 '17

There is https://support.hdfgroup.org/HDF5/PHDF5/, but that is intended for an MPI environment. Parallel/distributed I/O depends a lot on which filesystem you are using. As an example, Lustre offers enough consistency guarantees that you can create a file on one process, open it on all processes, and write to it at different offsets.
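
For illustration, that pattern might look roughly like the following (plain Julia I/O rather than HDF5, and untested; it simply concatenates the local parts in pid order, so you would still need index metadata like in the snippet above to reassemble the array):

using Distributed, DistributedArrays

function raw_offset_write(path::AbstractString, d::DArray{T}) where {T}
    pids = sort(vec(procs(d)))

    # Ask every worker for the byte size of its block, then turn that into offsets
    nbytes = [remotecall_fetch(() -> length(localpart(d)) * sizeof(T), p) for p in pids]
    offsets = cumsum([0; nbytes[1:end-1]])

    # Create/truncate the file once, then let all workers write concurrently
    # at their own offsets (relies on the filesystem tolerating this, e.g. Lustre)
    open(io -> nothing, path, "w")
    @sync for (p, off) in zip(pids, offsets)
        @async remotecall_wait(p, off) do o
            open(path, "r+") do io
                seek(io, o)
                write(io, localpart(d))
            end
        end
    end
end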

vchuravy · Feb 01 '17

I am not familiar with HDF5 and its capabilities (specifically with distributing a single-variable write across nodes). However, in a generic sense, using spmd, the pseudocode would be something like:

@everywhere function darr_local_write(d::DArray, file, in_parallel, offsets)
    pids = sort(vec(procs(d)))
    pididx = findfirst(pids, myid())

    if in_parallel
        open file
        seek to offset
        write local part and close file
    else
        if pididx != 1
            @assert recvfrom(pids[pididx-1]) == :DONE
        end
        open file in append mode, write local part, close file
        pididx < length(pids) && sendto(pids[pididx+1], :DONE)
    end
end

function darr_write(d::DArray, file, in_parallel)
    open file
    write header (pidindexes, dims, dimdist)
    if in_parallel
        offsets = calculate offsets
    else
        offsets=[]
    end
    spmd(darr_local_write, d, file, in_parallel, offsets; pids=workers())
end

# Reading a darray back
@everywhere function darr_local_read(refs::DArray, file, in_parallel, offsets, initial_offset)
    pids = sort(vec(procs(refs)))
    pididx = findfirst(pids, myid())

    if in_parallel
        open file
        seek to offset
        local_part = read local part
        close file
    else
        if pididx != 1
            initial_offset = recvfrom(pids[pididx-1])
        end
        open file, seek(initial_offset)
        local_part = read local part
        new_offset = offset of file
        close file
        pididx < length(pids) && sendto(pids[pididx+1], new_offset)
    end
    put!(refs[:L], local_part)
end


function darr_read(file, in_parallel)
    open file
    read header (pidindexes, dims, dimdist)
    if in_parallel
        offsets = read offsets
    else
        offsets=[]
    end
    initial_offset = offset of file

    localparts_refs = ddata()
    spmd(darr_local_read, localparts_refs, file, in_parallel, offsets, initial_offset; pids=workers())

    @assert length(pidindexes) == nworkers()

    DArray(I -> localparts_refs[:L], dims, dimdist)
end
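
To make the serial branch concrete, the :DONE hand-off could look something like this (an untested sketch using the sendto/recvfrom/spmd helpers from DistributedArrays.SPMD, writing only the dims as a header):

using Distributed
@everywhere using DistributedArrays, DistributedArrays.SPMD

# Workers append their local parts to the file one at a time, in pid order
@everywhere function darr_local_write_serial(d::DArray, file::AbstractString)
    pids = sort(vec(procs(d)))
    pididx = findfirst(isequal(myid()), pids)

    # Wait for the token from the previous worker before touching the file
    if pididx != 1
        @assert recvfrom(pids[pididx-1]) == :DONE
    end

    open(file, "a") do io
        write(io, localpart(d))
    end

    # Pass the token to the next worker
    pididx < length(pids) && sendto(pids[pididx+1], :DONE)
end

function darr_write_serial(d::DArray, file::AbstractString)
    # Minimal header: just the dims (a real version would also store the distribution)
    open(io -> write(io, collect(size(d))), file, "w")
    spmd(darr_local_write_serial, d, file; pids=sort(vec(procs(d))))
end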

This feature would be an interesting addition. However, I think it should live in a different package so that we do not add a dependency on HDF5.jl here.

amitmurthy · Feb 01 '17