
Fix issue where Dispose() called after source terminated potentially violates spec

Open bjnsn opened this issue 4 years ago • 5 comments

Spec says: A callbag MUST NOT be delivered data after it has been terminated

As I understand it, for callbags to work in concert all callbags must either:

  1. not send 'terminate' to a source/sink after that source/sink has notified it of its own termination
  2. guard against the case where the callbag has notified its sources/sinks of termination and is itself subsequently terminated

I expect 1) - though perhaps @staltz should weigh in...

If 2) then I believe this issue should be raised with @Andarist in callbag-remember since it doesn't guard against that case.
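To make option 2) concrete, here is a minimal sketch of what such a guard could look like. `guardTerminated` is a hypothetical helper written for illustration, not part of callbag-remember or any other callbag package; it only assumes the standard `(type, data)` callbag signature, where type 2 means termination.

```javascript
// Hypothetical helper (not from any callbag package): wraps a callbag so
// that every message after the first termination (type 2) is ignored.
const guardTerminated = callbag => {
  let terminated = false;
  return (t, d) => {
    if (terminated) return;          // already terminated: drop the message
    if (t === 2) terminated = true;  // remember the first termination
    callbag(t, d);
  };
};

// A strict talkback that throws if it is terminated more than once.
let terminations = 0;
const strictTalkback = (t, d) => {
  if (t === 2 && ++terminations > 1) throw new Error("terminated twice");
};

const safeTalkback = guardTerminated(strictTalkback);
safeTalkback(2);
safeTalkback(2); // ignored by the guard, so strictTalkback does not throw
console.log(terminations); // 1
```

With option 1) no such wrapper would be needed, because the second `safeTalkback(2)` call would never be made in the first place.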

Example: https://stackblitz.com/edit/typescript-6w7v6u?file=index.ts

import pipe from "callbag-pipe";
import of from "callbag-of";
import subscribe from "callbag-subscribe";
import remember from "callbag-remember";

// once `of` arguments are exhausted, it sends dispose signal to its sink (remember)
const num$ = remember(of(1));

const cancelSubscription = pipe(
  num$,
  subscribe(value => console.log(value))
);

setTimeout(
  () =>
    // when subscription is canceled, it sends dispose signal to its source (remember)
    cancelSubscription(),
  100
);

// remember throws an error after receiving the dispose signal from both its source and its sink

bjnsn avatar Jan 28 '21 01:01 bjnsn

The fix here looks OK - I believe that stuff like this should mostly be handled at the ends of the callbag chains. It's subscribe that knows it shouldn't send a 2 (terminate) upwards after the source has completed (or errored).
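As a rough illustration of that idea (not the actual patch), a sink can track whether its source has already terminated and skip the upward 2 in that case. `guardedSubscribe` and `onceSource` below are hypothetical names used only for this sketch, and the sink is much simpler than the real callbag-subscribe.

```javascript
// Hypothetical, simplified sink in the spirit of callbag-subscribe.
// It records when the source terminates and never sends 2 upwards after that.
const guardedSubscribe = listener => source => {
  let talkback = null;
  let ended = false;
  source(0, (t, d) => {
    if (t === 0) talkback = d;      // handshake: keep the source's talkback
    else if (t === 1) listener(d);  // data
    else if (t === 2) {             // source completed or errored
      ended = true;
      talkback = null;
    }
  });
  // Returned dispose function: a no-op once the source has terminated.
  return () => {
    if (ended || !talkback) return;
    ended = true;
    talkback(2);
  };
};

// Test source: emits one value, completes synchronously, and counts
// how many termination messages it receives from its sink.
let upwardTerminations = 0;
const onceSource = (start, sink) => {
  if (start !== 0) return;
  sink(0, t => {
    if (t === 2) upwardTerminations++;
  });
  sink(1, "a");
  sink(2);
};

const received = [];
const dispose = guardedSubscribe(v => received.push(v))(onceSource);
dispose(); // source already completed, so nothing is sent upwards
console.log(received, upwardTerminations); // [ 'a' ] 0
```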

Andarist avatar Jan 29 '21 12:01 Andarist

@Andarist It looks like handling this at the end of the chain won't fully solve the problem. When remember is used with subscribe and a combination operator such as combine, the combination operator would need to terminate only those sources that haven't already terminated, which isn't currently happening. With the current patch there are still issues: https://stackblitz.com/edit/js-vqe3br

import fromPromise from "callbag-from-promise";
import interval from "callbag-interval";
import combine from "callbag-combine";
import pipe from "callbag-pipe";
import remember from "callbag-remember";

const str$ = remember(fromPromise(Promise.resolve("foo")));
const time$ = interval(1000);
const combine$ = combine(str$, time$);

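// patchedSubscribe is not imported above; presumably it is callbag-subscribe with the current patch applied
const dispose = pipe(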
  combine$,
  patchedSubscribe(val => {
    console.log(val);
  })
);

setTimeout(() => dispose(), 1500);

bjnsn avatar Feb 22 '21 20:02 bjnsn

Just to clarify, in case this wasn't clear beforehand - by "at the end" I meant "as close to the end as possible".

I can't say that remember is 100% correct and that it definitely shouldn't change, but I'd only like to touch it after we fix other issues and prove that it has to be changed. This is IMHO a good process, as it allows us to recognize those unhandled scenarios in other operators.

As to this particular case - I'm pretty sure that combine should "track" the completion state of its sources here: https://github.com/staltz/callbag-combine/blob/912a5ec8ec3d9e65d3beccdc7a53eabd624c1c8a/readme.js#L65 which could simply be done by nullifying the correct source talkback slot. More than that - errors from sources seem to be currently ignored by combine and that could be fixed as well.
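A small standalone sketch of the "nullify the slot" idea, independent of combine's real code: `createTalkbackSlots` is a hypothetical helper made up for this illustration. It keeps one talkback per source, clears the slot when that source terminates, and only forwards sink messages to the slots that are still set.

```javascript
// Hypothetical helper (not part of callbag-combine): one talkback slot per
// source, cleared when that source terminates.
const createTalkbackSlots = n => {
  const talkbacks = new Array(n).fill(null);
  return {
    set: (i, tb) => { talkbacks[i] = tb; },   // on handshake (type 0) from source i
    clear: i => { talkbacks[i] = null; },     // on termination (type 2) from source i
    broadcast: (t, d) => {                    // message coming from the sink
      for (const tb of talkbacks) if (tb) tb(t, d);
    }
  };
};

const slots = createTalkbackSlots(2);
const calls = [];
slots.set(0, t => calls.push(["source0", t]));
slots.set(1, t => calls.push(["source1", t]));

slots.clear(0);     // source 0 has completed on its own
slots.broadcast(2); // sink disposes: only the still-active source is told
console.log(calls); // [ [ 'source1', 2 ] ]
```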

Andarist avatar Feb 22 '21 22:02 Andarist

Right - I believe the talkback from the sink (in this case callbag-subscribe) is handled here and that the list of talkbacks is accumulated but never culled.

bjnsn avatar Feb 22 '21 22:02 bjnsn

Yep, I suspect that this should be enough to fix this particular problem:

diff --git a/readme.js b/readme.js
index 5de5d3f..d66c240 100644
--- a/readme.js
+++ b/readme.js
@@ -45,7 +45,7 @@ const combine = (...sources) => (start, sink) => {
   const sourceTalkbacks = new Array(n);
   const talkback = (t, d) => {
     if (t === 0) return;
-    for (let i = 0; i < n; i++) sourceTalkbacks[i](t, d);
+    for (let i = 0; i < n; i++) sourceTalkbacks[i] && sourceTalkbacks[i](t, d);
   };
   sources.forEach((source, i) => {
     vals[i] = EMPTY;
@@ -62,6 +62,7 @@ const combine = (...sources) => (start, sink) => {
           sink(1, arr);
         }
       } else if (t === 2) {
+        sourceTalkbacks[i] = null;
         if (--Ne === 0) sink(2);
       } else {
         sink(t, d);

Andarist avatar Feb 23 '21 09:02 Andarist