callbag-subscribe
callbag-subscribe copied to clipboard
Fix issue where Dispose() called after source terminated potentially violates spec
Spec says: A callbag MUST NOT be delivered data after it has been terminated
As I understand it, for callbags to work in concert all callbags must either:
- not send 'terminate' to sources/sinks after said source/sink notifies it has terminated
- guard against case where callbag has notified sources/sinks of termination and is itself subsequently terminated
I expect 1) - though perhaps @staltz should weigh in...
If 2) then I believe this issue should be raised with @Andarist in callbag-remember since it doesn't guard against that case.
Example: https://stackblitz.com/edit/typescript-6w7v6u?file=index.ts
import pipe from "callbag-pipe";
import of from "callbag-of";
import subscribe from "callbag-subscribe";
import remember from "callbag-remember";
// once `of` arguments are exhausted, it sends dispose signal to its sink (remember)
const num$ = remember(of(1));
const cancelSubscription = pipe(
num$,
subscribe(value => console.log(value))
);
setTimeout(
() =>
// when subscription is canceled, it sends dispose signal to its source (remember)
cancelSubscription(),
100
);
// remember throws error after recieving dispose signal from both source and sink
The fix here looks OK - I believe that stuff like this should be mostly handed on the ends of the callbag chains. It's the subscribe
that knows that it shouldn't send 2
upwards after the source is completed (or errored).
@Andarist It looks like handling this at the end of the chain won't fully solve the problem. In cases where remember
is used with subscribe
and a combination operator such as combine
- the combination operator would need to terminate only those sources that haven't terminated, which isn't currently happening. Using the current patch there still are issues: https://stackblitz.com/edit/js-vqe3br
import fromPromise from "callbag-from-promise";
import interval from "callbag-interval";
import combine from "callbag-combine";
import pipe from "callbag-pipe";
import remember from "callbag-remember";
const str$ = remember(fromPromise(Promise.resolve("foo")));
const time$ = interval(1000);
const combine$ = combine(str$, time$);
const dispose = pipe(
combine$,
patchedSubscribe(val => {
console.log(val);
})
);
setTimeout(() => dispose(), 1500);
Just to clarify if this wasn't clear beforehand - by "at the end" I have meant "as close to the end as possible".
I can't say that remember
is 100% correct and it definitely shouldn't change but I'd only like to touch it after we fix other issues and prove that it has to be changed. This is IMHO a good process as it allows us to recognize those unhandled scenarios in other operators.
As to this particular case - I'm pretty sure that combine
should "track" the completion state of its sources here:
https://github.com/staltz/callbag-combine/blob/912a5ec8ec3d9e65d3beccdc7a53eabd624c1c8a/readme.js#L65
which could simply be done by nullifying the correct source talkback slot. More than that - errors from sources seem to be currently ignored by combine
and that could be fixed as well.
Right - I believe the talkback from the sink (in this case callbag-subscribe
) is handled here and that the list of talkbacks is accumulated but never culled.
Yep, I suspect that this should be enough to fix this particular problem:
diff --git a/readme.js b/readme.js
index 5de5d3f..d66c240 100644
--- a/readme.js
+++ b/readme.js
@@ -45,7 +45,7 @@ const combine = (...sources) => (start, sink) => {
const sourceTalkbacks = new Array(n);
const talkback = (t, d) => {
if (t === 0) return;
- for (let i = 0; i < n; i++) sourceTalkbacks[i](t, d);
+ for (let i = 0; i < n; i++) sourceTalkbacks[i] && sourceTalkbacks[i](t, d);
};
sources.forEach((source, i) => {
vals[i] = EMPTY;
@@ -62,6 +62,7 @@ const combine = (...sources) => (start, sink) => {
sink(1, arr);
}
} else if (t === 2) {
+ sourceTalkbacks[i] = null;
if (--Ne === 0) sink(2);
} else {
sink(t, d);