Add support for delayed / deferred responses

Open slinkydeveloper opened this issue 2 years ago • 1 comments

Being able to defer the response to a caller would make it possible to implement scenarios where the response is not immediately known and the callee service needs to release its lock before it can produce one (e.g. matchmaking). Currently, the workaround is to send an AwakeableId via a background call to the service that cannot respond immediately, which then resolves the awakeable once the response is available.
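
To make the current workaround concrete, here is a minimal plain-Java sketch of its shape. It does not use the Restate SDK: `AwakeableWorkaround`, `create`, `result` and `resolve` are hypothetical names, and the in-memory map only stands in for whatever the runtime does to track awakeables durably.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java model of the awakeable workaround; none of these names are Restate SDK APIs.
class AwakeableWorkaround {

    // Hypothetical stand-in for the runtime's bookkeeping: awakeable id -> pending result.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Caller side: create an awakeable and return the id to hand over to the callee.
    String create() {
        String id = UUID.randomUUID().toString();
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    // Caller side: the future the caller waits on instead of the callee's direct response.
    CompletableFuture<String> result(String id) {
        return pending.get(id);
    }

    // Callee side: complete the awakeable once the answer is finally known.
    void resolve(String id, String value) {
        CompletableFuture<String> future = pending.get(id);
        if (future == null) {
            throw new IllegalStateException("Unknown awakeable id: " + id);
        }
        future.complete(value);
    }

    public static void main(String[] args) {
        AwakeableWorkaround awakeables = new AwakeableWorkaround();

        // 1. The caller creates the awakeable and passes its id in a background call.
        String id = awakeables.create();
        CompletableFuture<String> result = awakeables.result(id);

        // 2. The callee stores the id, returns immediately and releases its lock.
        // 3. Later, when a match is found, the callee resolves the awakeable.
        awakeables.resolve(id, "opponent-42");

        System.out.println(result.join()); // prints "opponent-42"
    }
}
```

The caller has to know up front that the callee cannot answer directly and must create the awakeable itself, which is the coupling this issue aims to remove.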

### Tasks
- [ ] https://github.com/restatedev/service-protocol/issues/49
- [ ] Propagate the string-encoded service invocation response sink through the protocol
- [ ] Add a new message entry to the protocol (or modify the existing OutputStreamEntry message) to send outputs to a specific service invocation response sink.
- [ ] https://github.com/restatedev/sdk-java/issues/151
- [ ] https://github.com/restatedev/sdk-typescript/issues/183

slinkydeveloper · Nov 20 '23 12:11

An example use case is to allow users to easily implement a "Durable Promise" primitive:

public class DurablePromise {

    // State and service Key definition omitted for brevity

    public void resolve(RestateContext ctx, String value) {
        // User can decide whether they want to allow overwriting the previously resolved value or not
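        Optional<String> val = ctx.get(VALUE);
        if (val.isPresent()) {
            throw new IllegalStateException("Promise already resolved");
        }
        // Store the value so that later get() calls can return it immediately
        ctx.set(VALUE, value);
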
        // User can decide if they want to broadcast the value, or just pick a random listener
        Set<String> listeners = ctx.get(LISTENERS).orElse(Collections.emptySet());
        for (String listener : listeners) {
            ctx.replyTo(listener, VALUE.serde(), value);
        }
        ctx.clear(LISTENERS);

        // User can decide to schedule a cleanup, or maybe not and just wait for manual cleanup
        // ctx.delayedCall("cleanup", TTL);
    }

    public String get(RestateContext ctx) {
        Optional<String> val = ctx.get(VALUE);
        if (val.isEmpty()) {
            ctx.update(LISTENERS, s -> {
                s.add(ctx.replyId()); // Add this caller to the list of listeners
                return s;
            }, HashSet::new);
            ctx.replyLater(); // Trap. This throws an exception and closes the current invocation.
        }

        // Value is present
        return val.get();
    }

    public void cleanup(RestateContext ctx) {
        ctx.clear(LISTENERS);
        ctx.clear(VALUE);
    }
}
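
For anyone who wants to play with the intended semantics before the protocol work lands, the flow above can be modelled in memory. This is only a sketch under assumptions: `DeferredReplyModel`, `delivered` and the plain-string reply ids are hypothetical, nothing here is durable, and `get` returns an empty `Optional` where the proposed `ctx.replyLater()` would end the invocation without a response.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// In-memory model of the deferred-response flow; not Restate SDK code.
class DeferredReplyModel {

    // Hypothetical response sinks: reply id -> responses delivered to that caller.
    private final Map<String, List<String>> sinks = new HashMap<>();
    // Promise state: the value (null until resolved) and the parked callers.
    private String value;
    private final Set<String> listeners = new LinkedHashSet<>();

    // Models get(): answer right away if resolved, otherwise park the caller's reply id.
    Optional<String> get(String replyId) {
        if (value != null) {
            return Optional.of(value);
        }
        listeners.add(replyId);
        return Optional.empty(); // models replyLater(): no response is sent yet
    }

    // Models resolve(): store the value, then reply to every parked caller.
    void resolve(String newValue) {
        if (value != null) {
            throw new IllegalStateException("Promise already resolved");
        }
        value = newValue;
        for (String replyId : listeners) {
            sinks.computeIfAbsent(replyId, k -> new ArrayList<>()).add(newValue);
        }
        listeners.clear();
    }

    // Responses that have been sent to the given reply id so far.
    List<String> delivered(String replyId) {
        return sinks.getOrDefault(replyId, List.of());
    }

    public static void main(String[] args) {
        DeferredReplyModel promise = new DeferredReplyModel();

        System.out.println(promise.get("caller-a").isPresent()); // prints "false"
        promise.resolve("done");
        System.out.println(promise.delivered("caller-a"));       // prints "[done]"
        System.out.println(promise.get("caller-b"));             // prints "Optional[done]"
    }
}
```

Callers that arrive before `resolve` get their response through the sink, and callers that arrive afterwards are answered directly, which is the behaviour the `DurablePromise` example relies on.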

slinkydeveloper · Jan 29 '24 09:01