CombineLatest does not seem to allow multiple subscriptions
I am not sure if this is explicitly a bug, but combineLatest will throw an error if you attempt to subscribe to it multiple times.
For now I have solved my issue by moving my combineLatest out of the builder and creating it with .asBroadcastStream().
I think all of our Stream constructors are single subscription by default, the same way plain Dart Stream constructors work.
Are the Streams that you combine all broadcast Streams and did you expect combineLatest to follow that logic?
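To illustrate the single-subscription default mentioned above, here is a minimal sketch that uses only dart:async. The helper name secondListenThrows is hypothetical, and a plain StreamController stands in for the stream returned by combineLatest.

```dart
import 'dart:async';

/// Returns true if listening a second time to a single-subscription stream
/// throws a StateError. (Hypothetical helper, for illustration only.)
bool secondListenThrows() {
  // A plain StreamController is single-subscription unless created with
  // StreamController.broadcast().
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  try {
    // The second listen on the same single-subscription stream is rejected.
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  }
}

void main() {
  print(secondListenThrows());
}
```

Swapping in StreamController<int>.broadcast() removes the error, which is the behavior the original poster expected from combineLatest.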
That makes sense. I think it was a misunderstanding on my part: all Rx streams allow multiple listeners by default, so I kind of assumed the constructors would do the same. Yes, the streams I am combining are all broadcast.
We made the choice to follow Dart Streams first, so that rxdart would integrate seamlessly.
Not a big fan of single subscription myself. Maybe we could change constructors like combineLatest, zip and merge to be broadcast if all provided Streams are too.
What do you guys think @brianegan @hoc081098 ?
I am okay with either personally. I would just like to make sure it is documented as it is not super clear what the expected behavior should be. ❤️
@frankpepermans We found an unintended consequence of using .asBroadcastStream() - see StackOverflow Dart stream .asBroadcastStream memory leak. Here is our answer:
From the .asBroadcastStream() documentation:
"The returned stream will subscribe to this stream when its first subscriber is added, and will stay subscribed until this stream ends, or a callback cancels the subscription."
So by design the stream exists until explicitly cancelled. To cancel the subscription when the last listener cancels use:
Rx.combineLatest(...).asBroadcastStream(onCancel: (sub) => sub.cancel())
There is further discussion in "Stream.asBroadcastStream - Easy to cause leaks, what is the rationale?" #26686
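The effect of the onCancel handler can be checked with a small sketch that uses only dart:async. The helper name sourceCancelledAfterLastListener and its cancelSource flag are hypothetical; a StreamController that never closes stands in for a BehaviorSubject.

```dart
import 'dart:async';

/// Reports whether the source subscription was cancelled after the only
/// listener of the broadcast view cancelled. (Hypothetical helper.)
Future<bool> sourceCancelledAfterLastListener({
  required bool cancelSource,
}) async {
  var sourceCancelled = false;
  // The controller's onCancel fires when its single subscription is cancelled.
  final source = StreamController<int>(
    onCancel: () {
      sourceCancelled = true;
    },
  );

  // With cancelSource, the handler cancels the underlying subscription as soon
  // as the last listener goes away; otherwise the source stays subscribed.
  final shared = cancelSource
      ? source.stream.asBroadcastStream(
          onCancel: (subscription) => subscription.cancel(),
        )
      : source.stream.asBroadcastStream();

  final listener = shared.listen((_) {});
  await listener.cancel();
  await Future<void>.delayed(Duration.zero);
  return sourceCancelled;
}

Future<void> main() async {
  print(await sourceCancelledAfterLastListener(cancelSource: false));
  print(await sourceCancelledAfterLastListener(cancelSource: true));
}
```

Note that once the handler has cancelled the underlying subscription, the broadcast view cannot resume a single-subscription source, so this fixes the leak but not re-subscription.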
We use a lot of BehaviorSubjects that don't end, so we need to cancel the subscription. Changing the Rx constructors to be broadcast, with correct cancel processing, would have saved us from the unintended memory leaks caused by adding .asBroadcastStream() to fix the obvious multiple subscription issue.
Additional thoughts in support of changing rxdart constructors (e.g. combineLatest) to be broadcast when given broadcast source streams.
The following "API" snippet suggests that we have 2 streams of lists that we can use in our U/I (please ignore compile issues and other incompleteness):
class Common {
  final Stream<List<String>> data = BehaviorSubject();
  final Stream<String> filter = BehaviorSubject();
  final Stream<List<String>> filteredData = Rx.combineLatest2(data, filter, ...);
}
In our U/I (e.g. Flutter) we can use the data stream with impunity because it's broadcast. By looking at the implementation, an experienced developer will know that filteredData is not going to behave the same because it's single subscription. An API should be independent of the implementation, so how do we make filteredData have broadcast semantics?
Appending .asBroadcastStream() doesn't terminate the inner subscription, which caused the memory leaks mentioned above and also messed up our reference counting. (We use reference counting to know which streams are active and need to be synchronized with the server.) Adding the above-mentioned onCancel handler fixed the leak, but now re-subscribing is broken because of the single-subscription combineLatest constructor.
Changing to a "getter" such as get filteredData => Rx... doesn't work because Flutter's StreamBuilder expects a stable object. You can compensate with memoization, but that "breaks" the API.
The closest API-preserving pattern that we've found is:
Rx.defer(
  () => Rx.combineLatestX( ... ),
  reusable: true,
)
This provides a stable object that can be subscribed to and re-subscribed to multiple times and also doesn't leak memory.
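A fuller sketch of this pattern applied to the Common class from earlier might look like the following. It assumes the rxdart package is available; the seeded values and the substring-match combiner are hypothetical and only there to make the example concrete.

```dart
import 'package:rxdart/rxdart.dart';

class Common {
  // Seed values are illustrative assumptions, not part of the original API.
  final BehaviorSubject<List<String>> data =
      BehaviorSubject.seeded(['apple', 'banana', 'cherry']);
  final BehaviorSubject<String> filter = BehaviorSubject.seeded('an');

  // One stable Stream object. With reusable: true, each listen call runs the
  // factory again and creates a fresh combineLatest2, so the field can be
  // subscribed to, cancelled, and subscribed to again.
  late final Stream<List<String>> filteredData = Rx.defer(
    () => Rx.combineLatest2<List<String>, String, List<String>>(
      data,
      filter,
      // Hypothetical combiner: keep items containing the filter text.
      (items, query) => items.where((item) => item.contains(query)).toList(),
    ),
    reusable: true,
  );
}

Future<void> main() async {
  final common = Common();

  // .first subscribes, takes one event, and cancels.
  print(await common.filteredData.first);

  // Subscribing again to the same object after the earlier cancel.
  common.filter.add('err');
  print(await common.filteredData.first);

  await common.data.close();
  await common.filter.close();
}
```

Because every listener gets its own inner combineLatest2 subscription, cancelling a listener also cancels that inner subscription, which is what keeps reference counting on the source subjects accurate.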
I have the same issue with re-subscribing to the stream produced from combineLatest.
Successfully used workaround with Rx.defer to solve my issue.
Thanks, @rich-j