rxdart

CombineLatest does not seem to allow multiple subscriptions

Open rmargolinross opened this issue 5 years ago • 7 comments

I am not sure if this is explicitly a bug, but combineLatest throws an error if you attempt to subscribe to it multiple times.

For now I have solved my issue by moving my combineLatest out of the builder and creating it with .asBroadcastStream().

rmargolinross • Apr 30 '20 18:04

I think all of our Stream constructors are single subscription by default, the same way plain Dart Stream constructors work.

Are the Streams that you combine all broadcast Streams and did you expect combineLatest to follow that logic?
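A minimal sketch of the single-subscription behaviour described above, using only dart:async rather than rxdart. The helper name secondListenThrows is hypothetical and exists only for this illustration; it shows that a second listen on a single-subscription stream throws a StateError, while a broadcast stream accepts it.

```dart
import 'dart:async';

/// Hypothetical helper: listens to [stream] twice and reports whether the
/// second listen throws a StateError.
bool secondListenThrows(Stream<int> stream) {
  stream.listen((_) {});
  try {
    stream.listen((_) {});
    return false;
  } on StateError {
    // Single-subscription streams reject a second listener.
    return true;
  }
}

void main() {
  final single = StreamController<int>();
  final broadcast = StreamController<int>.broadcast();

  print(secondListenThrows(single.stream));
  print(secondListenThrows(broadcast.stream));
}
```

The same check can be pointed at the result of a combineLatest call to see which of the two behaviours it follows in a given rxdart version.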

frankpepermans • Apr 30 '20 18:04

That makes sense. I think it was a misunderstanding on my part: all Rx streams allow multiple listeners by default, so I assumed the constructors would do the same. Yes, the streams I am combining are all broadcast.

rmargolinross • Apr 30 '20 18:04

We made the choice to follow Dart Streams first, so that rxdart would integrate seamlessly.

Not a big fan of single subscription myself; maybe we could change constructors like combineLatest, zip and merge to be broadcast if all provided Streams are too.

What do you guys think @brianegan @hoc081098 ?

frankpepermans • Apr 30 '20 18:04

I am okay with either personally. I would just like to make sure it is documented as it is not super clear what the expected behavior should be. ❤️

rmargolinross • Apr 30 '20 18:04

@frankpepermans We found an unintended consequence of using .asBroadcastStream() - see StackOverflow Dart stream .asBroadcastStream memory leak. Here is our answer:

From the .asBroadcastStream() documentation:

The returned stream will subscribe to this stream when its first subscriber is added, and will stay subscribed until this stream ends, or a callback cancels the subscription.

So by design the stream exists until explicitly cancelled. To cancel the subscription when the last listener cancels use:

Rx.combineLatest(...).asBroadcastStream( onCancel: (sub) => sub.cancel() )
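A minimal sketch of that difference, assuming a plain dart:async StreamController stands in for the never-ending source (it is not the rxdart code from our app). The helper sourceReleased and its cancelOnLastListener parameter are hypothetical names for this illustration. It reports whether the source subscription has been released after the only listener of the broadcast wrapper cancels.

```dart
import 'dart:async';

/// Hypothetical helper: wraps a never-ending source in asBroadcastStream and
/// reports whether the source was released after the last listener cancelled.
Future<bool> sourceReleased({required bool cancelOnLastListener}) async {
  var released = false;
  final source = StreamController<int>(onCancel: () {
    released = true;
  });

  final shared = source.stream.asBroadcastStream(
    // Without this handler the wrapper stays subscribed to the source.
    onCancel: cancelOnLastListener
        ? (subscription) => subscription.cancel()
        : null,
  );

  final listener = shared.listen((_) {});
  await listener.cancel();
  return released;
}

Future<void> main() async {
  print(await sourceReleased(cancelOnLastListener: false));
  print(await sourceReleased(cancelOnLastListener: true));
}
```

Note that once the handler has cancelled the inner subscription, a single-subscription source cannot be listened to again through the same wrapper.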

There is further discussion in "Stream.asBroadcastStream - Easy to cause leaks, what is the rationale?" #26686

We use a lot of BehaviorSubjects that don't end, so we need to cancel the subscription. Changing Rx constructors to be broadcast, with correct cancel processing, would have saved us from the unintended memory leaks caused by adding .asBroadcastStream() to fix the obvious multiple subscription issue.

rich-j • May 01 '20 19:05

Additional thoughts in support of changing rxdart constructors (e.g. combineLatest) to be broadcast when given broadcast source streams.

The following "API" snippet suggests that we have 2 streams of lists that we can use in our UI (please ignore compile issues and other incompleteness):

class Common {
  final Stream<List<String>> data = BehaviorSubject();
  final Stream<String> filter = BehaviorSubject();
  final Stream<List<String>> filteredData = Rx.combineLatest2(data, filter, ...);
}

In our UI (e.g. Flutter) we can use the data stream with impunity because it's broadcast. By looking at the implementation, an experienced developer will know that filteredData is not going to behave the same because it's single subscription. An API should be independent of the implementation, so how do we make filteredData have broadcast semantics?

Appending .asBroadcastStream() doesn't terminate the inner subscription, which caused the memory leaks mentioned above and also messed up our reference counting. (We use reference counting to know which streams are active and need to be synchronized with the server.) Adding the above-mentioned onCancel handler fixed the leak, but now re-subscribing is broken because of the single subscription combineLatest constructor.

Changing to a "getter" such as get filteredData => Rx... doesn't work because Flutter's StreamBuilder expects a stable object. You can compensate with memoization but that "breaks" the API.

The closest API-preserving pattern that we've found is:

Rx.defer(
  () => Rx.combineLatestX( ... ),
  reusable: true,
)

This provides a stable object that can be subscribed to and re-subscribed to multiple times and also doesn't leak memory.
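A fuller sketch of this pattern applied to the Common class from above, assuming the rxdart package is available. The seeded sample data, the prefix-matching combiner, and the use of late final are assumptions added for illustration, not our production code.

```dart
import 'package:rxdart/rxdart.dart';

class Common {
  // Sample seed values are assumptions for this sketch.
  final BehaviorSubject<List<String>> data =
      BehaviorSubject.seeded(['apple', 'banana', 'avocado']);
  final BehaviorSubject<String> filter = BehaviorSubject.seeded('a');

  // `late` lets the initializer reference the other instance fields.
  // With reusable: true, each listen builds a fresh combineLatest2.
  late final Stream<List<String>> filteredData = Rx.defer(
    () => Rx.combineLatest2(
      data,
      filter,
      (List<String> items, String prefix) =>
          items.where((item) => item.startsWith(prefix)).toList(),
    ),
    reusable: true,
  );
}

Future<void> main() async {
  final common = Common();

  // `first` subscribes, takes one value, then cancels the subscription.
  print(await common.filteredData.first);

  // Re-subscribing to the same stable object after the earlier cancel.
  common.filter.add('b');
  print(await common.filteredData.first);
}
```

Because filteredData is a field rather than a getter, Flutter's StreamBuilder sees the same object on every rebuild.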

rich-j • May 01 '20 20:05

I have the same issue with re-subscribing to the stream produced by combineLatest. I successfully used the Rx.defer workaround to solve it. Thanks, @rich-j

Windstalker • May 12 '20 10:05