JuliaDB.jl
Memory Leak
Using a remotecall for distributed table operations appears to keep some reference around, and temporary objects are not getting GCed correctly.
@nkottary has already looked into this a little and may be able to provide additional insight. I believe the actual issue is in MemPool, but since JuliaDB is my reproducible example I'm posting the issue here.
The following example shows that repeatedly calling table operations on a distributed NDSparse (with a remote call) results in monotonically decreasing available memory. The issue goes away when you replace remotecall_fetch(do_stuff, 2, t) with do_stuff(t).
Input
using Distributed
addprocs()
@everywhere using JuliaDB
@everywhere function do_stuff(t)
    println("filter...")
    t1 = filter((5 => x -> x in 1:10^5), t)
    println("create...")
    t2 = ndsparse(keys(t), values(t))
    println("join...")
    t3 = join(t2, t1)
    println("merge...")
    t4 = merge(t2, t1)
    println("groupreduce...")
    groupreduce(JuliaDB.ApplyColwise(+), t2, (1,2,3,4))
    println(">>>>> Free Memory: ", Sys.free_memory() / 2^20, " MiB")
end

function start()
    println("starting...")
    t = distribute(ndsparse(Tuple(1:10^6 for i in 1:20), Tuple(randn(10^6) for i in 1:20)), 1)
    println("loaded...")
    for x in 1:5
        println("x $x")
        remotecall_fetch(do_stuff, 2, t)
        println("gc...")
        @everywhere GC.gc(true)
        @everywhere GC.gc(true)
    end
end
start()
Output
starting...
loaded...
x 1
From worker 2: filter...
From worker 2: create...
From worker 2: join...
From worker 2: merge...
From worker 2: groupreduce...
From worker 2: >>>>> Free Memory: 2728.90625 MiB
gc...
x 2
From worker 2: filter...
From worker 2: create...
From worker 2: join...
From worker 2: merge...
From worker 2: groupreduce...
From worker 2: >>>>> Free Memory: 2254.01171875 MiB
gc...
x 3
From worker 2: filter...
From worker 2: create...
From worker 2: join...
From worker 2: merge...
From worker 2: groupreduce...
From worker 2: >>>>> Free Memory: 1380.9375 MiB
gc...
x 4
From worker 2: filter...
From worker 2: create...
From worker 2: join...
From worker 2: merge...
From worker 2: groupreduce...
From worker 2: >>>>> Free Memory: 510.90625 MiB
gc...
x 5
From worker 2: filter...
From worker 2: create...
From worker 2: join...
From worker 2: merge...
From worker 2: groupreduce...
From worker 2: >>>>> Free Memory: 17.94921875 MiB
gc...
I see that tables (not small chunk stuff but actual data) remain on the datastore whenever rechunk is used; join and merge here use rechunk. I don't understand why, because from my println debugging I see that a finalizer is registered for everything, but it doesn't run for some tables.
Additionally, I see the issue without remotecall on Julia 0.6. I don't know about 1.0.
It's easy to spot the problem in the Chunk finalizer: https://github.com/JuliaParallel/Dagger.jl/blob/master/src/chunks.jl#L41
The finalizer only removes the data if it's called from proc 1. This means creating distributed tables on other procs is a no-no.
One thing that could fix your particular example is to store an "owner proc" in the Chunk and check whether myid() is that proc (instead of 1) when deciding to free the data. But this is dangerous -- if a remotecall_fetch gets a dtable object back onto proc 1, the Chunk will go out of scope on the worker process where it was created and get gc'd.
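As a rough sketch of that idea (the type name, fields, and constructor below are hypothetical, not Dagger's actual Chunk definition):

using Distributed   # myid
import MemPool      # pooldelete

# Hypothetical sketch: record the creating process on the chunk and let that
# process, rather than a hard-coded proc 1, free the data in its finalizer.
mutable struct OwnedChunk{H}
    handle::H    # e.g. a MemPool.DRef pointing at the stored table data
    owner::Int   # id of the process that created this chunk
end

function OwnedChunk(handle)
    c = OwnedChunk(handle, myid())
    finalizer(c) do chunk
        # Free only from the owning process. The danger described above: if the
        # owner's last copy is gc'd while proc 1 still holds a reference, the
        # data is freed out from under proc 1.
        myid() == chunk.owner && MemPool.pooldelete(chunk.handle)
    end
    return c
end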
The proper fix would need more book-keeping on Chunks -- copying a chunk to another process should increment a ref counter on all copies of that Chunk on all processors. This can be done with a per-process refcounting dict and giving an ID to a Chunk.
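A minimal sketch of that per-process book-keeping (all names here are made up for illustration; the retain/release calls would have to be wired into Chunk construction, (de)serialization, and the finalizer, and a complete design also needs the owner to learn when remote processes release their copies):

# Hypothetical per-process refcount table, keyed by a chunk's id.
const CHUNK_REFCOUNT = Dict{Int,Int}()

# Call when a chunk with this id is created on, or copied to, this process.
retain_chunk!(id::Int) = (CHUNK_REFCOUNT[id] = get(CHUNK_REFCOUNT, id, 0) + 1)

# Call from the chunk's finalizer; returns true once the last local reference
# is gone, i.e. when it is safe (locally) to drop the underlying data.
function release_chunk!(id::Int)
    n = get(CHUNK_REFCOUNT, id, 0) - 1
    if n <= 0
        delete!(CHUNK_REFCOUNT, id)
        return true
    end
    CHUNK_REFCOUNT[id] = n
    return false
end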
Another idea, one I would prefer since it's probably the right separation of concerns, is to implement this on the DRef type in MemPool (it would have to become a mutable struct). In that case we can also optimize something like poolget() by not serializing the DRef and instead just asking for the id; something like:
https://github.com/JuliaComputing/MemPool.jl/blob/master/src/datastore.jl#L137-L139
would become:
remotecall_fetch(r.owner, r.id) do id
    MMWrap(_getlocal(id, true))
end
@nkottary This might well be different from the situation on 0.6; I'd check whether a similar check prevents freeing in the old free_chunks!
Actually this kind of ref-counting is implemented by RemoteChannel. It might be worth using that. If we just stick a RemoteChannel inside a DRef, and add a finalizer to the remote channel to free up data, it would do the trick!
Shashi - Thank you so much for jumping in here! Can you help us implement your last comment? I see DRef in MemPool. Are you suggesting changing this use of remotecall_fetch to RemoteChannel in poolset? Sorry, we are new to the internals.
struct DRef
    owner::Int
    id::Int
    size::UInt
    rc::RemoteChannel
    function DRef(o, i, s)
        rc = RemoteChannel()
        d = new(o, i, s, rc)
        put!(rc, d)
        finalizer(x -> pooldelete(take!(x)), rc)
        d
    end
end
So doing this would be enough?
I think so. Obviously there may be other things to look at; we can figure that out by adding a lot of tests that do weird stuff. We also need to remove the finalizer code on Chunk for this to work.
We don't need this: put!(rc, d). Just an empty RemoteChannel will do.
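For reference, a sketch of the struct above with that simplification applied (this is still only the proposal under discussion, not necessarily what was finally implemented; note that with an empty channel the finalizer has to close over the DRef rather than take! it from the channel):

using Distributed: RemoteChannel   # assumes this lives in MemPool, where pooldelete is defined

struct DRef
    owner::Int
    id::Int
    size::UInt
    rc::RemoteChannel        # kept only so copies of the DRef hold a reference to the channel
    function DRef(o, i, s)
        rc = RemoteChannel() # left empty -- no put! needed
        d = new(o, i, s, rc)
        # Release the pooled data when the channel handle is finalized.
        finalizer(_ -> pooldelete(d), rc)
        return d
    end
end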
@cwiese I think your code is on Julia 0.6? There have been a lot of changes to the GC code since then. We can try to backport a fix for the older JuliaDB.
@shashi ... @joshday has us close to upgraded to 1.0. I will get a status but I am assuming we do not have to backport anything.
Thank you @shashi and @nkottary
Okay, the RemoteChannel thing seems to clean up the memory on 1.0. Trying on 0.6...
Thanks Shashi!!! That was some really cool insight on RemoteChannel.
On 0.6 this just hangs on remotecall_fetch(do_stuff, 2, t) and CPU usage is 0%.
Okay, interesting problem on 0.6. What branches of Dagger, MemPool, and JuliaDB are you using?
@cwiese Fingers crossed, hoping that you won't be penalized by compilation costs on 1.0. I remember some of the queries you had were taking too long on the first run; we still have those optimizations scattered around, though.
We are hoping that the test suite can be used to "warm up" the processors (1-n arguments per query, run on every processor)... maybe PackageCompiler can help with some of the code too (not JuliaDB, right?)
On 0.6: Dagger 248b6bf5bd5de2c78a9b020d0374964e42caea3b, JuliaDB 8bf3057df154b7d0b592248bb3be98171dd28738, MemPool 420abd370cfe5f9b3d966bfb115d82a2cc3ee152
Made these PRs -- Dagger: https://github.com/JuliaParallel/Dagger.jl/pull/104, MemPool: https://github.com/JuliaComputing/MemPool.jl/pull/28
As discussed on slack I've added the finalizer to the deserialized DRef here: https://github.com/JuliaParallel/Dagger.jl/pull/104
The tests for both Dagger and MemPool pass now on 1.0 and there is no memory leak. Thanks again to @shashi !
I should mention that one table is still left behind on the datastore on the remote worker.
In Josh's original example? I think we need to fix that then. So do you still have the println at the end of everything?
Yes, in the original example. I had cleaned up the debug statements but I've added them back now. Can you take a look?