xstream icon indicating copy to clipboard operation
xstream copied to clipboard

Non obvious behavior with `concat`, `Promise` and `EventEmitter`.

Open kaukas opened this issue 9 years ago • 9 comments

Hello.

Using xstream on a new project is my only experience with reactive programming so far. Therefore please excuse any obvious problems or mistakes.

I encountered a strange problem with this snippet:

concat(xs.fromPromise(somePromise), fromEvent(someEvent, 'test'));

My expectation was that concat would buffer the test events until the promise resolves (kind of like flattenSequentially. But the events are dropped altogether. I wrote two tests to expose this problem: https://github.com/kaukas/xstream/commit/6e693896c72fda4143d607e2b3996797fb822d62. Could you please let me know if this is expected behavior?

Thank you!

kaukas avatar Feb 06 '17 11:02 kaukas

Hi @kaukas. Thanks for the report. At first look, it seems like a bug. But I'll investigate better later, unless someone else wants to help with it.

staltz avatar Feb 08 '17 10:02 staltz

Hello, @staltz.

I've been playing further with this. I am not sure how concat should behave but flattenSequentially is definitely not doing the right thing. Even in the case where the event is sent after the promise resolves the event is lost.

I suspect the reason for the lost events is that the fromEvents stream is only activated after the previous stream (promise) completes. Therefore all prior events are lost. Instead the fromEvents stream should be activated immediately on flattenSequentially call so that all subsequent events could be buffered. At least this is my opinion.

Thank you!

kaukas avatar Feb 08 '17 10:02 kaukas

I encountered this issue while looking for something else, but to me it sounds exactly as I know concat to behave and how it's documented :thinking:

rjkip avatar Feb 20 '17 17:02 rjkip

@rjkip, I agree that concat probably behaves correctly. I have doubts about flattenSequentially but that could be a different issue. Should we rename this issue? Should we close it?

kaukas avatar Feb 21 '17 09:02 kaukas

@kaukas I'm sorry, I should also have looked at the tests you wrote. Maybe you already found things out yourself, but there were two issues which I addressed here. One is that the event emitter stream never completes, so the tests timed out. Secondly, there appeared to be some emit order issues, so I added explicit delays to the setTimeout calls. This is how I understand concat to work, and the tests pass. Does this at least answer your concat questions?

What are your concerns about flattenSequentially? It (conceptually) subscribes to all streams it receives, buffering results of streams 1..n while stream 0 is still emitting. It then emits the buffered values from stream 1 and any following values, etc. etc. I think this test demonstrates this most concisely: https://github.com/staltz/xstream/blob/3996d5e757eb3770e3fd2a6c8e42f583107423b3/tests/extra/flattenSequentially.ts#L46-L53.


Edit: added missing link to adjusted tests.

rjkip avatar Feb 21 '17 11:02 rjkip

@rjkip, here's what I want to do. I want to have a stream that returns events from an EventEmitter. However, it should .remember() the last event, even if the app is restarted. Therefore I want to prepend a stream from a Promise which loads the last seen value (e.g. from a DB) so that there's always a value present:

-- Promise.resolve ---
                      -- EventEmitter.on ---

However, in some cases the EventEmitter emits a value before the Promise resolves. It is important for me not to lose that early event.

                      -- Promise.resolve ---
-- EventEmitter.on ---

I expect it to be buffered and emitted right after the Promise. Apparently, neither concat nor flattenSequentially work that way. My guess is that the fromEvent stream is not even activated until the Promise resolves. That's how the early event is lost.

I find it a bit hard to explain the (admittedly convoluted) setup and expectations. Please let me know if I should try more. I also wanted to create tests to demonstrate this behavior. The two tests never complete because the EventEmitter event is lost, even if it fires after the promise! However, I admit that both concat and flattenSequentially possibly behave correctly and it's just me misunderstanding the contract.

kaukas avatar Mar 02 '17 15:03 kaukas

@kaukas you look for this kind of concat, right?

concat (from https://medium.com/@jshvarts/read-marble-diagrams-like-a-pro-3d72934d3ef5)

I need this functionality too to concatenate an event history with updates where the event history query might resolve after the first update

webmaster128 avatar Jan 03 '19 13:01 webmaster128

@webmaster128, it was a long time ago, I do not remember any more. I guess you're right, concat would work. The important thing is the time of subscription: concat should subscribe to both streams upon creation. Contrast that to flattenSequentially which (at the time of reporting, at least) did not subscribe to the second stream until the first finished.

kaukas avatar Jan 03 '19 13:01 kaukas

I'm implementing a version of buffering concat right now for my project.

This is a test case that shows the difference between the two versions of concat:

  it("buffers asynchonous events of second stream until first stream completes", done => {
    const sourceStream = Stream.periodic(25);
    const stream1 = sourceStream.take(3);
    const stream2 = sourceStream.take(3);
    const concatenatedStream = concat(stream1, stream2);
    const expected = [0, 1, 2, 0, 1, 2];

    concatenatedStream.addListener({
      next: value => expect(value).toEqual(expected.shift()!),
      complete: () => {
        expect(expected.length).toEqual(0);
        done();
      },
      error: done.fail,
    });
  });
  • For buffering concat (see diagram above) it passes
  • For xstream concat it produces [0, 1, 2, 3, 4, 5] instead of [0, 1, 2, 0, 1, 2]

The buffering concat behaves like

--1--2---3---4-|
-a--b-c--d-|
--------X---------Y---------Z-
          concat
--1--2---3---4-abcdXY-------Z-

wheras xstream concat behaves like

--1--2---3---4-|
...............--a-b-c--d-|
          concat
--1--2---3---4---a-b-c--d-|

As far as I can see xstream does what it documents. Hard to say what users expect.

webmaster128 avatar Jan 03 '19 23:01 webmaster128