sdk-typescript
[Feature Request] async generator for cancellationScope
I am using nested CancellationScopes to implement update logic, and I want to be able to inject the results from the previous iteration as they were at the point of cancellation. Note that the inner CancellationScope could run for a long time.
For this I want to use an async generator; see the snippet below.
I have two CancellationScopes: one for updates of the item and one for updates inside cancellableComputeFn. On each updateItemSignal I want to run cancellableComputeFn. On each updateSignal I want to do a calculation.
At the end of cancellableComputeFn I have a long sleep to wait for possible updates. On an update I don't want to lose the progress stored in itemRes. Note that in reality this is split across a class, so I can't rely on variables shared across main().
The reason for the split is to reuse the more generic item-updating logic (and hide it from the specific computeFn).
async function main() {
  let updateScopeFn = new CancellationScope({ cancellable: true });
  let updateScopeItem = new CancellationScope({ cancellable: true });
  // In reality this handler is part of computationFn.
  setHandler(updateItemSignal, () => {
    updateScopeItem.cancel();
    updateScopeItem = new CancellationScope({ cancellable: true });
    updateScopeFn = new CancellationScope({ cancellable: true, parent: updateScopeItem });
  });
  setHandler(updateSignal, () => {
    updateScopeFn.cancel();
    updateScopeFn = new CancellationScope({ cancellable: true, parent: updateScopeFn.parent });
  });
  const cancellableComputeFn = async function* (itemRes) {
    let state = 'processing';
    let internalCount = itemRes.internalCount;
    while (state !== 'completed') {
      try {
        yield* updateScopeFn.run(async () => {
          internalCount += 1;
          yield { internalCount }; // Let
          await sleep('1d');
          state = 'completed';
        });
      } catch (err) {
        // Catch cancellation error.
      }
    }
    return { internalCount };
  };
  // In my actual code this is a class such that I can place the updateScopeItem in this fn.
  const computationFn = async function () {
    let itemRes = { internalCount: 0 };
    let state = 'processing';
    while (state !== 'completed') {
      // Do some other processing.
      try {
        await updateScopeItem.run(async () => {
          const computeFn = cancellableComputeFn(itemRes);
          while (true) {
            const { value, done } = await computeFn.next();
            if (!done) {
              itemRes = value;
            }
            if (done) {
              return value;
            }
          }
        });
        state = 'completed';
      } catch (err) {
        // Catch cancellation error.
      }
    }
  };
  await computationFn();
}
Maybe you use other patterns to achieve this?
I'd write this workflow as shown below. Does this look better to you?
export async function myWorkflow() {
  let internalCount = 0;
  for (;;) {
    try {
      await CancellationScope.cancellable(async () => {
        const scope = CancellationScope.current();
        setHandler(updateSignal, () => scope.cancel());
        for (;;) {
          try {
            await CancellationScope.cancellable(async () => {
              const scope = CancellationScope.current();
              setHandler(updateItemSignal, () => scope.cancel());
              internalCount++;
              await sleep('1d');
            });
            break;
          } catch (err) {
            // Catch cancellation error.
          }
        }
      });
      break;
    } catch (err) {
      // Catch cancellation error.
    }
  }
}
Or with a generator:
export async function myWorkflow() {
  async function* compute(internalCount = 0) {
    for (;;) {
      try {
        yield ++internalCount;
        await CancellationScope.cancellable(async () => {
          const scope = CancellationScope.current();
          setHandler(updateItemSignal, () => scope.cancel());
          await sleep('1d');
        });
        break;
      } catch (err) {
        // Catch cancellation error.
      }
    }
  }
  for (;;) {
    let internalCount = 0;
    try {
      await CancellationScope.cancellable(async () => {
        const scope = CancellationScope.current();
        setHandler(updateSignal, () => scope.cancel());
        for await (const c of compute(internalCount)) {
          internalCount = c;
        }
      });
      break;
    } catch (err) {
      // Catch cancellation error.
    }
  }
}
I think technically you can implement async generator support for cancellation scopes with something like this:
import { CancellationScope } from '@temporalio/workflow';
import { storage } from '@temporalio/workflow/lib/cancellation-scope';

export class MyCancellationScope extends CancellationScope {
  iter<I, F extends AsyncGenerator<I>>(f: () => F): AsyncGenerator<I> {
    return storage.run(this, f);
  }
}
storage is an internal API, so I wouldn't recommend using it, but if this solution works for you, I'd accept a contribution to the SDK to add the feature (with some proper tests).
Thanks for the quick response 🚀 There could be some nesting of CancellationScopes inside computeFn as well, and some computations might be skipped depending on the received signal, so moving out of the scope to do the yield isn't really an option.
So the example would be more like this:
async function* compute(internalCount = 0) {
  for (;;) {
    try {
      await CancellationScope.cancellable(async () => {
        const scope = CancellationScope.current();
        setHandler(updateSignal2, () => scope.cancel());
        // Do some other calculations.
        internalCount = 2 * internalCount;
        yield internalCount;
        await CancellationScope.cancellable(async () => {
          const scope = CancellationScope.current();
          setHandler(updateSignalItem, () => scope.cancel());
          yield ++internalCount;
          await sleep('1d');
        });
      });
      break;
    } catch (err) {
      // Catch cancellation error.
    }
  }
}
I will try out the last suggestion.
That doesn't seem to work. When you try to cancel the scope, the iter method does not terminate. How is the cancellation injected in the current run method?
You should read the implementation of CancellationScope, it's fairly straightforward: https://github.com/temporalio/sdk-typescript/blob/main/packages/workflow/src/cancellation-scope.ts.
The heavy lifting is done by Node's AsyncLocalStorage.
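To make the mechanism more concrete, here is a minimal sketch of the idea in plain Node, with no Temporal dependency. It is a simplified model under stated assumptions, not the SDK's actual code: `MiniScope`, `cancellableSleep`, and `cancelDemo` are hypothetical names, and the local `storage` only stands in for the SDK's internal one. The scope publishes itself through AsyncLocalStorage while `run` executes, and a cancellable operation looks up the current scope at the moment it is created and rejects when that scope is cancelled.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Hypothetical stand-in for the SDK's internal `storage`.
const storage = new AsyncLocalStorage<MiniScope>();

class MiniScope {
  private rejectCancel!: (err: Error) => void;
  // Rejects once cancel() is called; never resolves.
  readonly cancelRequested: Promise<never>;

  constructor() {
    this.cancelRequested = new Promise<never>((_, reject) => {
      this.rejectCancel = reject;
    });
    // Avoid an unhandled rejection when nothing is listening.
    this.cancelRequested.catch(() => undefined);
  }

  // Everything started inside fn sees this scope as "current".
  run<T>(fn: () => Promise<T>): Promise<T> {
    return storage.run(this, fn);
  }

  cancel(): void {
    this.rejectCancel(new Error('cancelled'));
  }
}

// A cancellable operation binds to the scope that is current when it starts.
function cancellableSleep(ms: number): Promise<void> {
  const scope = storage.getStore();
  return new Promise<void>((resolve, reject) => {
    const timer = setTimeout(resolve, ms);
    scope?.cancelRequested.catch((err) => {
      clearTimeout(timer);
      reject(err);
    });
  });
}

async function cancelDemo(): Promise<string> {
  const scope = new MiniScope();
  setTimeout(() => scope.cancel(), 10); // plays the role of a signal handler
  try {
    await scope.run(() => cancellableSleep(1000));
    return 'completed';
  } catch (err) {
    return (err as Error).message;
  }
}
```

In this model, cancellation only reaches operations that were created while the scope was the current store, which is why it matters where the body of a generator actually executes.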
You might want to create scopes around each "sleep" or other Promises awaited in your generator to have cancellable operations but I understand that it's not as elegant as having CancellationScope do this automatically for you.
You can create a helper as shown here:
async function cancellableWithSignal(signalDef, fn) {
  return await CancellationScope.cancellable(async () => {
    const scope = CancellationScope.current();
    setHandler(signalDef, () => scope.cancel());
    await fn();
  });
}

async function* compute(internalCount = 0) {
  for (;;) {
    try {
      yield ++internalCount;
      await cancellableWithSignal(updateItemSignal, async () => {
        await sleep('1d');
      });
      break;
    } catch (err) {
      // Catch cancellation error.
    }
  }
}
I can see why wrapping the generator with storage.run might not work properly since when you yield you're effectively leaving the scope.
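A small experiment with plain Node illustrates this, assuming only the standard AsyncLocalStorage API. The names `reportScope` and `leakDemo` are made up for the illustration, and a string stands in for the scope object. Calling an async generator function inside `storage.run` only creates the generator object; the body runs later, inside whichever context calls `next()`.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// A string stands in for the "current scope" in this sketch.
const storage = new AsyncLocalStorage<string>();

async function* reportScope(): AsyncGenerator<string> {
  // Each segment of the body runs in the context of the caller of next().
  yield storage.getStore() ?? 'no scope';
  yield storage.getStore() ?? 'no scope';
}

async function leakDemo(): Promise<string[]> {
  // run() covers only the creation of the generator object; the body has not started yet.
  const gen = storage.run('outer', () => reportScope());

  // Resumed outside any run(): the body sees no store.
  const first = await gen.next();

  // Resumed inside a different run(): the body sees that store instead of 'outer'.
  const second = await storage.run('other', () => gen.next());

  return [String(first.value), String(second.value)];
}
```

Neither value is `'outer'`, so operations created in the generator body would never register with the scope that `storage.run` was meant to install.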
@Irvenae I'm curious what you ended up doing with this.
I worked around it for now by going out of the scope to do the yield, which ended up in ugly code...