stream_transform

Events dropped when using startWith operator

Open macjohnny opened this issue 2 years ago • 8 comments

Description

When the startWith operator is applied to a broadcast source stream, events emitted by the source stream are dropped until Future.value(initialValue) completes.

Reproduction

For example, the following code

final modelChange = StreamController<int>.broadcast();
modelChange.stream.startWith(1).listen(print);

// the following event is not printed, i.e. dropped!
modelChange.add(2);

// schedule the following code in a separate microtask
await null;

modelChange.add(3);

prints the sequence 1, 3; the 2 is missing.

The reason is that, at the time of modelChange.add(2), a microtask is still queued to complete Future.value(initialValue), and modelChange.stream has not been subscribed to yet. Because the source is a broadcast stream, an event added while it has no listener is discarded. The await null defers modelChange.add(3) to a later microtask; by then Future.value(initialValue) has completed and the subscription to modelChange.stream exists, so the 3 is captured.
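The ordering described above can be reproduced with dart:async alone. The sketch below is a simplified model, not the actual stream_transform code: startWithDeferred and collectDeferred are hypothetical names, and the only assumption carried over from the description is that the source is subscribed to after Future.value(initial) has completed.

```dart
import 'dart:async';

/// Hypothetical, simplified model of the behavior described in this issue.
/// It is NOT the stream_transform implementation. Cancellation of the inner
/// subscription is left out to keep the sketch short.
Stream<T> startWithDeferred<T>(Stream<T> source, T initial) {
  final controller = StreamController<T>();
  controller.onListen = () {
    Future.value(initial).then((value) {
      controller.add(value);
      // The source is subscribed to here, i.e. at least one microtask
      // after listen() was called on the returned stream.
      source.listen(controller.add,
          onError: controller.addError, onDone: controller.close);
    });
  };
  return controller.stream;
}

/// Runs the reproduction from this issue against the model and returns
/// the events that reached the listener.
Future<List<int>> collectDeferred() async {
  final seen = <int>[];
  final modelChange = StreamController<int>.broadcast();
  startWithDeferred(modelChange.stream, 1).listen(seen.add);

  modelChange.add(2); // the broadcast stream has no listener yet
  await null; // continue in a later microtask
  modelChange.add(3);

  // Give pending stream events a chance to be delivered.
  await Future<void>.delayed(Duration.zero);
  return seen;
}

Future<void> main() async {
  print(await collectDeferred()); // [1, 3]
}
```

In this model the 2 is lost for the same reason as in the report: it is added between the call to listen and the moment the source subscription is created.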

This is very counterintuitive for users of startWith, since one would expect that anything that happens after the listen invocation is captured.

Suggested change

The mental model should be: startWith results in a stream that emits the initial value passed to startWith, followed by all events of the source stream emitted at or after the moment of subscribing.

The startWith operator could therefore be implemented like this:

extension Concatenate<T> on Stream<T> {
  Stream<T> startWithImproved(T initial) => Stream.value(initial).merge(this);
}

In this case,

final modelChange = StreamController<int>.broadcast();
modelChange.stream.startWithImproved(1).listen(print);

modelChange.add(2);

await null;

modelChange.add(3);

correctly prints 1, 2, 3.

Note: This, however, relies on merge() delivering the event from the Stream.value() stream before any event from the source stream.
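For comparison, the mental model can also be written without relying on the ordering of merge(). This is a minimal sketch using only dart:async; startWithEager and collectEager are hypothetical names, and it is not necessarily the implementation proposed in any linked discussion. It assumes that queueing the initial value and subscribing to the source within the same onListen callback is acceptable.

```dart
import 'dart:async';

/// Hypothetical sketch: queue the initial value and subscribe to the source
/// immediately when the returned stream is listened to.
Stream<T> startWithEager<T>(Stream<T> source, T initial) {
  final controller = StreamController<T>();
  StreamSubscription<T>? subscription;
  controller.onListen = () {
    // The initial value is queued first, so it is delivered before
    // anything forwarded from the source.
    controller.add(initial);
    subscription = source.listen(controller.add,
        onError: controller.addError, onDone: controller.close);
  };
  controller.onCancel = () => subscription?.cancel();
  return controller.stream;
}

/// Runs the reproduction from this issue against the sketch above.
Future<List<int>> collectEager() async {
  final seen = <int>[];
  final modelChange = StreamController<int>.broadcast();
  startWithEager(modelChange.stream, 1).listen(seen.add);

  modelChange.add(2); // the source already has a listener
  await null;
  modelChange.add(3);

  // Give pending stream events a chance to be delivered.
  await Future<void>.delayed(Duration.zero);
  return seen;
}

Future<void> main() async {
  print(await collectEager()); // [1, 2, 3]
}
```

Pause and resume are not forwarded to the source here; a real operator would need to handle them.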

Important: As described in https://github.com/dart-lang/stream_transform/issues/152#issuecomment-1157707676, this implementation proposal does not behave correctly for synchronous broadcast streams, so an alternative implementation, as suggested in https://github.com/dart-lang/stream_transform/pull/151#discussion_r896669658, should be considered and developed.

macjohnny, Jun 16 '22 08:06