rxdart
shareValue() not behaving as expected
Sorry if this is really a question rather than a bug report, but I don't think shareValue() behaves as expected (or at least not as I understood it was supposed to work, in which case this could be something to add to the docs).
I have this code (adapted here for simplicity) on my library:
static final StreamController<FirebaseUser> _userReloadedStreamController =
StreamController<FirebaseUser>.broadcast();
static Stream<FirebaseUser> get onUserReloaded => _userReloadedStreamController.stream;
...
onAuthStateChangedOrReloaded =
Rx.merge([FirebaseAuth.instance.onAuthStateChanged, onUserReloaded]).shareValue();
Here I am returning a Stream that is supposed to merge two streams so that every new listener receives the last emitted value. FirebaseAuth.instance.onAuthStateChanged has an implementation that resembles a BehaviorSubject: every time there's a new subscription it emits the last value, but it doesn't use RxDart.
I was testing this inside an app where there's just one subscriber at first. When this subscriber is first created, things work fine, but once it is destroyed and a new subscriber comes along, the Stream stops behaving normally. There are only 1 or 0 subscribers at any given time. Here are some logs to better illustrate what's going on:
This is the Widget I am using for testing:
class Wrapper extends StatefulWidget {
Wrapper() {
print('created');
}
@override
_WrapperState createState() => _WrapperState();
}
class _WrapperState extends State<Wrapper> {
@override
Widget build(BuildContext context) {
return StreamBuilder<FirebaseUser>(
stream: FirebaseAuth.instance.onAuthStateChanged,
builder: (context, snapshot) {
print(snapshot.connectionState);
print('snapshot.data(user): ${snapshot.data}');
return Container();
});
}
@override
void initState() {
super.initState();
print('init');
}
@override
void dispose() {
print('disposed');
super.dispose();
}
}
It just logs the different states of the Widget.
With stream: FirebaseAuth.instance.onAuthStateChanged, these are the logs that I get when Wrapper is created and destroyed multiple times:
I/flutter (21763): created
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): null
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): null
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): null
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): null
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
Basically, it goes from ConnectionState.waiting to ConnectionState.active as expected.
Now, these are the logs that I get when I use onAuthStateChangedOrReloaded (the one using shareValue()) instead:
I/flutter (21763): created
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): null
I/flutter (21763): ConnectionState.active
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): ConnectionState.done
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
I/flutter (21763): init
I/flutter (21763): ConnectionState.waiting
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): ConnectionState.done
I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')
I/flutter (21763): disposed
As you can see, the stream created by shareValue() works as expected on the first subscription, but on every later subscription it goes instantly from ConnectionState.waiting to ConnectionState.done.
Is this the expected behavior? I couldn't find it documented anywhere. Should I be doing something differently?
Which version of rxdart are you using?
Hey there -- that's actually the expected behavior of shareValue: It will start listening to the source stream when first listened to, then shut everything down when there are no more subscribers. I've tried to document it as part of the extension method: https://pub.dev/documentation/rxdart/latest/rx/ConnectableStreamExtensions/shareValue.html
This method uses the refCount operator under the hood, which comes from Rx: https://pub.dev/documentation/rxdart/latest/rx/ConnectableStreamExtensions/shareValue.html
However, despite the fact that this is a standard Rx operator and is useful for some cases, many folks are surprised by / do not want this behavior, and want the Stream to keep working after the final subscription is cancelled.
Therefore, I've thought about implementing the operators asBroadcastValueStream and asBroadcastValueStreamSeeded. These would convert any Stream into a broadcast ValueStream, but would not do the reference counting that shareValue is doing, nor would they replay the latest event to the listener (similar to how asBroadcastStream does not replay values).
Would that work / be helpful?
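To make the difference concrete, here is a minimal sketch contrasting the ref-counted behavior with a manually connected stream. It assumes a recent rxdart release; the `source` controller is a hypothetical stand-in for the merged auth streams, and no output is shown because the exact events depend on the rxdart version in use.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

Future<void> main() async {
  // Hypothetical stand-in for the merged auth streams.
  final source = StreamController<int>.broadcast();

  // Ref-counted variant: the same operator pair that shareValue() wraps.
  // It subscribes to `source` on the first listen and shuts down when the
  // listener count returns to zero.
  final Stream<int> refCounted = source.stream.publishValue().refCount();

  // Manually connected variant: stays subscribed to `source` until the
  // subscription returned by connect() is cancelled.
  final ValueConnectableStream<int> longLived = source.stream.publishValue();
  final StreamSubscription<int> connection = longLived.connect();

  final a = refCounted.listen((v) => print('refCounted: $v'));
  final b = longLived.listen((v) => print('longLived: $v'));
  source.add(1);
  await Future<void>.delayed(Duration.zero);

  // Dropping to zero listeners ends `refCounted`, but not `longLived`.
  await a.cancel();
  await b.cancel();
  source.add(2);
  await Future<void>.delayed(Duration.zero);

  // A later listener on `longLived` can still be served, because the
  // connection to `source` was never cancelled.
  final c = longLived.listen((v) => print('longLived again: $v'));
  await Future<void>.delayed(Duration.zero);

  await c.cancel();
  await connection.cancel();
  await source.close();
}
```

The trade-off is ownership: with refCount the operator decides when to disconnect, while with connect() the caller holds the subscription and must cancel it.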
Hmm, I think I misunderstood what "shutsdown" means in the docs. I thought it would shut down until someone subscribes again. Still, what the logs show me is that the stream didn't completely shut down: it emits a value and then closes. If it were truly shut down, shouldn't it just not emit at all?
I was using asBroadcastStream, but I changed to shareValue because I want to replay the latest event on each new listener.
Are there any ways of having a broadcast stream that replays the latest value?
Thanks for the feedback, I'll try to rewrite those docs so they're easier to understand. In this case, since we don't have an operator that fits your use case perfectly, you need to manage a BehaviorSubject or ConnectableStream yourself to achieve this.
In your case, my recommendation would be like this:
class StreamContainer {
final StreamController<FirebaseUser> _userReloadedStreamController =
StreamController<FirebaseUser>.broadcast();
Stream<FirebaseUser> get onUserReloaded => _userReloadedStreamController.stream;
// Typed as ValueConnectableStream so that connect() is available in init().
ValueConnectableStream<FirebaseUser> _onAuthStateChangedOrReloaded;
StreamSubscription<FirebaseUser> _connectionSubscription;
Stream<FirebaseUser> get onAuthStateChangedOrReloaded => _onAuthStateChangedOrReloaded;
void init() {
// Manually create the "published" stream using `publishValue` instead of `shareValue`.
_onAuthStateChangedOrReloaded = Rx.merge([FirebaseAuth.instance.onAuthStateChanged, onUserReloaded]).publishValue();
// Ask the "published" stream to "connect" to the underlying merged stream and start listening for values. Also store the subscription so we can cancel it later.
_connectionSubscription = _onAuthStateChangedOrReloaded.connect();
}
void dispose() {
// When you're done using the Stream entirely, call the dispose method on this class which will cancel the underlying subscription to Rx.merge
_connectionSubscription?.cancel();
}
}
Does that work for ya?
Managing the stream was my plan B, I am just a bit disappointed there isn't an operator for something as simple as this.
I will try to adapt your example as my class is actually made of a bunch of static fields so it's easier for the users, but harder to control initialization and disposal, thanks Brian.
@brianegan as far as I have read, Publish/Connect doesn't buffer the last value, so is your recommendation buffering it?
Never mind, I can see it does buffer it in the docs, I guess other Rx Implementations don't buffer it, which confused me.
Yah, there are three types of publish (Connectable Streams in general can be confusing): publish (no replay), publishValue (replay latest value), publishReplay (replay all values up to a max).
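The three variants can be lined up side by side. This is only a sketch: the type names are those used by recent rxdart releases (older releases may name the return types differently), and `source` is a hypothetical controller.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

Future<void> main() async {
  // Hypothetical source stream.
  final source = StreamController<int>.broadcast();

  // publish(): no replay; late listeners only see future events.
  final PublishConnectableStream<int> plain = source.stream.publish();

  // publishValue(): replays the latest value to each new listener.
  final ValueConnectableStream<int> latest = source.stream.publishValue();

  // publishReplay(): replays past values, here capped at the last 3.
  final ReplayConnectableStream<int> history =
      source.stream.publishReplay(maxSize: 3);

  // None of them listens to `source` until connect() is called.
  final connections = <StreamSubscription<int>>[
    plain.connect(),
    latest.connect(),
    history.connect(),
  ];

  source.add(1);
  await Future<void>.delayed(Duration.zero);

  // Clean up: cancel every connection, then close the source.
  for (final connection in connections) {
    await connection.cancel();
  }
  await source.close();
}
```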
I am just a bit disappointed there isn't an operator for something as simple as this.
This is a tough call -- it's hard to have an operator for all of these different combinatorial cases without making a huge API. This might be a good time for an operator / extension you maintain in-house for now?
For my particular use case I think I will just do a publishValue()..connect();, as the stream is supposed to live for the entire duration of the App and be initialized as soon as it starts, so no need to ever cancel it.
I understand your point regarding a big API, I just thought "a buffered broadcast stream" was a common call, as the difference between a single subscription stream and a broadcast stream (besides the obvious one) is that broadcast streams don't buffer values, so this new operator could remove the only missing feature from broadcast streams that single subscription streams have.
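For illustration, the "buffered broadcast stream" idea can be sketched with dart:async alone. `LatestValueBroadcast` is a hypothetical helper, not part of rxdart or the Dart SDK; it assumes Dart 2.12 or later (null safety and `Stream.multi`) and simply stays subscribed to its source while handing the latest value to each new listener.

```dart
import 'dart:async';

/// Hypothetical helper (not part of rxdart or dart:async): a long-lived
/// multi-listener stream that replays the latest value to each new listener.
class LatestValueBroadcast<T> {
  LatestValueBroadcast(Stream<T> source) {
    // Subscribe eagerly and stay subscribed, regardless of listener count.
    _subscription = source.listen(
      (value) {
        _latest = value;
        _hasValue = true;
        for (final listener in List.of(_listeners)) {
          listener.add(value);
        }
      },
      onError: (Object error, StackTrace stackTrace) {
        for (final listener in List.of(_listeners)) {
          listener.addError(error, stackTrace);
        }
      },
      onDone: () {
        _isDone = true;
        for (final listener in List.of(_listeners)) {
          listener.close();
        }
        _listeners.clear();
      },
    );
  }

  final _listeners = <MultiStreamController<T>>[];
  late final StreamSubscription<T> _subscription;
  T? _latest;
  bool _hasValue = false;
  bool _isDone = false;

  /// Each listen call gets the cached value first, then live events.
  Stream<T> get stream => Stream<T>.multi((listener) {
        if (_hasValue) listener.add(_latest as T);
        if (_isDone) {
          listener.close();
          return;
        }
        _listeners.add(listener);
        listener.onCancel = () {
          _listeners.remove(listener);
        };
      }, isBroadcast: true);

  /// Stops listening to the source stream.
  Future<void> dispose() => _subscription.cancel();
}

Future<void> main() async {
  final source = StreamController<int>();
  final users = LatestValueBroadcast<int>(source.stream);

  final first = users.stream.listen((v) => print('first: $v'));
  source.add(1);
  await Future<void>.delayed(Duration.zero);
  await first.cancel(); // Zero listeners, but the helper stays subscribed.

  source.add(2);
  await Future<void>.delayed(Duration.zero);

  // A late listener is handed the cached value before any live events.
  final second = users.stream.listen((v) => print('second: $v'));
  await Future<void>.delayed(Duration.zero);

  await second.cancel();
  await users.dispose();
  await source.close();
}
```

Unlike a ref-counted stream, this helper never shuts down on its own, so whoever creates it is responsible for calling dispose().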
@brianegan I am getting an error while trying to run some tests, here's my code:
static Stream<FirebaseUser> _onAuthStateChangedOrReloaded =
_mergeWithOnUserReloaded(_auth.onAuthStateChanged);
static Stream<FirebaseUser> _mergeWithOnUserReloaded(Stream<FirebaseUser> stream) {
return Rx.merge([stream, onUserReloaded]).publishValue()..connect();
}
And here are the tests that are breaking:
test('Reloads emits the new User in onAuthStateChangedOrReloaded', () async {
expect(FirebaseUserReloader.onAuthStateChangedOrReloaded,
emitsInOrder([mockOldUser, mockNewUser]));
await FirebaseUserReloader.reloadCurrentUser();
});
test('Current user is emitted when subscribing to onAuthStateChangedOrReloaded', () {
expect(FirebaseUserReloader.onAuthStateChangedOrReloaded, emits(mockOldUser));
});
And these are the errors:
dart:core Object.noSuchMethod
package:async/src/stream_queue.dart 472:19 StreamQueue._pause
package:async/src/stream_queue.dart 434:7 StreamQueue._updateRequests
package:async/src/stream_queue.dart 514:5 StreamQueue._addResult
package:async/src/stream_queue.dart 484:9 StreamQueue._ensureListening.
dart:async _EventSinkWrapper.add
package:rxdart/src/transformers/start_with.dart 18:17 _StartWithStreamSink.add
dart:async _StreamSinkWrapper.add
package:rxdart/src/transformers/start_with.dart 59:13 _StartWithStreamSink._safeAddFirstEvent
package:rxdart/src/transformers/start_with.dart 40:13 _StartWithStreamSink.onListen
package:rxdart/src/utils/forwarding_stream.dart 18:22 forwardStream.
dart:async _BoundSinkStream.listen
package:rxdart/src/streams/defer.dart 37:18 DeferStream.listen
dart:async StreamView.listen
package:async/src/stream_queue.dart 483:31 StreamQueue._ensureListening
package:async/src/stream_queue.dart 542:7 StreamQueue._addRequest
package:async/src/stream_queue.dart 299:5 StreamQueue.startTransaction
package:test_api expect
package:flutter_test/src/widget_tester.dart 234:3 expect
test\firebase_user_stream_test.dart 70:7 main. .
===== asynchronous gap ===========================
dart:async _BoundSinkStream.listen
package:rxdart/src/streams/defer.dart 37:18 DeferStream.listen
dart:async StreamView.listen
package:async/src/stream_queue.dart 483:31 StreamQueue._ensureListening
package:async/src/stream_queue.dart 542:7 StreamQueue._addRequest
package:async/src/stream_queue.dart 299:5 StreamQueue.startTransaction
package:test_api expect
package:flutter_test/src/widget_tester.dart 234:3 expect
test\firebase_user_stream_test.dart 70:7 main. .
NoSuchMethodError: The method 'pause' was called on null.
Receiver: null
Tried calling: pause()
I think dart stream tests use a StreamQueue and it's trying to pause this Connectable Stream, but Connectable Streams don't actually allow pausing, is this correct? How do we test them then?
Using expectAsync1:
FirebaseUserReloader.onAuthStateChangedOrReloaded.listen(
expectAsync1((user) => expect(user, mockOldUser))
);
Thanks @hoc081098, but how can I use emitsInOrder with expectAsync1?
I tried this:
test('Reloads emits the new User in onAuthStateChangedOrReloaded', () async {
FirebaseUserReloader.onAuthStateChangedOrReloaded.listen(expectAsync1(
(user) => expect(user, emitsInOrder([mockOldUser, mockNewUser]))));
await FirebaseUserReloader.reloadCurrentUser();
});
And I got this:
package:test_api expect
package:flutter_test/src/widget_tester.dart 234:3 expect
test\firebase_user_stream_test.dart 71:21 main.<fn>.<fn>.<fn>
dart:async _EventSinkWrapper.add
package:rxdart/src/transformers/start_with.dart 18:17 _StartWithStreamSink.add
dart:async _StreamSinkWrapper.add
package:rxdart/src/transformers/start_with.dart 59:13 _StartWithStreamSink._safeAddFirstEvent
package:rxdart/src/transformers/start_with.dart 40:13 _StartWithStreamSink.onListen
package:rxdart/src/utils/forwarding_stream.dart 18:22 forwardStream.<fn>
dart:async _BoundSinkStream.listen
package:rxdart/src/streams/defer.dart 37:18 DeferStream.listen
dart:async StreamView.listen
test\firebase_user_stream_test.dart 70:57 main.<fn>.<fn>
===== asynchronous gap ===========================
dart:async _BoundSinkStream.listen
package:rxdart/src/streams/defer.dart 37:18 DeferStream.listen
dart:async StreamView.listen
test\firebase_user_stream_test.dart 70:57 main.<fn>.<fn>
Callback called more times than expected (1).
dart:async _EventSinkWrapper.add
package:rxdart/src/transformers/start_with.dart 18:17 _StartWithStreamSink.add
dart:async _BroadcastStreamController.add
package:rxdart/src/subjects/subject.dart 141:17 Subject._add
package:rxdart/src/subjects/subject.dart 135:5 Subject.add
===== asynchronous gap ===========================
dart:async _BoundSinkStream.listen
package:rxdart/src/streams/defer.dart 37:18 DeferStream.listen
dart:async StreamView.listen
test\firebase_user_stream_test.dart 70:57 main.<fn>.<fn>
Expected: should do the following in order:
* emit an event that MockFirebaseUser:<MockFirebaseUser>
* emit an event that MockFirebaseUser:<MockFirebaseUser>
Actual: MockFirebaseUser:<MockFirebaseUser>
Which: was not a Stream or a StreamQueue
As far as I understand the stream will be disconnected from emitsInOrder by expectAsync1, so I have no idea on how to make them work together.
Sorry if I am abusing guys, I am pretty lost on this one, I can't find any references online on how to test this.
Try it like this:
final expected = [mockOldUser, mockNewUser];
var i = 0;
stream.listen(
expectAsync1(
(user) => expect(user, expected[i++]),
count: expected.length,
)
);
Thanks @hoc081098 it works! It's tricky to test those things, I was looking at RxDart tests and I could only find expectLater and it doesn't work.
FYI, the tests were interfering with each other because the stream subscription wasn't being closed, so I am adding that modification now.
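A sketch of how the ordered check and the cleanup could fit together in one test, assuming package:test. The broadcast controller and the string values are hypothetical stand-ins for the real stream and mock users.

```dart
import 'dart:async';

import 'package:test/test.dart';

void main() {
  test('emits values in order and cleans up', () {
    // Stand-in for the real stream under test (hypothetical data).
    final controller = StreamController<String>.broadcast();
    final expected = ['oldUser', 'newUser'];
    var index = 0;

    final subscription = controller.stream.listen(
      expectAsync1(
        (String user) => expect(user, expected[index++]),
        count: expected.length,
      ),
    );

    // Tear-downs run in reverse registration order: the subscription is
    // cancelled first, then the controller is closed. Cancelling keeps this
    // listener from leaking into the next test.
    addTearDown(controller.close);
    addTearDown(subscription.cancel);

    controller
      ..add('oldUser')
      ..add('newUser');
  });
}
```

Because expectAsync1 is given `count: expected.length`, the test only completes after both events arrive and fails if a third one shows up.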
@brianegan I was looking at this behavior a bit deeper and I found something curious, if you look into my first post you can see that the StreamBuilder shows a ConnectionState.done for the shareValue() stream, but it still can reconnect to that same stream again, get the last value and complete with a ConnectionState.done. Is this the expected behavior? I thought once the number of subscribers gets to 0 no one can subscribe anymore and get any values from it.
I understand your point regarding a big API, I just thought "a buffered broadcast stream" was a common call, as the difference between a single subscription stream and a broadcast stream (besides the obvious one) is that broadcast streams don't buffer values, so this new operator could remove the only missing feature from broadcast streams that single subscription streams have.
Agreed, that's why I'd thought about adding asBroadcastValueStream / asBroadcastValueStreamSeeded to do just that, and I'm specifically thinking of integrating with Flutter for these operators.
In my head, these operators would convert a normal Stream into a Broadcast Stream that allows you to read the latest value, but it would NOT replay the latest value the way shareValue does, nor would it shut down after the number of listeners goes to 0. In fact, I think that's actually best when working with StreamBuilder widgets.
An example might look like this:
final onAuthStateChangedOrReloaded =
Rx.merge([FirebaseAuth.instance.onAuthStateChanged, onUserReloaded]).asBroadcastValueStream();
// Later in your Widgets
StreamBuilder(
initialData: onAuthStateChangedOrReloaded.value,
stream: onAuthStateChangedOrReloaded,
builder: (context, snapshot) => Text('${snapshot.data}'),
);
Basically, asBroadcastValueStream would not replay the value, but you could still provide the latest captured value to the StreamBuilder widget's initialData property. This is actually better IMO than replaying the value, because it requires Flutter to build the Widget only once. With replays, the StreamBuilder actually has to build twice (once with no data, then again once the first data event is delivered by the Stream).
What do you think about that approach?
I was looking at this behavior a bit deeper and I found something curious, if you look into my first post you can see that the StreamBuilder shows a ConnectionState.done for the shareValue() stream, but it still can reconnect to that same stream again, get the last value and complete with a ConnectionState.done. Is this the expected behavior? I thought once the number of subscribers gets to 0 no one can subscribe anymore and get any values from it.
Sounds fishy indeed. I'll write some tests to see what's going on there -- thanks for the info!
I discovered a bug on StreamBuilder while investigating this, so I made a sample project to report it. You can use it for testing this if you want, but you will have to change my library's reference on pubspec.yaml to firebase_user_stream: 1.0.0-alpha.2 as this is the version that shows this behavior. Check the readme for some quick instructions on how to trigger it.
What do you think about that approach?
I like it a lot for StreamBuilder, makes sense.
@brianegan I think a replay observable that was long lived and didn't use refCount would be a valuable addition.
Regarding the StreamBuilder building twice even when it is passed a stream whose value is stored and can be accessed synchronously, I created the package https://pub.dev/packages/flutter_rx_stream_builder a while ago.
Oh, I'm working on my branch: https://github.com/hoc081098/rxdart/tree/hoc081098/value_subject to introduce ValueSubject :)
@brianegan I filed a bug here on StreamBuilder, but they complained my sample uses external libraries. I am having a hard time reproducing this using the standard Stream library, if you could give me some insights on this I would appreciate it.
@feinstein Your initial problem description describes an issue with re-subscribing that sounds similar to what we've encountered. We use an Rx.defer pattern as I describe in this post https://github.com/ReactiveX/rxdart/issues/455#issuecomment-622561853.
@rich-j thanks, I will take a look, but .publishValue()..connect(); is working for my use case.
In a project I work on, I ran into this same problem, but in my use case I was prevented from canceling the StreamSubscription returned by ConnectableStream.connect().
I will create an extension and add it to rxdart with a PR.
Here is an example:
extension ValueExtensions<T> on Stream<T> {
Stream<T> asValueStream() {
StreamSubscription<T>? subscription;
final stream = doOnDone((){
// When source stream done, cancel [ConnectableStream] subscription
subscription?.cancel();
})
.publishValue();
// Instructs the [ConnectableStream] to begin emitting items
subscription = stream.connect();
return stream;
}
Stream<T> asValueSeededStream(T seedValue) {
StreamSubscription<T>? subscription;
final stream = doOnDone((){
// When source stream done, cancel [ConnectableStream] subscription
subscription?.cancel();
})
.publishValueSeeded(seedValue);
// Instructs the [ConnectableStream] to begin emitting items
subscription = stream.connect();
return stream;
}
Stream<T> asReplayStream({int? maxSize}) {
StreamSubscription<T>? subscription;
final stream = doOnDone((){
// When source stream done, cancel [ConnectableStream] subscription
subscription?.cancel();
})
.publishReplay(maxSize: maxSize);
// Instructs the [ConnectableStream] to begin emitting items
subscription = stream.connect();
return stream;
}
Stream<T> asBroadcast() {
StreamSubscription<T>? subscription;
final stream = doOnDone((){
// When source stream done, cancel [ConnectableStream] subscription
subscription?.cancel();
})
.publish();
// Instructs the [ConnectableStream] to begin emitting items
subscription = stream.connect();
return stream;
}
}
asBroadcastValueStream would be very much appreciated
It can be done, I would like to open a PR, but I would like the opinion of the repository maintainers as well.
Guys, I came here after many years of experience with rxjs and Java's Reactor project... and this lib is so strange to me... A BehaviorSubject which is canceled after the last subscription: what is the purpose of a seed then? I usually use the BehaviorSubject as a value container, to notify all listeners, now and whenever, that the value has changed, and also to give them the current value. Now the question on the listener side is whether this subject is still valid or canceled. What if it was canceled because there are no listeners any more, what am I supposed to do then? Lie down and cry?