Write DArray to HDF5/JLD without first converting to Array
It would be useful to have a way to write DArrays to a file without first collecting all of the data on the local process. I found a pretty hacky way of doing this:
# Note: DistributedArrays and JLD must be loaded on every worker
# (e.g. via @everywhere using DistributedArrays, JLD) before calling this.
function write_darray{T<:AbstractFloat}(filepath::AbstractString, darr::DArray{T})
    function write_localpart(pid::Int)
        jldopen(filepath, "r+") do file
            write(file, "inds$pid", collect(localindexes(darr)))
            write(file, "arr$pid", localpart(darr))
        end
    end
    # Write DArray metadata
    jldopen(filepath, "w") do file
        write(file, "dims", darr.dims)
        write(file, "pids", collect(darr.pids))
    end
    # For each process, write the local indices and local part
    for pid in darr.pids
        remotecall_wait(pid, write_localpart, pid)
        sleep(0.001) # This is for some reason necessary, or else HDF5 complains that the new object already exists
    end
end
function read_darray(filepath::AbstractString)
    file = jldopen(filepath, "r")
    dims = read(file, "dims")
    pids = read(file, "pids")
    out = zeros(dims...)
    # Reassemble local parts
    for pid in pids
        inds = read(file, "inds$pid")
        out[inds...] = read(file, "arr$pid")
    end
    close(file)
    return out
end
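For reference, a round trip with these two functions would look roughly like this (an untested sketch; drand is only used to make a throwaway DArray):

addprocs(4)
@everywhere using DistributedArrays, JLD
d = drand(100, 100)                  # a throwaway DArray spread over the workers
write_darray("test.jld", d)          # each worker appends its inds/arr pair
out = read_darray("test.jld")        # reassembled as a plain Array on this process
@assert out == convert(Array, d)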
However, there must be a better way than this, especially one that would create a single variable in the file at filepath (rather than the 2n+2 variables my version creates, where n is the number of processes the DArray is stored on).
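One direction for the single-variable part (only a sketch, not tested; it assumes the d_create/hyperslab-assignment API of HDF5.jl from this era, and the name write_darray_hdf5 is made up) would be to allocate one dataset with the global dims and fill it one localpart at a time, so no more than one localpart ever passes through the calling process:

using HDF5

function write_darray_hdf5{T}(filepath::AbstractString, darr::DArray{T})
    h5open(filepath, "w") do file
        # One dataset with the global size; each localpart is written into
        # the hyperslab given by its localindexes.
        dset = d_create(file, "darr", datatype(T), dataspace(size(darr)))
        for pid in vec(procs(darr))
            inds = remotecall_fetch(pid, () -> localindexes(darr))
            dset[inds...] = remotecall_fetch(pid, () -> localpart(darr))
        end
    end
end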
This functionality would definitely be useful, but it is also my impression that parallel I/O can be tricky. @amitmurthy do you have any idea how difficult it would be to get something like this working?
There is https://support.hdfgroup.org/HDF5/PHDF5/, but that is intended for an MPI environment. Parallel/distributed IO depends a lot on what filesystem you are using. As an example, Lustre offers enough consistency guarantees that you can create a file on one process, open it on all processes, and write to it at different offsets.
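To make the offset pattern concrete, here is a rough, untested sketch with plain files on such a shared filesystem: one process creates the file, then every worker writes into its own byte range (write_chunk, the filename, and the chunk size are all made up for illustration):

@everywhere function write_chunk(filepath, offset, nbytes)
    data = rand(UInt8, nbytes)      # stand-in for data already held on this worker
    open(filepath, "r+") do io
        seek(io, offset)            # each worker targets its own byte range
        write(io, data)
    end
end

filepath = "chunks.bin"
nbytes = 10^6
touch(filepath)                     # create the file once, on one process
@sync for (i, pid) in enumerate(workers())
    @async remotecall_wait(pid, write_chunk, filepath, (i - 1) * nbytes, nbytes)
end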
I am not familiar with HDF5 and its capabilities (specifically with distributing a single-variable write across nodes). However, in a generic sense, and using spmd, the pseudocode should be something like:
@everywhere function darr_local_write(d::DArray, file, in_parallel, offsets)
    pids = sort(vec(procs(d)))
    pididx = findfirst(pids, myid())
    if in_parallel
        open file
        seek to offset
        write local part and close file
    else
        if pididx != 1
            @assert recvfrom(pids[pididx-1]) == :DONE
        end
        open file in append mode, write local part, close file
        pididx < length(pids) && sendto(pids[pididx+1], :DONE)
    end
end
function darr_write(d::DArray, file, in_parallel)
    open file
    write header (pidindexes, dims, dimdist)
    if in_parallel
        offsets = calculate offsets
    else
        offsets = []
    end
    spmd(darr_local_write, d, file, in_parallel, offsets; pids=workers())
end
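To make the sequential branch concrete, here is a rough, untested sketch using the SPMD primitives from DistributedArrays (spmd, sendto, recvfrom) and plain serialize for the on-disk layout. The names seq_local_write/seq_write and the file layout (a serialized header followed by one serialized (indexes, localpart) pair per worker, in sorted-pid order) are made up for illustration:

@everywhere using DistributedArrays, DistributedArrays.SPMD

@everywhere function seq_local_write(d::DArray, filepath)
    pids = sort(vec(procs(d)))
    pididx = findfirst(pids, myid())
    # Wait for the previous worker's token so the appends happen in pid order.
    if pididx != 1
        @assert recvfrom(pids[pididx-1]) == :DONE
    end
    open(filepath, "a") do io
        serialize(io, (localindexes(d), localpart(d)))
    end
    # Hand the token to the next worker.
    pididx < length(pids) && sendto(pids[pididx+1], :DONE)
end

function seq_write(d::DArray, filepath)
    pids = sort(vec(procs(d)))
    # Header with enough metadata to reassemble the array when reading.
    open(filepath, "w") do io
        serialize(io, (pids, size(d), eltype(d)))
    end
    spmd(seq_local_write, d, filepath; pids=pids)
end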
# Reading a darray back
@everywhere function darr_local_read(refs::DArray, file, in_parallel, offsets, initial_offset)
    pids = sort(vec(procs(refs)))
    pididx = findfirst(pids, myid())
    if in_parallel
        open file
        seek to offset
        local_part = read local part
        close file
    else
        if pididx != 1
            initial_offset = recvfrom(pids[pididx-1])
        end
        open file, seek(initial_offset)
        local_part = read local part
        new_offset = offset of file
        close file
        pididx < length(pids) && sendto(pids[pididx+1], new_offset)
    end
    put!(refs[:L], local_part)
end
function darr_read(file, in_parallel)
    open file
    read header (pidindexes, dims, dimdist)
    if in_parallel
        offsets = read offsets
    else
        offsets = []
    end
    initial_offset = offset of file
    localparts_refs = ddata()
    spmd(darr_local_read, localparts_refs, file, in_parallel, offsets, initial_offset; pids=workers())
    @assert length(pidindexes) == nworkers()
    DArray(I->localparts_refs[:L], dims, dimdist)
end
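A matching, equally untested read for the serialized layout sketched earlier (seq_read is again a made-up name); for brevity it just reassembles a plain Array on the calling process rather than rebuilding a DArray on the workers as the pseudocode above does:

function seq_read(filepath)
    open(filepath, "r") do io
        pids, dims, T = deserialize(io)
        out = zeros(T, dims...)
        # One serialized (indexes, localpart) pair per worker, in the same
        # sorted-pid order in which they were appended.
        for _ in 1:length(pids)
            inds, part = deserialize(io)
            out[inds...] = part
        end
        out
    end
end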
This feature would be an interesting addition. However, I think it should live in a different package so that we do not add a dependency on HDF5.jl here.