Out of order events with imitate

Open lolgesten opened this issue 6 years ago • 10 comments

I'm observing out-of-order events. First, some pseudocode to explain my setup.

function main(sources) {
  // updates to merged app state on the form `{key: value}`
  const update$ = xs.create();

  // folded app state
  const state$ = update$
    .fold((state, update) => ({...state, ...update}), {})
    .debug('state');

  // various inputs that produce updates for update$.
  // one also evaluates state$ to produce updates. cyclic, but should be ok.
  ...

  // proxy that follows state$, but sees a different order!!!
  const proxy$ = xs.from(state$).debug('proxy');

  const realUpdate$ = xs.merge(
    ...
  ).debug('update');

  update$.imitate(realUpdate$); // cyclic!

  return { ... };
}

My real scenario is more complicated with lots of state evaluation, so it's entirely possible that the problem is introduced somewhere else.

I observe the correct state order on state, but the proxy sees another.

[Screenshot: debug log output, 2018-02-11 at 20:50:49]
  • 4th row: update with config, to be merged into state.
  • 5th row: new state with config merged in.
  • 6th row: evaluating state$, we spot state IDLE and config, and produce the update INITED.
  • 7th row: new state with the new appState merged in.
  • 8th-9th rows: the proxy sees the events out of order. :(

lolgesten avatar Feb 11 '18 20:02 lolgesten

This is run with cycle.js, if that makes any difference.

lolgesten avatar Feb 11 '18 20:02 lolgesten

Hi! Thanks for reporting. Just to be sure, which version of xstream and cycle/run?

staltz avatar Feb 11 '18 20:02 staltz

    "@cycle/run": "^4.1.0",
    "xstream": "^11.2.0"

Also, I just found a workaround: using .compose(delay(1)) on the update stream that evaluates the app state.

lolgesten avatar Feb 11 '18 20:02 lolgesten

const xs = require('xstream').default;

const START_STATE = {
  ver: 0,   // counts up to see out of order
  a: false, // we toggle this on a timer
  b: false, // we toggle this on a different timer
  c: false, // we derive this from a and b
};

function loop() {

  // imitate cycle
  const update$ = xs.create();

  // the folded state, with ever increasing version number
  const state$ = update$
        .fold((p, c) => ({...p, ver: p.ver + 1, ...c}), START_STATE)
        .debug('state');

  // the timer updates
  const aUpdate$ = xs.periodic(500).map(n => ({a: n % 2 === 0}));
  const bUpdate$ = xs.periodic(100).map(n => ({b: n % 3 === 0}));

  // derived update from a/b
  const derivedUpdate$ = state$
        .filter(state => state.c !== state.a && state.b)
        .map(state => ({c: state.a && state.b}));

  // the proxy that sees the error
  const proxy$ = xs.from(state$).debug('proxy');

  // merge the update streams
  const realUpdate$ = xs.merge(
    aUpdate$,
    bUpdate$,
    derivedUpdate$
  );

  // and cycle back to top
  update$.imitate(realUpdate$);

  return {
    state$,
    proxy$,
  };

}

const {state$, proxy$} = loop();
// no-op listeners, just to start the streams
state$.addListener({next: () => {}});
proxy$.addListener({next: () => {}});

@staltz here's a test case that reproduces the problem every time for me. It might be a bit contrived, i.e. there may be a more succinct way to construct a test case.

I see the problem in the proxy at ver: 6

state: { ver: 5, a: true, b: true, c: false }
state: { ver: 6, a: true, b: true, c: true }
proxy: { ver: 6, a: true, b: true, c: true }
proxy: { ver: 5, a: true, b: true, c: false }

lolgesten avatar Feb 16 '18 21:02 lolgesten

Wrote it as a test case too. https://gist.github.com/lolgesten/318dd558786d6da3d56338c123bdf3c7

lolgesten avatar Feb 16 '18 22:02 lolgesten

Looking at the xstream code, I think I was wrong that this has to do with observables.

xs.from() short-circuits if it detects a Stream and returns the original stream instead.

I think that means we get two InternalListeners on the same stream, and if the first listener causes an update to the stream itself, it will also see that update before the second listener does.
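
To illustrate the ordering effect described above, here is a minimal sketch. It deliberately does not use xstream: `createEmitter` is a hypothetical hand-rolled stand-in that only assumes values are delivered synchronously to listeners in registration order. The values 5 and 6 are illustrative, loosely mirroring the `ver` numbers in the report.

```javascript
// Hypothetical stand-in for a stream. This is NOT xstream's implementation;
// it only assumes synchronous delivery to listeners in registration order.
function createEmitter() {
  const listeners = [];
  return {
    addListener(fn) { listeners.push(fn); },
    emit(value) {
      // Deliver to a copy of the listener list, one listener at a time.
      for (const fn of listeners.slice()) fn(value);
    },
  };
}

const log = [];
const state = createEmitter();

// First listener: reacts to 5 by synchronously emitting 6 on the same emitter.
state.addListener((v) => {
  log.push(`state ${v}`);
  if (v === 5) state.emit(6);
});

// Second listener: registered later, so the nested 6 reaches it before 5 does.
state.addListener((v) => log.push(`proxy ${v}`));

state.emit(5);
console.log(log.join('\n'));
// state 5
// state 6
// proxy 6
// proxy 5
```

The nested `emit(6)` runs to completion for every listener before the outer `emit(5)` gets to the second listener, which would explain why only the later listener sees the reversed order.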

lolgesten avatar Feb 16 '18 22:02 lolgesten

Maybe this isn't a bug. I just need to be aware of that shortcut. With that in mind, the fix is:

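  // delay is an extra operator and needs its own import:
  // const delay = require('xstream/extra/delay').default;

  // merge the update streams
  const realUpdate$ = xs.merge(
    aUpdate$,
    bUpdate$,
    derivedUpdate$
  ).compose(delay(1)); // delay update to allow all observers of state$ to see value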

lolgesten avatar Feb 16 '18 22:02 lolgesten

@lolgesten A delay will fix it, but the issue is still real. I think imitate should do a buffer for synchronous emissions, like we have inside Cycle run. Hence, reopening this issue.

staltz avatar Mar 20 '18 08:03 staltz

Ah ok! I agree that it'd be nicer if I didn't have to lose the synchronicity (is that a word? :) )

lolgesten avatar Mar 20 '18 14:03 lolgesten

Here's the smallest possible repro I've come up with.

import xs from 'xstream';

const imitNumber$ = xs.create<number>();

const numberAdd$ = imitNumber$.filter((n) => n < 100).map((n) => n + 100);

const source$ = xs.create<number>();

const realNumber$ = xs.merge(source$, numberAdd$);

imitNumber$.imitate(realNumber$);

realNumber$.subscribe({next: (n) => console.log('real', n)});
imitNumber$.subscribe({next: (n) => console.log('imit', n)});

source$.shamefullySendNext(1);

The output is:

real 1
real 101
imit 101
imit 1

So what we see is that the imit subscriber is observing 101 before 1, even though 1 clearly has been evaluated since it's the cause of 101.

The problem occurs because numberAdd$ is synchronously evaluated.

Our fix for this problem is to patch imitate. Arguably the same queuing should be applied to error as well, but we don't really use errors, so this is enough for us.

The important bit is that even if target$.shamefullySendNext(next!) causes a recursive invocation of next, we must not update the reactive tree with that value until later.

import {Stream} from 'xstream';

export const patchImitate = () => {
    Stream.prototype.imitate = function (source$: any) {
        return imitate(source$, this);
    };
};

const imitate = <T>(source$: Stream<T>, target$: Stream<T>) => {
    const queue: T[] = [];
    let running = false;

    source$.subscribe({
        next: (v) => {
            queue.push(v);

            if (running) {
                return;
            }

            running = true;

            while (queue.length) {
                const next = queue.shift();
                target$.shamefullySendNext(next!);
            }

            running = false;
        },
        error: (e) => {
            target$.shamefullySendError(e);
        },
        complete: () => {
            target$.shamefullySendComplete();
        },
    });
};
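
The queue-and-drain idea in the patch can also be tried in isolation. The sketch below is an assumption-laden model, not xstream code: `createEmitter` and `forwardQueued` are hypothetical names, and the emitters `real` and `imit` merely play the roles of realNumber$ and imitNumber$ from the repro above.

```javascript
// Hypothetical stand-in for a stream. This is NOT xstream's implementation;
// it only assumes synchronous delivery to listeners in registration order.
function createEmitter() {
  const listeners = [];
  return {
    addListener(fn) { listeners.push(fn); },
    emit(value) {
      for (const fn of listeners.slice()) fn(value);
    },
  };
}

// Forward values from `source` to `target`. A value that arrives while an
// earlier one is still being delivered is queued instead of emitted at once.
function forwardQueued(source, target) {
  const queue = [];
  let running = false;
  source.addListener((v) => {
    queue.push(v);
    if (running) return; // the drain loop further up the stack will pick it up
    running = true;
    while (queue.length) target.emit(queue.shift());
    running = false;
  });
}

const log = [];
const real = createEmitter(); // plays the role of realNumber$
const imit = createEmitter(); // plays the role of imitNumber$

real.addListener((n) => log.push(`real ${n}`));
forwardQueued(real, imit);

// Cyclic part: values below 100 seen on imit feed back into real as n + 100.
imit.addListener((n) => { if (n < 100) real.emit(n + 100); });
imit.addListener((n) => log.push(`imit ${n}`));

real.emit(1);
console.log(log.join('\n'));
// real 1
// real 101
// imit 1
// imit 101
```

Because 101 is queued while 1 is still being delivered, every listener on `imit` sees 1 before 101, without giving up synchronous delivery the way a delay does.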

lolgesten avatar Aug 13 '20 16:08 lolgesten