xstream
Out of order events with imitate
I'm observing out-of-order events. First, some pseudocode to explain my setup.
function main(sources) {
// updates to merged app state on the form `{key: value}`
const update$ = xs.create();
// folded app state
const state$ = update$
.fold((state, update) => ({...state, ...update}), {})
.debug('state');
// various inputs that produce updates for update$.
// one of them also derives updates from state$. cyclic, but should be ok.
...
// proxy that follows state$, but sees a different order!!!
const proxy$ = xs.from(state$).debug('proxy');
const realUpdate$ = xs.merge(
...
).debug('update');
update$.imitate(realUpdate$); // cyclic!
return { ... };
}
My real scenario is more complicated with lots of state evaluation, so it's entirely possible that the problem is introduced somewhere else.
I observe the correct state order on `state`, but the `proxy` sees another.
![screen shot 2018-02-11 at 20 50 49](https://user-images.githubusercontent.com/30282827/36077762-0aa38d16-0f6f-11e8-8197-402b2615fdb0.png)
- 4th row: update with `config` to be merged into state.
- 5th row: new state with `config` merged in.
- 6th row: evaluating `state$`, we spot state `IDLE` and `config`, and produce the update `INITED`.
- 7th row: new state with the new `appState` merged in.
- 8th-9th rows: `proxy` sees the events out of order. :(
This is run with cycle.js, if that makes any difference.
Hi! Thanks for reporting. Just to be sure, which version of xstream and cycle/run?
"@cycle/run": "^4.1.0",
"xstream": "^11.2.0"
Also, I just found a workaround: using `.compose(delay(1))` on the update stream that evaluates the app state.
const xs = require('xstream').default;
const START_STATE = {
ver: 0, // counts up to see out of order
a: false, // we toggle this on a timer
b: false, // we toggle this on a different timer
c: false, // we derive this from a and b
};
function loop() {
// imitate cycle
const update$ = xs.create();
// the folded state, with ever increasing version number
const state$ = update$
.fold((p, c) => ({...p, ver: p.ver + 1, ...c}), START_STATE)
.debug('state');
// the timer updates
const aUpdate$ = xs.periodic(500).map(n => ({a: n % 2 === 0}));
const bUpdate$ = xs.periodic(100).map(n => ({b: n % 3 === 0}));
// derived update from a/b
const derivedUpdate$ = state$
.filter(state => state.c !== state.a && state.b)
.map(state => ({c: state.a && state.b}));
// the proxy that sees the error
const proxy$ = xs.from(state$).debug('proxy');
// merge the update streams
const realUpdate$ = xs.merge(
aUpdate$,
bUpdate$,
derivedUpdate$
);
// and cycle back to top
update$.imitate(realUpdate$);
return {
state$,
proxy$,
};
}
const {state$, proxy$} = loop();
// listeners are objects with next/error/complete callbacks
state$.addListener({next: () => {}});
proxy$.addListener({next: () => {}});
@staltz here's a test case that reproduces the problem every time for me. It might be a bit contrived, i.e. there may be a more succinct way to construct a test case.
I see the problem in the proxy at ver: 6, which arrives before ver: 5:
state: { ver: 5, a: true, b: true, c: false }
state: { ver: 6, a: true, b: true, c: true }
proxy: { ver: 6, a: true, b: true, c: true }
proxy: { ver: 5, a: true, b: true, c: false }
Wrote it as a test case too. https://gist.github.com/lolgesten/318dd558786d6da3d56338c123bdf3c7
Looking at the xstream code, I think I got it wrong that this has to do with observables. `xs.from()` short-circuits if it detects a `Stream` and returns the original stream instead.
I think that means we get two `InternalListener`s on the same stream, and if the first listener causes an update to the stream itself, that nested update is delivered to both listeners before the second listener has received the original value.
Maybe this isn't a bug. It's just I need to be aware of that shortcut. With that in mind, the fix is:
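// delay is an extra operator and needs its own import
const delay = require('xstream/extra/delay').default;
// merge the update streams
const realUpdate$ = xs.merge(
aUpdate$,
bUpdate$,
derivedUpdate$
).compose(delay(1)); // delay updates so that all observers of state$ see each value first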
@lolgesten A delay will fix it, but the issue is still real. I think `imitate` should do a buffer for synchronous emissions, like we have inside Cycle run. Hence, reopening this issue.
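The buffering idea can be illustrated without xstream. The sketch below is only a model: `createBufferedEmit` and `runBufferedModel` are hypothetical names, and nothing here claims to match how Cycle run or xstream implement it. A value emitted while a delivery is in progress is queued, so every listener sees the current value before the next one starts.

```typescript
type NumberListener = (n: number) => void;

// Hypothetical helper, not part of xstream: wraps a listener list so that
// values emitted during a delivery wait in a queue for their turn.
function createBufferedEmit(listeners: NumberListener[]): (n: number) => void {
  const pending: number[] = [];
  let delivering = false;

  return (n: number): void => {
    pending.push(n);
    if (delivering) {
      return; // the outer call drains the queue, including this value
    }
    delivering = true;
    try {
      while (pending.length > 0) {
        const value = pending.shift() as number;
        // Copy the list so listeners added mid-delivery do not affect this pass.
        for (const listener of listeners.slice()) {
          listener(value);
        }
      }
    } finally {
      delivering = false;
    }
  };
}

function runBufferedModel(): string[] {
  const log: string[] = [];
  const listeners: NumberListener[] = [];
  const emit = createBufferedEmit(listeners);

  listeners.push((n) => { log.push(`first ${n}`); });
  // Feedback listener: synchronously emits a derived value.
  listeners.push((n) => { if (n < 100) { emit(n + 100); } });
  listeners.push((n) => { log.push(`second ${n}`); });

  emit(1);
  return log;
}

console.log(runBufferedModel().join('\n'));
// first 1
// second 1
// first 101
// second 101
```

In this model the synchronous feedback is preserved (no timer is involved), but the derived value is only delivered once every listener has seen the value that caused it.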
Ah ok! I agree that it'd be nicer if I didn't have to lose the synchronicity (is that a word? :) )
Here's the smallest possible repro I've come up with.
import xs from 'xstream';
const imitNumber$ = xs.create<number>();
const numberAdd$ = imitNumber$.filter((n) => n < 100).map((n) => n + 100);
const source$ = xs.create<number>();
const realNumber$ = xs.merge(source$, numberAdd$);
imitNumber$.imitate(realNumber$);
realNumber$.subscribe({next: (n) => console.log('real', n)});
imitNumber$.subscribe({next: (n) => console.log('imit', n)});
source$.shamefullySendNext(1);
The output is:
real 1
real 101
imit 101
imit 1
So what we see is that the `imit` subscriber observes `101` before `1`, even though `1` clearly has been evaluated, since it is the cause of `101`.
The problem occurs because `numberAdd$` is evaluated synchronously.
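The ordering can be reproduced in a small model that does not use xstream at all. It assumes, as suggested earlier in this thread, that all three consumers end up as listeners on the same underlying stream, registered in the order "real", feedback path, "imit". The names `runUnbufferedModel` and `emit` are made up for this sketch.

```typescript
type Listener = (n: number) => void;

function runUnbufferedModel(): string[] {
  const log: string[] = [];
  const listeners: Listener[] = [];

  // Deliver a value to every listener synchronously, in registration order.
  const emit = (n: number): void => {
    for (const listener of listeners.slice()) {
      listener(n);
    }
  };

  listeners.push((n) => { log.push(`real ${n}`); });
  // Plays the role of numberAdd$: re-emits synchronously, while the outer
  // delivery of n has not yet reached the last listener.
  listeners.push((n) => { if (n < 100) { emit(n + 100); } });
  listeners.push((n) => { log.push(`imit ${n}`); });

  emit(1);
  return log;
}

console.log(runUnbufferedModel().join('\n'));
// real 1
// real 101
// imit 101
// imit 1
```

The nested `emit(101)` runs to completion inside the delivery of `1`, so the last listener receives `101` first and `1` only when the outer loop resumes.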
Our fix to this problem is to patch `imitate`. Arguably the patch should be done for `error` as well, but we don't really use errors, so this is enough for us.
The important bit is that even if `target$.shamefullySendNext(next!)` causes a recursive invocation of `next`, we must not update the reactive tree with that value until later.
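import {Stream} from 'xstream';
// Replaces Stream.prototype.imitate with the queued version below.
export const patchImitate = () => {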
Stream.prototype.imitate = function (source$: any) {
return imitate(source$, this);
};
};
const imitate = <T>(source$: Stream<T>, target$: Stream<T>) => {
const queue: T[] = [];
let running = false;
source$.subscribe({
next: (v) => {
queue.push(v);
if (running) {
return;
}
running = true;
while (queue.length) {
const next = queue.shift();
target$.shamefullySendNext(next!);
}
running = false;
},
error: (e) => {
target$.shamefullySendError(e);
},
complete: () => {
target$.shamefullySendComplete();
},
});
};
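As noted above, the patch only queues `next`. A possible extension, sketched here under assumptions, routes `error` and `complete` through the same queue so they cannot overtake values that are still waiting. `ImitationTarget`, `QueuedListener`, `createQueuedForwarder` and `runForwarderDemo` are hypothetical names; the interface only mirrors the three `shamefullySend*` methods used in the patch, and the demo uses a fake target rather than a real stream.

```typescript
// Assumed minimal shape of the target; mirrors the methods the patch calls.
interface ImitationTarget<T> {
  shamefullySendNext(value: T): void;
  shamefullySendError(error: unknown): void;
  shamefullySendComplete(): void;
}

interface QueuedListener<T> {
  next(value: T): void;
  error(error: unknown): void;
  complete(): void;
}

type Notification<T> =
  | { kind: 'next'; value: T }
  | { kind: 'error'; error: unknown }
  | { kind: 'complete' };

// Builds a listener whose three callbacks share one queue.
function createQueuedForwarder<T>(target: ImitationTarget<T>): QueuedListener<T> {
  const queue: Notification<T>[] = [];
  let running = false;

  const enqueue = (notification: Notification<T>): void => {
    queue.push(notification);
    if (running) {
      return; // a re-entrant call: the outer loop will forward it later
    }
    running = true;
    try {
      let current = queue.shift();
      while (current !== undefined) {
        if (current.kind === 'next') {
          target.shamefullySendNext(current.value);
        } else if (current.kind === 'error') {
          target.shamefullySendError(current.error);
        } else {
          target.shamefullySendComplete();
        }
        current = queue.shift();
      }
    } finally {
      running = false;
    }
  };

  return {
    next: (value: T): void => enqueue({ kind: 'next', value }),
    error: (error: unknown): void => enqueue({ kind: 'error', error }),
    complete: (): void => enqueue({ kind: 'complete' }),
  };
}

function runForwarderDemo(): string[] {
  const log: string[] = [];
  // Fake target: receiving 1 synchronously triggers next(2) and complete().
  const target: ImitationTarget<number> = {
    shamefullySendNext: (value) => {
      if (value === 1) {
        forwarder.next(2);
        forwarder.complete();
      }
      log.push(`next ${value}`);
    },
    shamefullySendError: () => { log.push('error'); },
    shamefullySendComplete: () => { log.push('complete'); },
  };
  const forwarder = createQueuedForwarder(target);
  forwarder.next(1);
  return log; // ['next 1', 'next 2', 'complete']
}
```

With a real stream, the returned object could be passed to `source$.subscribe(...)` in place of the inline listener in the patch; that wiring is left out here so the sketch stays runnable without xstream.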