Include an example / convenience wrapper for resumable computing
There have been a number of threads in Metaflow Slack about patterns for resumable computing (see e.g. here), which is doable today with artifacts and Client API, along the lines of this example
we should document this pattern properly and potentially include a wrapper for it in the core
I'm still wrapping my head around metaflow so apologies in advance if I'm reading this wrong.
AIUI the goal of this is to make it so that on retries of a failed step you don't necessarily have to retry all of the failed step, and the way that it does that is by maintaining a per-run cache of completed intermediate results which isn't wiped when the step retries.
If that's correct, then I think this is similar to what we asked about on slack but isn't quite the same. What we are looking for is a behavior where we memoize the output of the function across runs so that successive runs do not need to perform expensive analyses multiple times (at the cost of storage, which in our case is necessary anyway). Since KFP does this by default I imagine it's a question you get fairly often, but maybe I'm just misusing metaflow and there's a better approach?
Lacking that insight I took a crack at implementing this behavior and seem to have it working with some of the expected caveats. I'm happy to share that out if there is interest, but since there seems to be a (very welcome) emphasis on ergonomics in metaflow I thought I'd start by talking about interface. Blatantly stealing your example and modifying it to my ends, it looks like this:
class MemoizingFlow(FlowSpec):
@step
def start(self):
self.list = [4, 1, 2, 3, 9, 15]
self.next(self.cached_step, foreach='list')
@memoize(next='join', inputs=['input'], outputs=['output'])
@step
def cached_step(self):
time.sleep(100)
self.output = self.input * 100
self.next(self.join)
@step
def join(self, inputs):
print(inputs)
self.next(self.end)
@step
def end(self):
print("done")
Because our use case is largely around the foreach behavior I think inputs could probably be defaulted. I could also imagine other spellings.
Under the hood this more or less just hashes the inputs, looks them up in S3, and if they are present sets the outputs to the cached values without executing the underlying function(s), then jumps to next. If not, it executes the function and stashes the outputs in S3 under a path derived from the input hash.
Some issues I know of with this approach are:
- By memoizing across runs based solely on the input you lose any kind of step-awareness, so inputs from one step may incorrectly cause another to return an incorrect value. This is trivially dealt with by including the flow/step name in the hash, but...
- Working across flows also means that the step itself may change without changing names, and therefore change the output. This should invalidate the cached entry (but does not in my current implementation).
- It also seems important that there be some kind of parameter or argument to override the cache and force regeneration.
- Finally, there should be some mechanism to specify how storage for caching occurs (eg, ttl). Right now this seems to be largely abstracted out of metaflow, so maybe philosophically this isn't the right place for that?
With regards to (1) and (2), my proposal is to hash the wrapped function(s) and their arguments as well. This does require that if your wrapped function is sensitive to changes in dependencies that you specify those dependencies in a decorator below @memoize. Same thing if you use the @kubernetes decorator and specify an image. The responsibility would be on the user to ensure that all the necessary information to preserve proper caching behavior was encoded in the wrapped functions and their arguments. A possible counterpoint to the above would be that hashing all wrapped functions will invalidate the cached entries on a change to metaflow itself and I guess I don't know how I feel about that yet not having experience with how rapidly it changes. Theoretically it makes sense; in practice it might annoy. What do you think?
With respect to the goal of resumability, I think this gets you... part of the way there? In the sense that if you move the iterator from inside the step to between steps this would give you the behavior desired there, plus the optimization we are looking for.