stream_transform
Events dropped when using startWith operator
Description
When using the startWith operator on a broadcast source stream, events emitted by the source stream are dropped until Future.value(initialValue) completes.
Reproduction
For example, the following code:
final modelChange = StreamController<int>.broadcast();
modelChange.stream.startWith(1).listen(print);
// the following event is not printed, i.e. dropped!
modelChange.add(2);
// schedule the following code in a separate microtask
await null;
modelChange.add(3);
prints the sequence 1, 3; the 2 is missing.
The reason is that at the time of modelChange.add(2), a separate microtask is still queued to complete Future.value(initial), and modelChange.stream has not been subscribed to yet. Because a broadcast stream does not buffer events for listeners that subscribe later, the 2 is lost.
The await null schedules modelChange.add(3) in a later microtask. By then Future.value(initial) has completed and modelChange.stream has been subscribed to, so the modelChange.add(3) event is captured correctly.
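The timing can be reproduced with dart:async alone. The sketch below is a hypothetical stand-in for the behavior described above, not the package's actual implementation: it subscribes to the source only after Future.value(1) completes. The names collectWithDelayedSubscribe, source and seen are made up for illustration.

```dart
import 'dart:async';

/// Hypothetical stand-in for the delayed subscription described above:
/// the source is listened to only after Future.value(initial) completes.
Future<List<int>> collectWithDelayedSubscribe() async {
  final source = StreamController<int>.broadcast();
  final seen = <int>[];

  // The callback runs in a later microtask, so nothing is subscribed yet.
  Future.value(1).then((initial) {
    seen.add(initial);
    source.stream.listen(seen.add);
  });

  source.add(2); // No listener yet: the broadcast stream drops the event.
  await null; // Let the queued microtask run and subscribe.
  source.add(3); // A listener exists now, so this event is delivered.

  // Wait for a timer event so that all pending microtasks have run.
  await Future<void>.delayed(Duration.zero);
  await source.close();
  return seen;
}

Future<void> main() async {
  print(await collectWithDelayedSubscribe()); // [1, 3]
}
```

Removing the await null line should leave both add calls ahead of the subscription, in which case neither 2 nor 3 would be recorded.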
This is very counterintuitive for users of startWith, since one would expect anything that happens after the listen invocation to be captured.
Suggested change
The mental model should be: startWith results in a stream that emits the initial value passed to startWith, plus all events the source stream emits at or after the moment of subscribing.
The startWith operator could therefore be implemented like this:
extension Concatenate<T> on Stream<T> {
  Stream<T> startWithImproved(T initial) => Stream.value(initial).merge(this);
}
In this case,
final modelChange = StreamController<int>.broadcast();
modelChange.stream.startWithImproved(1).listen(print);
modelChange.add(2);
await null;
modelChange.add(3);
correctly prints 1, 2, 3.
Note: This, however, relies on the exact behavior of merge() giving priority to the Stream.value() stream.
Important: As described in https://github.com/dart-lang/stream_transform/issues/152#issuecomment-1157707676, this proposed implementation does not behave correctly for synchronous broadcast streams, so an alternative implementation, as suggested in https://github.com/dart-lang/stream_transform/pull/151#discussion_r896669658, should be considered and developed.
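For discussion, here is one possible shape of an operator that follows the mental model from the suggested change without depending on merge(). It is a hypothetical sketch written against dart:async only, and it is not the implementation suggested in the linked review comment. The name startWithEager is invented, the returned stream is single-subscription even for a broadcast source, and the behavior with synchronous broadcast sources has not been verified here.

```dart
import 'dart:async';

extension StartWithEager<T> on Stream<T> {
  /// Hypothetical sketch: queue [initial] and subscribe to the source
  /// synchronously when the returned stream is listened to.
  Stream<T> startWithEager(T initial) {
    late StreamController<T> controller;
    StreamSubscription<T>? sub;
    controller = StreamController<T>(
      onListen: () {
        // Queue the initial value first, then subscribe right away so that
        // no source event emitted after listen() is missed.
        controller.add(initial);
        sub = listen(
          controller.add,
          onError: controller.addError,
          onDone: controller.close,
        );
      },
      onCancel: () => sub?.cancel(),
    );
    return controller.stream;
  }
}

Future<List<int>> collectWithEagerSubscribe() async {
  final modelChange = StreamController<int>.broadcast();
  final seen = <int>[];
  modelChange.stream.startWithEager(1).listen(seen.add);
  modelChange.add(2);
  await null;
  modelChange.add(3);
  // Wait for a timer event so that all pending microtasks have run.
  await Future<void>.delayed(Duration.zero);
  await modelChange.close();
  return seen;
}

Future<void> main() async {
  print(await collectWithEagerSubscribe()); // [1, 2, 3]
}
```

Pause and resume are not forwarded to the source subscription in this sketch; a real operator would need to handle them, as well as the broadcast case.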