ReactiveNetwork
Wrong behavior with multiple subscriptions
Right now the following test for MarshmallowNetworkObservingStrategy will fail:
@Test public void shouldAllowMultipleConnectivityObservers() {
  // given
  TestObserver<Connectivity> observer = strategy.observeNetworkConnectivity(context).test();
  TestObserver<Connectivity> observer2 = strategy.observeNetworkConnectivity(context).test();
  ConnectivityManager.NetworkCallback networkCallback = strategy.createNetworkCallback(context);

  // when
  networkCallback.onAvailable(network);
  observer.dispose();
  networkCallback.onAvailable(network);

  // then
  observer.assertValueCount(1);
  observer2.assertValueCount(2);
}
This is because the library modifies global state (the subscription to the broadcast receiver) when creating and disposing a single stream. This makes it impossible to have multiple network connection subscriptions in the app, even where that makes sense (e.g. one subscription in a service and one in an activity), because as soon as any subscription is disposed, all the others stop working.
It seems the solution would be either to create a new receiver every time observeNetworkConnectivity is called (probably less efficient), or to hold the subscription in a shared publish subject, so that the first subscriber registers the broadcast receiver and the last disposer unregisters it. I tried to quickly prototype this, but unfortunately the tests are tightly coupled to the internal implementation, so I wasn't able to quickly verify that it would work.
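The second option (first subscriber registers, last disposer unregisters) can be sketched in plain Java without any Rx or Android types. This is only an illustration of the reference-counting idea, not the library's code: `Registration` and `SharedConnectivitySource` are hypothetical names, and events are plain strings standing in for `Connectivity`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for registering/unregistering the system receiver. */
interface Registration {
    void register();
    void unregister();
}

/** Shares one upstream registration among any number of listeners. */
final class SharedConnectivitySource {
    private final Registration registration;
    private final List<Consumer<String>> listeners = new ArrayList<>();

    SharedConnectivitySource(Registration registration) {
        this.registration = registration;
    }

    /** Adds a listener; the returned Runnable disposes only this subscription. */
    synchronized Runnable subscribe(Consumer<String> listener) {
        if (listeners.isEmpty()) {
            registration.register(); // first subscriber registers upstream
        }
        listeners.add(listener);
        boolean[] disposed = {false};
        return () -> {
            // Inside a lambda, `this` is the enclosing SharedConnectivitySource.
            synchronized (this) {
                if (disposed[0]) {
                    return; // disposing twice is a no-op
                }
                disposed[0] = true;
                listeners.remove(listener);
                if (listeners.isEmpty()) {
                    registration.unregister(); // last disposer unregisters
                }
            }
        };
    }

    /** Called by the upstream callback; fans the event out to all listeners. */
    synchronized void emit(String event) {
        for (Consumer<String> listener : new ArrayList<>(listeners)) {
            listener.accept(event);
        }
    }
}
```

With this shape, disposing one subscription only removes that listener; the upstream registration stays alive until the last listener is gone.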
Hi,
Thanks for reporting this issue. You're right, and it may work this way. In the case of multiple Activities it shouldn't be an issue because we always have one or zero "active" Activity. For multiple streams within a single Activity, the behaviour should be correct. It may be an issue in the case of an Activity and a Service, as you mentioned. In that case, I'd create a single subscription for the whole application (in the class deriving from Application) and then send events to the individual components (Activities and Services). This can be done with a BroadcastReceiver or an event bus like Otto. Some time ago I created ReactiveBus, which is very simple and can be used to keep the event handling reactive. I'm not sure that's the best way to solve this problem. I'd avoid creating a new BroadcastReceiver every time because it may increase memory consumption of the app.
it shouldn't be an issue because we always have one or zero "active" Activity.
Only if you call ReactiveNetwork.observe... again for each activity. Sharing a single observable created by that method will always potentially have this problem. I read the code again, and I was wrong about the cause: global state is not mutated. Rather, a single Strategy instance effectively allows only a single subscription.
I'd avoid creating a new BroadcastReceiver every time because it may increase memory consumption of the app.
This already happens for each "subscription" (each ReactiveNetwork.observeNetworkConnectivity call): when a new strategy is created, it creates a new receiver. So creating a new one for each observe... call within a strategy wouldn't make a difference.
I still feel the issue is a bug: when a subscription is returned, the consumer shouldn't have to care about the internal implementation, and should be allowed to subscribe and unsubscribe multiple times, in any order they fancy. For this to work, strategies must maintain symmetry between the emitter's onSubscribe (when manager.registerNetworkCallback(request, networkCallback) should be called) and the emitter's onCancel (where unregistering should take place).
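The symmetry described above can be illustrated with a minimal plain-Java sketch in which every subscription owns its own callback, registered on subscribe and unregistered on cancel. Nothing here is the library's actual code: `FakeConnectivityManager` and `SymmetricStrategy` are hypothetical stand-ins, and events are plain strings rather than `Connectivity` objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for ConnectivityManager; it only tracks callbacks. */
final class FakeConnectivityManager {
    final List<Consumer<String>> callbacks = new ArrayList<>();

    void registerCallback(Consumer<String> callback) {
        callbacks.add(callback);
    }

    void unregisterCallback(Consumer<String> callback) {
        callbacks.remove(callback);
    }

    /** Simulates the system delivering an event to every registered callback. */
    void deliver(String event) {
        for (Consumer<String> callback : new ArrayList<>(callbacks)) {
            callback.accept(event);
        }
    }
}

/** Hypothetical strategy: one callback per subscription, never shared. */
final class SymmetricStrategy {
    private final FakeConnectivityManager manager;

    SymmetricStrategy(FakeConnectivityManager manager) {
        this.manager = manager;
    }

    /** Registers on subscribe; the returned Runnable unregisters on cancel. */
    Runnable observe(Consumer<String> downstream) {
        // A distinct callback object per call, so cancelling one
        // subscription cannot unregister another subscriber's callback.
        Consumer<String> callback = new Consumer<String>() {
            @Override
            public void accept(String event) {
                downstream.accept(event);
            }
        };
        manager.registerCallback(callback);
        return () -> manager.unregisterCallback(callback);
    }
}
```

Because registration and unregistration happen per subscription rather than per strategy instance, the scenario from the failing test (two observers, one disposed between events) leaves the second observer unaffected.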
Yeah, for a single instance of an object it may be a problem. A quick fix may be to use multiple instances of the ReactiveNetwork class for several Activities and Services. I'll take a look at that, but I cannot promise that I'll resolve it quickly due to limited time. I'm open to any suggestions for improvements and to Pull Requests.
While this issue is very old, I came across it today.
A workaround is to wrap the ReactiveNetwork call in Observable.defer {}. The resulting Observable can then be treated like a typical observable instance that can be unsubscribed and resubscribed at will. This allows a downstream that uses refCount to maintain only one ReactiveNetwork observable.
Example:
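// defer creates a fresh ReactiveNetwork observable each time the shared stream reconnects
Observable.defer {
    ReactiveNetwork.observeNetworkConnectivity(context)
}
    .subscribeOn(Schedulers.io())
    // share downstream of defer, so all subscribers use one ReactiveNetwork observable
    .replay(1)
    .refCount()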
Thanks for sharing your feedback @brentnd! It may be helpful to others.