Events dropped when using startWith operator
Description
When using the startWith operator on a broadcast source stream, the events emitted by the source stream are ignored/dropped until Future.value(initialValue) completes.
Reproduction
E.g. the following code
final modelChange = StreamController<int>.broadcast();
modelChange.stream.startWith(1).listen(print);
// the following event is not printed, i.e. dropped!
modelChange.add(2);
// schedule the following code in a separate microtask
await null;
modelChange.add(3);
prints the sequence 1, 3, where the 2 is missing.
The reason for this is that at the time of modelChange.add(2), there is a separate microtask queued to complete Future.value(initial), while the modelChange.stream is not subscribed to yet.
The await null schedules the modelChange.add(3) in a separate microtask, thus the Future.value(initial) completes and we subscribe to the modelChange.stream, thus correctly capturing the modelChange.add(3) event.
This is very counterintuitive for users of startWith, since one would expect that anything that happens after the listen invocation is captured.
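The dropped event can be reproduced without the package. The following is a minimal sketch, assuming a hypothetical `naiveStartWith` helper that emits the initial value first and only then subscribes to the source. It uses a timer-based delay instead of `await null`, so the outcome does not hinge on exact microtask ordering.

```dart
import 'dart:async';

// Hypothetical stand-in for a concat-style startWith: the source is only
// subscribed to after the initial value has been emitted.
Stream<T> naiveStartWith<T>(Stream<T> source, T initial) async* {
  yield initial;
  yield* source;
}

Future<List<int>> collectNaive() async {
  final modelChange = StreamController<int>.broadcast();
  final received = <int>[];
  naiveStartWith(modelChange.stream, 1).listen(received.add);
  // The generator body has not started yet, so nobody listens to the source.
  modelChange.add(2);
  // Let all pending microtasks run; the source is subscribed to afterwards.
  await Future<void>.delayed(Duration.zero);
  modelChange.add(3);
  await Future<void>.delayed(Duration.zero);
  await modelChange.close();
  return received;
}

Future<void> main() async {
  print(await collectNaive()); // Expected: [1, 3], the 2 is dropped.
}
```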
Suggested change
The mental model should be: startWith results in a stream with all of the events of the source stream after or at the moment of subscribing, plus the initial value passed to startWith.
The startWith operator could therefore be implemented like this:
extension Concatenate<T> on Stream<T> {
Stream<T> startWithImproved(T initial) => Stream.value(initial).merge(this);
}
In this case,
final modelChange = StreamController<int>.broadcast();
modelChange.stream.startWithImproved(1).listen(print);
modelChange.add(2);
await null;
modelChange.add(3);
correctly prints 1, 2, 3.
Note: This, however, relies on the exact behavior of merge() to give priority to the Stream.value() stream.
Important: As described in https://github.com/dart-lang/stream_transform/issues/152#issuecomment-1157707676, this proposed implementation does not behave correctly for synchronous broadcast streams, so an alternative implementation, as suggested in https://github.com/dart-lang/stream_transform/pull/151#discussion_r896669658, should be considered and developed.
See also https://github.com/dart-lang/stream_transform/pull/151
It relies on merge giving priority to the first stream only in that it calls listen on that first. It's still a matter of who emits a value first, and if this is a synchronous broadcast stream, it should be possible for it to emit a value before the microtask-delay of emitting initial from the first stream. In theory.
I tried breaking it once and failed. Hmm, try harder! Yes, got it!
import "dart:async";
import "package:stream_transform/stream_transform.dart";
extension Concatenate<T> on Stream<T> {
Stream<T> startWithImproved(T initial) => Stream.value(initial).merge(this);
}
void main() {
var bsc = StreamController<int>.broadcast(sync: true);
bsc.stream.forEach((v) {
print("1: $v");
});
bsc.add(1);
// Schedule event to be sent "soon".
scheduleMicrotask(() {
bsc.add(2);
});
// Schedule "0" to be sent "slightly later".
bsc.stream.startWithImproved(0).forEach((v) {
print("2: $v");
});
scheduleMicrotask(bsc.close);
}
It prints 2: 2 before 2: 0.
The solution I can see is to use a stream controller so you are in control of which events get added when.
E.g.: https://dartpad.dev/?id=619e9780c70bbc7b2aca6e9cd4d5e581
(Uses Stream.multi because it allows both sync and async add, and multiple listens on the same stream, if the underlying stream supports it. Would use addStream if I had fixed it to be synchronous.)
@lrhn thanks for the counterexample :-) I updated the issue description and mentioned that an alternative solution that also works for synchronous broadcast streams has to be found.
I think it should be feasible to implement these semantics.
Should the same be done for startWithMany? Or startWithStream?
I think at least startWith and startWithMany should have the same semantics.
Should this be an optional argument, or always the behavior? @lrhn - which semantics do you think are best for each of these?
First thing to agree on is the precise semantics.
We can do: startWith where:
- The first element is fixed or computed at time of listen. Either works. Same class can do both.
- The remaining stream is listened to immediately.
- The first element is delivered asynchronously.
- The remaining elements are delivered synchronously when they arrive (buffered if paused, buffered if there is a previous yet undelivered event, which means also until the first event is delivered).
- We can even emit the first event synchronously with the first stream event, if that happens before the first event's microtask, so other events are not delayed (not more than a single call to an asynchronous add).
I think we can do that one efficiently.
The tricky bit is whether the remaining-events stream can deliver events while it's delivering events. Say we try to react to the first stream event by:
- Synchronously sending the "start with" event.
- Then synchronously sending the first stream event.
- But then(!) we get a second stream event while sending the "start with" event. We now have to make damn sure we don't add it to the output stream before the first stream event.
So, there is some state management. I think I can handle that (with a queue), just have to be sharp about the expectations.
(I don't think a stream is allowed to send events while it's sending events, that is, until its addSync has returned, but I wouldn't be absolutely certain it cannot happen for any implementation of Stream.)
We can also have a class which exposes a synchronous getter for its state, and has a "State change" stream. I would not combine the two. The state change events are secondary artifacts related to the primary state getter. I'd even consider not including the value in the state change notification, and allow multiple state changes to be collapsed and only trigger one change notification. That's a very different beast, and not really primarily a stream. Not a good fit for this package.
For startWithMany (an iterable of events to send before the other stream), it should definitely mimic the single "startWith" above.
That means sending all the iterable events immediately on listen. (It can be used to implement startWith by having a single element iterable, or even a single element iterable which returns a value computed at the time of iteration.)
Sending multiple events "immediately" is a little tricky. Or rather, it's probably the same as the single first event case, just with more chances to get reentrant events.
It's a little questionable, though. The reason for having streams with multiple events, instead of just using a single future, is that sometimes the information is not all available at the same time, and you don't want to wait until the end for all of it. So, why send multiple stream events at the same time? Obviously all that information is available. Instead, you should just send a single event containing all the values.
That won't work for a Stream<Foo> if there is no Foo containing multiple other Foos, but I'm still questioning which information you'd be sending using multiple events at the same time, how the receiver would see it. Seems like a bad design leaking through.
For startWithStream ... that's just concat. The alternative is to listen to both streams, emit all the events of the first stream and buffer the events of the other stream (just pause the subscription), then send the buffered events when the first stream ends. I cannot find any reasonable use-case for that, and it's very bad use of memory. Just don't listen to the other stream until you need the events. If the second stream is a broadcast stream, you will lose events, but if you buffer, you can get arbitrarily old broadcast stream events, which should not be necessary. Broadcast stream events are expendable.
If anything, you could listen to a second broadcast stream and remember only the last event emitted, then emit that between the first and the second stream. Still, the use-case eludes me.
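The concat reading of startWithStream can be sketched as follows. This is a minimal illustration, assuming a hypothetical `concatStreams` helper rather than the package's own operator. The second stream is not listened to until the first is done, so nothing is buffered and broadcast events emitted in the meantime are simply lost.

```dart
import 'dart:async';

// Hypothetical concat-style helper: `second` is not listened to until
// `first` is done, so nothing from `second` is buffered.
Stream<T> concatStreams<T>(Stream<T> first, Stream<T> second) async* {
  yield* first;
  yield* second;
}

Future<List<String>> collectConcat() async {
  final first = StreamController<String>();
  final second = StreamController<String>.broadcast();
  final received = <String>[];
  concatStreams(first.stream, second.stream).listen(received.add);
  await Future<void>.delayed(Duration.zero);
  first.add('first: a');
  // `first` is still open, so nobody listens to `second` yet.
  second.add('second: lost');
  await Future<void>.delayed(Duration.zero);
  await first.close();
  await Future<void>.delayed(Duration.zero);
  // By now the helper has moved on to `second`.
  second.add('second: b');
  await Future<void>.delayed(Duration.zero);
  await second.close();
  return received;
}

Future<void> main() async {
  print(await collectConcat()); // Expected: [first: a, second: b]
}
```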
A completely defensive version of startWithMultiple could be:
import "dart:collection";
import "dart:async";
extension StartWith<T> on Stream<T> {
Stream<T> startWithMultiple(Iterable<T> initial) =>
Stream<T>.multi((controller) {
Queue<Event<T>>? bufferOpt =
Queue<Event<T>>.from(initial.map(DataEvent.new));
bool isProcessingQueue = false;
var subscription = this.listen(null); // ignore: unnecessary_this
scheduleMicrotask(() {
var buffer = bufferOpt;
if (buffer != null) {
// Cleared if first event happened before microtask.
isProcessingQueue =
true; // Stream events during this will just be added to buffer.
while (buffer.isNotEmpty) {
buffer.removeFirst().addSync(controller);
}
bufferOpt = null;
}
});
controller
..onPause = subscription.pause
..onResume = subscription.resume
..onCancel = subscription.cancel;
void addEvent(Event<T> event) {
var buffer = bufferOpt;
if (buffer != null) {
buffer.add(event);
if (isProcessingQueue)
return; // Happens during scheduled microtask or other event.
// Happens before scheduled microtask.
isProcessingQueue = true;
while (buffer.isNotEmpty) {
buffer.removeFirst().addSync(controller);
}
bufferOpt = null;
return; // The event was already emitted as part of the buffer.
}
event.addSync(controller);
}
subscription
..onError((e, s) => addEvent(ErrorEvent(e, s)))
..onDone(() => addEvent(const DoneEvent()))
..onData((data) => addEvent(DataEvent<T>(data)));
});
}
abstract class Event<T> {
void addSync(MultiStreamController<T> controller);
}
class DataEvent<T> implements Event<T> {
final T value;
DataEvent(this.value);
@override
void addSync(MultiStreamController<T> controller) {
controller.addSync(value);
}
}
class ErrorEvent implements Event<Never> {
final Object error;
final StackTrace stackTrace;
ErrorEvent(this.error, this.stackTrace);
@override
void addSync(MultiStreamController<Object?> controller) {
controller.addErrorSync(error, stackTrace);
}
}
class DoneEvent implements Event<Never> {
const DoneEvent();
@override
void addSync(MultiStreamController<Object?> controller) {
controller.closeSync();
}
}
(Untested, obviously).
That might be overly defensive. If we can assume that no stream will send a new event while sending the previous event, then we might be able to scale down a little. We still can't assume that no event happens during the microtask callback.
And we can make the subscription forward directly to controller.addSync etc. after emptying the queue.
The approach I have been testing is
/// Emits [initial] before any values or errors from this stream.
///
/// If this stream is a broadcast stream the result will be as well.
Stream<T> startWith(T initial) {
var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
var hasListened = false;
controller.onListen = () async {
if (hasListened) return;
hasListened = true;
// Edit: I suppose this breaks the promise though...
await Future.microtask(() {});
controller.add(initial);
await controller.addStream(this);
await controller.close();
};
return controller.stream;
}
/// Emits all values in [initial] before any values or errors from this
/// stream.
///
/// If this stream is a broadcast stream the result will be as well.
Stream<T> startWithMany(Iterable<T> initial) {
var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
var hasListened = false;
controller.onListen = () async {
if (hasListened) return;
hasListened = true;
await Future.microtask(() {});
// Do we need microtask between each of the below?
initial.forEach(controller.add);
await controller.addStream(this);
await controller.close();
};
return controller.stream;
}
Those two, startWith and startWithMany, seem to be equivalent to just concat.
It doesn't listen to the stream until after delivering the initial event(s), which happens after a microtask delay.
That's effectively the same as (() async* { yield initial; }()).concat(this).
If we want to listen to the stream right now, and emit initial events before the first stream event, we must schedule a microtask to emit the initial events, and if the first stream event happens before that, we must either delay the first stream event(s) until after the microtask, or emit the initial events as part of the first stream event (and then the microtask just does nothing, since we can't cancel it).
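The second option (emit the initial event as part of the first stream event, and let the microtask become a no-op) could look roughly like the sketch below. The `startWithEager` name and the `initialSent` flag are assumptions made for illustration. The sketch also assumes the source never emits reentrantly while an event is being forwarded, so it is less defensive than a queue-based version.

```dart
import 'dart:async';

// Hypothetical helper: listens to the source immediately and emits
// `initial` either in a microtask or just before the first source event,
// whichever comes first.
Stream<T> startWithEager<T>(Stream<T> source, T initial) =>
    Stream<T>.multi((controller) {
      var initialSent = false;
      void sendInitial() {
        if (initialSent) return; // The microtask or an event got here first.
        initialSent = true;
        controller.addSync(initial);
      }

      final subscription = source.listen(
        (event) {
          sendInitial();
          controller.addSync(event);
        },
        onError: (Object error, StackTrace stackTrace) {
          sendInitial();
          controller.addErrorSync(error, stackTrace);
        },
        onDone: () {
          sendInitial();
          controller.closeSync();
        },
      );
      scheduleMicrotask(sendInitial);
      controller
        ..onPause = subscription.pause
        ..onResume = subscription.resume
        ..onCancel = subscription.cancel;
    });

Future<List<int>> collectEager() async {
  final modelChange = StreamController<int>.broadcast();
  final received = <int>[];
  startWithEager(modelChange.stream, 1).listen(received.add);
  modelChange.add(2); // The source is already subscribed to, so this is kept.
  await Future<void>.delayed(Duration.zero);
  modelChange.add(3);
  await Future<void>.delayed(Duration.zero);
  await modelChange.close();
  return received;
}

Future<void> main() async {
  print(await collectEager()); // Expected: [1, 2, 3]
}
```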
(I really need to make addStream not introduce an extra asynchronous delay. It currently does, delivering each event asynchronously if the controller is asynchronous.)
Looking at this again, the proposed semantics should be implementable as:
extension<T> on Stream<T> {
Stream<T> startWith(T firstEvent) => Stream.multi((controller) {
controller
..add(firstEvent)
..addStream(this).then((_) {
controller.closeSync();
});
});
Stream<T> startWithMany(Iterable<T> firstEvents) =>
Stream.multi((controller) {
for (var event in firstEvents) controller.add(event);
controller.addStream(this).then((_) {
controller.closeSync();
});
});
}
If the original stream is a single-subscription stream, this code will throw at the addStream the second time the returned stream is listened to. Otherwise it will continue and work for any stream. It listens to the stream immediately, and queues the events in the subscription, as normal when there is an async event ahead of them in the subscription's event queue. And it should forward pauses, resumes and cancels normally.
And tried that. Didn't actually work, because of how broadcast streams are handled - if the source is a broadcast stream, the target is also a broadcast stream, that only emits the firstEvent once.
It's not a multicast stream that emits the firstEvent once to every listener.
(That could actually be useful, but it's different, and the tests rejected it.)
It should probably be something like:
import "dart:async";
import "dart:collection";
import "package:async/async.dart"; // For Result.
extension<T> on Stream<T> {
Stream<T> startWith(T firstEvent) {
if (isBroadcast) {
final controller = StreamController<T>.broadcast(sync: true);
controller.onListen = () {
var queue = Queue<Result<T>?>()..add(Result.value(firstEvent));
void emit(Result<T>? event) {
switch (event) {
case null:
controller.close();
case _:
event.addTo(controller);
}
}
void handle([Result<T>? event]) {
if (queue.isEmpty) {
emit(event);
} else {
queue.add(event);
}
}
Result.captureStream<T>(this).forEach(handle).whenComplete(handle);
scheduleMicrotask(() {
while (queue.isNotEmpty) {
emit(queue.removeFirst());
}
});
};
return controller.stream;
}
return Stream.multi((controller) {
controller
..add(firstEvent)
..addStream(this).whenComplete(controller.closeSync);
});
}
}
I'm still a little wary about the entire semantics, in particular inheriting being a broadcast stream from the source. We do that in other places, because it makes sense (and yet we still managed to fumble consistency when it comes to how many times a map callback is called), but it doesn't always make sense to be a broadcast stream just because the source is. Sometimes it's better to be a multicast stream.
A broadcastStream.startWith(firstValue) is a broadcast stream which starts with firstValue and then follows with the events of broadcastStream, starting at the first time someone listens to the resulting stream.
Why is the first listen special? Broadcast streams generally do not treat the first listen as a special time where something happens, other than it not needing to emit events before. It can emit events when nobody is listening, they go into the void, it's an optimization to not even create the events.
That's also why you can't pre-seed a broadcast controller with events that get released when someone listens - it doesn't care whether someone listens. It might just optimize some computations away when it knows that nobody needs the result.
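A small sketch of that behavior, using only `dart:async`: an event added to a broadcast controller while nobody is listening is discarded rather than held back for the first listener. The function name is hypothetical.

```dart
import 'dart:async';

Future<List<int>> collectAfterLateListen() async {
  final controller = StreamController<int>.broadcast();
  controller.add(1); // No listener yet: the event is discarded, not buffered.
  final received = <int>[];
  controller.stream.listen(received.add);
  controller.add(2);
  await Future<void>.delayed(Duration.zero);
  await controller.close();
  return received;
}

Future<void> main() async {
  print(await collectAfterLateListen()); // Expected: [2]
}
```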
The "broadcasty" thing to do here, would be to schedule a microtask to emit the firstValue, whether there are listeners or not at that time, and only start listening on broadcastStream when someone actually listens.
Instead of that, we wait until the first listener, then emit the firstValue and start emitting broadcastStream events. Whether there are listeners or not. (It's an optimization to cancel the subscription to the source broadcastStream while there are no listeners, it doesn't change actual behavior).
If someone does .listen(null).cancel() and then comes back to listen a little later, the firstValue is long gone.
That leaves us open to questions about which events the stream should have.
The consistent (and implementable) answer is: The firstEvent, then every event emitted by broadcastStream after the first listen to the result stream. The timing of those events is that firstEvent is emitted in a later microtask, and the remaining events are emitted "at some time after" they are emitted by the broadcastStream. Whether a listener on the result stream gets the broadcast-stream events depends on when they are emitted by the result stream, not when they were originally provided by broadcastStream.
Which means that you can get events from broadcastStream which were emitted before you listened to the result stream:
- Create the broadcastStream.startWith(firstValue).
- Someone listens to that stream, then immediately cancels again.
- The broadcastStream emits an event, which is enqueued.
- Someone listens to the stream again.
- The microtask delay triggers and emits the firstValue.
- The enqueued broadcastStream event is emitted.
- The listener has now received an event from the startWith stream which was emitted by the broadcastStream before they started listening.
There is no way around that. If we delay events at all, it's possible for a listen to happen between the original broadcastStream event and it being emitted by the startWith stream. Even if it's a synchronous delay (running in a synchronous loop emitting events from a queue), the listen can happen in response to a prior event, while another event is already in the queue.
The logic inside a _SyncBroadcastStreamSubscription to ensure that only listeners added before the event will see that event, is quite complicated. And non-reentrant — a sync broadcast controller will throw if you try to call add while it's emitting events to multiple listeners. Which is a risk here, if we use a sync broadcast controller.
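The reentrancy risk can be shown with a short sketch using only `dart:async`. The function name is hypothetical. The error is caught inside the listener because an error thrown from a listener callback would otherwise be reported to the zone instead of propagating to the outer `add` call.

```dart
import 'dart:async';

// Returns true if a reentrant add on a sync broadcast controller throws.
bool reentrantAddThrows() {
  final controller = StreamController<int>.broadcast(sync: true);
  var threw = false;
  controller.stream.listen((event) {
    if (event != 1) return;
    try {
      controller.add(2); // Reentrant: the controller is still delivering 1.
    } on StateError {
      threw = true;
    }
  });
  controller.stream.listen((_) {}); // A second listener.
  controller.add(1); // Delivered synchronously to both listeners.
  return threw;
}

void main() {
  print(reentrantAddThrows()); // Expected: true
}
```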
Completely generally: A synchronous broadcast controller is dangerous. Introducing it should be done very carefully. Or guarded by a queue (which makes sense, because it doesn't have an event queue internally), but then we again risk notifying listeners added after the event was originally emitted by the source.
If the result stream was instead treated as a multicast stream, where different listeners won't necessarily get exactly the same events at exactly the same time, then we could have every listener get the firstValue, before transitioning into the following events of the broadcastStream, starting at the time where they listened. If that is OK for the first listener, it should probably be OK for every listener. (I wouldn't make it isBroadcast then, because then cancelling and re-listening is not the same as discarding events, if the second listen gets the firstEvent again.)
An alternative is to drop the "sync" part, and just use the event queue of the non-sync broadcast subscription:
extension<T> on Stream<T> {
Stream<T> startWith(T firstEvent) {
if (isBroadcast) {
final controller = StreamController<T>.broadcast(sync: false);
final add = controller.add;
final addError = controller.addError;
final close = controller.close;
const int stateInitial = 0, stateScheduled = 1, stateSentFirst = 2;
int state = stateInitial; // stateInitial on first listen, stateScheduled on later listens, stateSentFirst bit set after having sent firstEvent.
controller.onListen = () {
var subscription = listen(null);
if (state == stateInitial) {
state = stateScheduled;
scheduleMicrotask(() {
if (state < stateSentFirst) { // If firstEvent not sent
state |= stateSentFirst;
controller.add(firstEvent); // send firstEvent
subscription.onData(add);
}
});
}
final hasSentEvent = state >= stateSentFirst; // firstEvent already sent by an earlier listen.
subscription..onData(hasSentEvent ? add : (event) {
state |= stateSentFirst;
controller..add(firstEvent)..add(event);
subscription.onData(add);
})..onError(addError)..onDone(close);
controller.onCancel = subscription.cancel;
};
return controller.stream;
}
return Stream.multi((controller) {
controller
..add(firstEvent)
..addStream(this).whenComplete(controller.closeSync);
});
}
}
That will send the firstEvent either as a microtask, or just before the first event of the source broadcast stream, and then it will add further events immediately when they are received, but deliver them only asynchronously.
However, the broadcast stream subscription remembers when the events are added, and only delivers them to listeners which were subscribed before the event was added, not just before it's emitted.
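That delivery rule can be observed with a plain asynchronous broadcast controller. The following is a minimal sketch (names are hypothetical): a listener that subscribes after an event was added, but before it has been delivered, does not receive that event.

```dart
import 'dart:async';

Future<List<List<int>>> collectEarlyAndLate() async {
  final controller = StreamController<int>.broadcast(); // sync: false
  final earlyEvents = <int>[];
  final lateEvents = <int>[];
  controller.stream.listen(earlyEvents.add);
  controller.add(1); // Queued for the early listener only; not delivered yet.
  controller.stream.listen(lateEvents.add); // Subscribes before 1 is delivered.
  controller.add(2);
  await Future<void>.delayed(Duration.zero);
  await controller.close();
  return [earlyEvents, lateEvents];
}

Future<void> main() async {
  print(await collectEarlyAndLate()); // Expected: [[1, 2], [2]]
}
```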