Dagger.jl
Dagger.jl copied to clipboard
Improve checkpointing
- Logging and rendering
- Cleanup actions
For checkpointing to work with the eager API, we'll probably need to start calculating hashes of each task's function and arguments (and recursively hashing any upstream tasks, while weakly caching them to amortize costs), and use that instead of thunk ID, which isn't always reproducible. We'll also need to keep a hash of each chunk; hashing by the original DRef's objectid will probably be sufficient.
Additionally, we should probably also setup TLS variables for the associated thunk when calling thunk checkpoint/restore. This would let us access the upcoming storage option, so that the callback can make use of it.
Hash implementation added in #356
Hi, @jpsamaroo! A long while back I asked on Discourse about Luigi for Julia. You kindly highlighted how Dagger can fill that role. One of the essential aspects of Luigi is its system for "Targets" as checkpoints.
Tl;Dr Targets are a class that implement a method "exists" that returns a bool. If the target for a Task returns true for exists, the task is skipped, and in either case, only targets are passed from dependencies to upstream tasks, not the data itself. It is the responsibility of the upstream tasks to know how to retrieve the data.
This is obviously much less elegant and ergonomic than Dagger's approach, but it has two benefits that I don't know how to implement in the Dagger.
- Using external targets like SQL tables
- Checkpointing results that are out of core. For example, processing many chunks of a table, writing them to a tmp directory as parquet files, and then renaming the directory to the target name to signal it is complete.
I can see several situations here. Maybe I'm misunderstanding Dagger, and it already supports this kind of workflow (in which case, I'd love to contribute to the docs so they can be clearer for folks like me). Maybe I am fundamentally misunderstanding how Dagger should be approached, this is an XY problem, and there are different approaches that can help with my greater goals. Or maybe these are missing features that I could try to help implement.
I'd appreciate your comments on the best way forward!
Follow up on this: looks like there are only docs for using checkpoints are only written for delayed and compute which area listed as "legacy API" to be avoided on the main page of the docs.
So two questions here:
- How do you add checkpoint options to spawn? Just the same as delayed, I'm assuming?
@spawnreturn an EagerThunk which requiresfetch, but the docs show usingtochunkwhich needscollect. If you're using the@spawnAPI, and you want to checkpoint the result, what's the best way to define therestorestep so that the receiver knows what it's getting. Is the best practice that spawned tasks should always return a chunk as well?
Edit: After poking around a bit, it seems like you can't use checkpointing with spawn using the documented method. I slightly edited the example you have in the docs for simplicity
julia> x = Dagger.spawn(sum;
options=Dagger.Sch.ThunkOptions(;
checkpoint=(thunk,result)->begin
open("checkpoint.bin", "w") do io
serialize(io, collect(result))
end
end,
restore=(thunk)->begin
open("checkpoint.bin", "r") do io
Dagger.tochunk(deserialize(io))
end
end)
)([5,6])
# ERROR: MethodError: no method matching keys(::Dagger.Sch.ThunkOptions)
Replacing spawn with delayed makes this code work.
Hi @mrufsvold ! From your description, it sounds like Luigi's Targets are basically the same idea as Dagger's checkpointing, with the exception that we don't need to expect the recipient of the checkpointed data to know how to access it; Dagger automatically unwraps it and passes it to the recipient, as if it was never checkpointed to disk to start with. That's the idea, at least :slightly_smiling_face:
Sorry about the lack of docs! We really could use some good examples of how it can be used in practice. Regarding your questions, your example code seems like it should be the right thing to do (although it should be Dagger.spawn(sum, [5,6]; ...), but you should also be able to do:
x = Dagger.@spawn checkpoint=(thunk,result)->... restore=(thunk)->... sum([5, 6])
(I personally hate the ThunkOptions thing, but I haven't yet found a nice way to replace it; in the meantime, @spawn should be smart enough to construct one for you).
For Dagger.spawn, you can also remove the options=Dagger.Sch.ThunkOptions(;...) and just pass checkpoint and restore directly to Dagger.spawn. There is an issue that this will throw a MethodError because the checkpoint and restore functions are too new; I've filed https://github.com/JuliaParallel/Dagger.jl/pull/371 as a fix for this.
Anyway, the logic you're using in your checkpoint and restore functions is correct; the Dagger.tochunk call basically just constructs a Dagger.Chunk around the restored data, which allows Dagger to pass that result around to other workers and such.
I have some planned enhancements to checkpointing which will:
- Allow just specifying a path to the checkpoint file (and some additional data if you want to serialize through a different format), and Dagger will then automatically checkpoint and restore through that file
- Some kind of auto-checkpointing scheme where Dagger will use the above-mentioned hashing system to generate a unique filename and automatically checkpoint/restore through that file.