rxdart icon indicating copy to clipboard operation
rxdart copied to clipboard

shareValue() not behaving as expected

Open feinstein opened this issue 4 years ago • 60 comments

Sorry if this maybe is a question, but I don't think that shareValue() is behaving as it was expected to (or at least how I understood it was supposed to work, which in this case it could be something to add to the docs).

I have this code (adapted here for simplicity) on my library:

static final StreamController<FirebaseUser> _userReloadedStreamController = 
    StreamController<FirebaseUser>.broadcast();
static Stream<FirebaseUser> get onUserReloaded => _userReloadedStreamController.stream;
...
onAuthStateChangedOrReloaded = 
    Rx.merge([FirebaseAuth.instance.onAuthStateChanged, onUserReloaded]).shareValue();

Where I am returning a Stream that is supposed to merge 2 streams and every time someone listens to it, they will be able to receive the last emitted value. FirebaseAuth.instance.onAuthStateChanged has an implementation that resembles a Behavior Subject, every time there's a new subscriptions it emits the last value, but it doesn't use RxDart.

I was testing this inside an App where there's just one subscriber at first, when this subscriber is first created things work fine, but then the subscriber is destroyed and when a new subscriber comes the Stream stops behaving normally, there are only 1 or 0 subscribers at any given time. Here are some logs to illustrate better what's going on:

This is the Widget I am using for testing:

class Wrapper extends StatefulWidget {
  Wrapper() {
    print('created');
  }

  @override
  _WrapperState createState() => _WrapperState();
}

class _WrapperState extends State<Wrapper> {
  @override
  Widget build(BuildContext context) {
    return StreamBuilder<FirebaseUser>(
        stream: FirebaseAuth.instance.onAuthStateChanged,
        builder: (context, snapshot) {
          print(snapshot.connectionState);
          print('snapshot.data(user): ${snapshot.data}');
          return Container();
        });
  }

  @override
  void initState() {
    super.initState();
    print('init');
  }

  @override
  void dispose() {
    print('disposed');
    super.dispose();
  }
}

It just logs the different states of the Widget.

Using the stream: FirebaseAuth.instance.onAuthStateChanged These are the logs that I get when Wrapper is created and destroyed multiple times:

I/flutter (21763): created I/flutter (21763): init I/flutter (21763): ConnectionState.waiting I/flutter (21763): snapshot.data(user): null I/flutter (21763): ConnectionState.active I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser') I/flutter (21763): ConnectionState.waiting I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser') I/flutter (21763): ConnectionState.active I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser') I/flutter (21763): disposed I/flutter (21763): init I/flutter (21763): ConnectionState.waiting I/flutter (21763): snapshot.data(user): null I/flutter (21763): ConnectionState.active I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser') I/flutter (21763): disposed I/flutter (21763): init I/flutter (21763): ConnectionState.waiting I/flutter (21763): snapshot.data(user): null I/flutter (21763): ConnectionState.active I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser') I/flutter (21763): disposed I/flutter (21763): init I/flutter (21763): ConnectionState.waiting I/flutter (21763): snapshot.data(user): null I/flutter (21763): ConnectionState.active I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser')

Basically, it goes from ConnectionState.waiting to ConnectionState.active as expected.

Now, these are the logs that I get when I use onAuthStateChangedOrReloaded (the one using shareValue()) instead:

I/flutter (21763): created I/flutter (21763): init I/flutter (21763): ConnectionState.waiting I/flutter (21763): snapshot.data(user): null I/flutter (21763): ConnectionState.active I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser') I/flutter (21763): disposed I/flutter (21763): init I/flutter (21763): ConnectionState.waiting I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser') I/flutter (21763): ConnectionState.done I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser') I/flutter (21763): disposed I/flutter (21763): init I/flutter (21763): ConnectionState.waiting I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser') I/flutter (21763): ConnectionState.done I/flutter (21763): snapshot.data(user): FirebaseUser(Instance of 'PlatformUser') I/flutter (21763): disposed

As you can see, the stream created by shareValue() works as expected on the first subscription, than later it goes instantaneously from ConnectionState.waiting to ConnectionState.done.

Is this the expected behavior? I couldn't find it documented anywhere. Should I be doing something differently?

feinstein avatar Apr 26 '20 23:04 feinstein

Which version of rxdart are you using?

frankpepermans avatar Apr 27 '20 06:04 frankpepermans

Hey there -- that's actually the expected behavior of shareValue: It will start listening to the source stream when first listened to, then shut everything down when there are no more subscribers. I've tried to document it as part of the extension method: https://pub.dev/documentation/rxdart/latest/rx/ConnectableStreamExtensions/shareValue.html

This method is uses the refCount operator under the hood, which comes from Rx: https://pub.dev/documentation/rxdart/latest/rx/ConnectableStreamExtensions/shareValue.html

However, despite the fact that this is a standard Rx operator and is useful for some cases, many folks are surprised by / do not want this behavior, and want the Stream to keep working after the final subscription is cancelled.

Therefore, I've thought about implemented the operators asBroadcastValueStream and asBroadcastValueStreamSeeded. These would convert any Stream into a broadcast ValueStream, but would not do the reference counting that shareValue is doing, nor would it replay the latest event to the listener (similar to how asBroadcastStream does not replay values).

Would that work / be helpfu?

brianegan avatar Apr 27 '20 11:04 brianegan

Hmm I think I misunderstood what "shutsdown" means on the docs. I thought it would shutdown until someone subscribes again. Still what the logs show me is that the stream didn't completely shutdown, it emits a value and then closes, if it was truly shut down shouldn't it just don't emit at all?

I was using asBroadcastStream, but I changed to shareValue because I want to replay the latest event on each new listener.

Are there any ways of having a broadcast stream that replays the latest value?

feinstein avatar Apr 27 '20 11:04 feinstein

Thanks for the feedback, I'll try to rewrite those docs so they're easier to understand. In this case, since we don't have an operator fits your use case perfectly, you need to manage a BehaviorSubject or ConnectableStream to achieve this.

In your case, my recommendation would be like this:

class StreamContainer {
  final StreamController<FirebaseUser> _userReloadedStreamController = 
    StreamController<FirebaseUser>.broadcast();
  Stream<FirebaseUser> get onUserReloaded => _userReloadedStreamController.stream;
  Stream _onAuthStateChangedOrReloaded;
  StreamSubscription _connectionSubscription;

  Stream get onAuthStateChangedOrReloaded  => _onAuthStateChangedOrReloaded;

  void init() {
    // Manually Create the "Published" Stream using `publishValue` instead of `shareValue`
     _onAuthStateChangedOrReloaded = Rx.merge([FirebaseAuth.instance.onAuthStateChanged, onUserReloaded]).publishValue();

    // Ask the "Published" stream to "connect" to the underlying Merged Stream and start listening for values. Also store the subscription so we can cancel it later.
    _ connectionSubscription = _onAuthStateChangedOrReloaded.connect();
  }

  void dispose() {
    // When you're done using the Stream entirely, call the dispose method on this class which will cancel the underlying subscription to Rx.merge
    _connectionSubscription?.cancel();
  } 
}

Does that work for ya?

brianegan avatar Apr 27 '20 11:04 brianegan

Managing the stream was my plan B, I am just a bit disappointed there isn't an operator for something as simple as this.

I will try to adapt your example as my class is actually made of a bunch of static fields so it's easier for the users, but harder to control initialization and disposal, thanks Brian.

feinstein avatar Apr 27 '20 11:04 feinstein

@brianegan as far I have read Publish/Connect doesn't buffer the last value, so is your recommendation buffering it?

feinstein avatar Apr 27 '20 11:04 feinstein

Never mind, I can see it does buffer it in the docs, I guess other Rx Implementations don't buffer it, which confused me.

feinstein avatar Apr 27 '20 11:04 feinstein

Yah, there are three types of publish (Connectable Streams in general can be confusing): publish (no replay), publishValue (replay latest value), publishReplay (replay all values up to a max).

I am just a bit disappointed there isn't an operator for something as simple as this.

This is a tough call -- it's hard to have an operator for all of these different combinatorial cases without making a huge API. This might be a good time for an operator / extension you maintain in-house for now?

brianegan avatar Apr 27 '20 11:04 brianegan

For my particular use case I think I will just do a publishValue()..connect();, as the stream is supposed to live for the entire duration of the App and be initialized as soon as it starts, so no need to ever cancel it.

I understand your point regarding a big API, I just thought "a buferred broadcast stream" was a common call, as the difference between a single subscription stream and a broadcast stream (besides the obvious one) is that broadcast streams don't buffer values, so this new operator could remove the only missing feature from broadcast streams that single subscription streams have.

feinstein avatar Apr 27 '20 12:04 feinstein

@brianegan I am getting an error while trying to run some tests, here's my code:

  static Stream<FirebaseUser> _onAuthStateChangedOrReloaded =
      _mergeWithOnUserReloaded(_auth.onAuthStateChanged);

  static Stream<FirebaseUser> _mergeWithOnUserReloaded(Stream<FirebaseUser> stream) {
    return Rx.merge([stream, onUserReloaded]).publishValue()..connect();
  }

And here are the tests that are breaking:

test('Reloads emits the new User in onAuthStateChangedOrReloaded', () async {
  expect(FirebaseUserReloader.onAuthStateChangedOrReloaded,
      emitsInOrder([mockOldUser, mockNewUser]));
  await FirebaseUserReloader.reloadCurrentUser();
});

test('Current user is emmited when subscribing to onAuthStateChangedOrReloaded', () {
  expect(FirebaseUserReloader.onAuthStateChangedOrReloaded, emits(mockOldUser));
});

And these are the errors:

dart:core Object.noSuchMethod package:async/src/stream_queue.dart 472:19 StreamQueue._pause package:async/src/stream_queue.dart 434:7 StreamQueue._updateRequests package:async/src/stream_queue.dart 514:5 StreamQueue._addResult package:async/src/stream_queue.dart 484:9 StreamQueue._ensureListening. dart:async _EventSinkWrapper.add package:rxdart/src/transformers/start_with.dart 18:17 _StartWithStreamSink.add dart:async _StreamSinkWrapper.add package:rxdart/src/transformers/start_with.dart 59:13 _StartWithStreamSink._safeAddFirstEvent package:rxdart/src/transformers/start_with.dart 40:13 _StartWithStreamSink.onListen package:rxdart/src/utils/forwarding_stream.dart 18:22 forwardStream. dart:async _BoundSinkStream.listen package:rxdart/src/streams/defer.dart 37:18 DeferStream.listen dart:async StreamView.listen package:async/src/stream_queue.dart 483:31 StreamQueue._ensureListening package:async/src/stream_queue.dart 542:7 StreamQueue._addRequest package:async/src/stream_queue.dart 299:5 StreamQueue.startTransaction package:test_api expect package:flutter_test/src/widget_tester.dart 234:3 expect test\firebase_user_stream_test.dart 70:7 main.. ===== asynchronous gap =========================== dart:async _BoundSinkStream.listen package:rxdart/src/streams/defer.dart 37:18 DeferStream.listen dart:async StreamView.listen package:async/src/stream_queue.dart 483:31 StreamQueue._ensureListening package:async/src/stream_queue.dart 542:7 StreamQueue._addRequest package:async/src/stream_queue.dart 299:5 StreamQueue.startTransaction package:test_api expect package:flutter_test/src/widget_tester.dart 234:3 expect test\firebase_user_stream_test.dart 70:7 main..

NoSuchMethodError: The method 'pause' was called on null. Receiver: null Tried calling: pause()

I think dart stream tests use a StreamQueue and it's trying to pause this Connectable Stream, but Connectable Streams don't actually allow pausing, is this correct? How do we test them then?

feinstein avatar Apr 27 '20 19:04 feinstein

Using expectAsync1:

FirebaseUserReloader.onAuthStateChangedOrReloaded.listen(
  expectAsync1((user) => expect(user, mockOldUser))
);

hoc081098 avatar Apr 27 '20 19:04 hoc081098

Thanks @hoc081098, but how can I use emitsInOrder with expectAsync1?

feinstein avatar Apr 27 '20 20:04 feinstein

I tried this:

test('Reloads emits the new User in onAuthStateChangedOrReloaded', () async {
  FirebaseUserReloader.onAuthStateChangedOrReloaded.listen(expectAsync1(
      (user) => expect(user, emitsInOrder([mockOldUser, mockNewUser]))));

  await FirebaseUserReloader.reloadCurrentUser();
});

And I got this:

package:test_api                                       expect
package:flutter_test/src/widget_tester.dart 234:3      expect
test\firebase_user_stream_test.dart 71:21              main.<fn>.<fn>.<fn>
dart:async                                             _EventSinkWrapper.add
package:rxdart/src/transformers/start_with.dart 18:17  _StartWithStreamSink.add
dart:async                                             _StreamSinkWrapper.add
package:rxdart/src/transformers/start_with.dart 59:13  _StartWithStreamSink._safeAddFirstEvent
package:rxdart/src/transformers/start_with.dart 40:13  _StartWithStreamSink.onListen
package:rxdart/src/utils/forwarding_stream.dart 18:22  forwardStream.<fn>
dart:async                                             _BoundSinkStream.listen
package:rxdart/src/streams/defer.dart 37:18            DeferStream.listen
dart:async                                             StreamView.listen
test\firebase_user_stream_test.dart 70:57              main.<fn>.<fn>
===== asynchronous gap ===========================
dart:async                                             _BoundSinkStream.listen
package:rxdart/src/streams/defer.dart 37:18            DeferStream.listen
dart:async                                             StreamView.listen
test\firebase_user_stream_test.dart 70:57              main.<fn>.<fn>
Callback called more times than expected (1).
dart:async                                             _EventSinkWrapper.add
package:rxdart/src/transformers/start_with.dart 18:17  _StartWithStreamSink.add
dart:async                                             _BroadcastStreamController.add
package:rxdart/src/subjects/subject.dart 141:17        Subject._add
package:rxdart/src/subjects/subject.dart 135:5         Subject.add
===== asynchronous gap ===========================
dart:async                                             _BoundSinkStream.listen
package:rxdart/src/streams/defer.dart 37:18            DeferStream.listen
dart:async                                             StreamView.listen
test\firebase_user_stream_test.dart 70:57              main.<fn>.<fn>

Expected: should do the following in order:
          * emit an event that MockFirebaseUser:<MockFirebaseUser>
          * emit an event that MockFirebaseUser:<MockFirebaseUser>
  Actual: MockFirebaseUser:<MockFirebaseUser>
   Which: was not a Stream or a StreamQueue

As far as I understand the stream will be disconnected from emitsInOrder by expectAsync1, so I have no idea on how to make them work together.

Sorry if I am abusing guys, I am pretty lost on this one, I can't find any references online on how to test this.

feinstein avatar Apr 27 '20 21:04 feinstein

Trying like this

final expected = [mockOldUser, mockNewUser];
var i = 0;

stream.listen(
  expectAsync1(
    (user) => expect(user, expected[i++]),
    count: expected.length,
  )
);

Sent from my Redmi 7A using FastHub

hoc081098 avatar Apr 27 '20 21:04 hoc081098

Thanks @hoc081098 it works! It's tricky to test those things, I was looking at RxDart tests and I could only find expectLater and it doesn't work.

feinstein avatar Apr 27 '20 22:04 feinstein

FYI the tests are interacting between each other because the steam subscription wasn't closed, so I am adding this modification now.

feinstein avatar Apr 27 '20 23:04 feinstein

@brianegan I was looking at this behavior a bit deeper and I found something curious, if you look into my first post you can see that the StreamBuilder shows a ConnectionState.done for the shareValue() stream, but it still can reconnect to that same stream again, get the last value and complete with a ConnectionState.done. Is this the expected behavior? I thought once the number of subscribers gets to 0 no one can subscribe anymore and get any values from it.

feinstein avatar Apr 28 '20 02:04 feinstein

I understand your point regarding a big API, I just thought "a buferred broadcast stream" was a common call, as the difference between a single subscription stream and a broadcast stream (besides the obvious one) is that broadcast streams don't buffer values, so this new operator could remove the only missing feature from broadcast streams that single subscription streams have.

Agreed, thats why I'd thought about adding asBroadcastValueStream / asBroadcastValueStreamSeeded to do just that, and I'm specifically thinking of integrating with Flutter for these operators.

In my head, these operators would convert a normal Stream into a Broadcast Stream that allows you to read the latest value, but it would NOT replay the latest value the way shareValue does, nor would it shut down after the number of listeners goes to 0. In fact, I think that's actually best when working with StreamBuilder widgets.

An example might look like this:

final onAuthStateChangedOrReloaded = 
    Rx.merge([FirebaseAuth.instance.onAuthStateChanged, onUserReloaded]).asBroadcastValueStream();


// Later in your Widgets
StreamBuilder(
  initialData: onAuthStateChangedOrReloaded.value,
  stream: onAuthStateChangedOrReloaded,
  builder: (context, snapshot) => Text('${snapshot.data}'),
);

Basically, the asBroadcastValueStream would not replay the value, but you could still provide the latest captured value to the StreamBuilder widget's initialData property. This is actually better IMO than replaying the value, because it requires Flutter only build the Widget once. With replays, the StreamBuilder actually has to build twice (once with no data, then once the first data event is delivered by the Stream).

What do you think about that approach?

brianegan avatar Apr 28 '20 11:04 brianegan

I was looking at this behavior a bit deeper and I found something curious, if you look into my first post you can see that the StreamBuilder shows a ConnectionState.done for the shareValue() stream, but it still can reconnect to that same stream again, get the last value and complete with a ConnectionState.done. Is this the expected behavior? I thought once the number of subscribers gets to 0 no one can subscribe anymore and get any values from it.

Sounds fishy indeed. I'll write some tests to see what's going on there -- thanks for the info!

brianegan avatar Apr 28 '20 11:04 brianegan

I discovered a bug on StreamBuilder while investigating this, so I made a sample project to report it. You can use it for testing this if you want, but you will have to change my library's reference on pubspec.yaml to firebase_user_stream: 1.0.0-alpha.2 as this is the version that shows this behavior. Check the readme for some quick instructions on how to trigger it.

feinstein avatar Apr 28 '20 20:04 feinstein

What do you think about that approach?

I like it a lot for StreamBuilder, makes sense.

feinstein avatar Apr 28 '20 21:04 feinstein

@brianegan I think a replay observable that was long lived and didn't use refCount would be a valuable addition.

In regards to the stream builder building twice even if passed a type of stream where the value is stored so it can be access synchronously I create the package https://pub.dev/packages/flutter_rx_stream_builder a while ago.

jonsamwell avatar May 03 '20 21:05 jonsamwell

Oh, I'm working on my branch: https://github.com/hoc081098/rxdart/tree/hoc081098/value_subject to introduce ValueSubject :)

hoc081098 avatar May 04 '20 01:05 hoc081098

@brianegan I filed a bug here on StreamBuilder, but they complained my sample uses external libraries. I am having a hard time reproducing this using the standard Stream library, if you could give me some insights on this I would appreciate it.

feinstein avatar May 05 '20 22:05 feinstein

@feinstein Your initial problem description describes an issue with re-subscribing that sounds similar to what we've encountered. We use an Rx.defer pattern as I describe in this post https://github.com/ReactiveX/rxdart/issues/455#issuecomment-622561853.

rich-j avatar May 09 '20 22:05 rich-j

@rich-j thanks, I will take a look, but .publishValue()..connect(); is working for my use case.

feinstein avatar May 10 '20 00:05 feinstein

in a project I participate, I ended up running into this same problem, but in my use case, prevented from canceling the StreamSubscription returned by ConnectableStream.connect();

I will create an extension and add it to rxdart with a PR.

follow the example:


extension ValueExtensions<T> on Stream<T> {

  Stream<T> asValueStream() {
    StreamSubscription<T>? subscription;

    final stream = doOnDone((){
       // When source stream done, cancel [ConnectableStream] subscription
       subscription?.cancel();
    })
    .publishValue();
    
    // Intructs the [ConnectableStream to begin emiting items]
    subscription = stream.connect();

    return stream;
  }

  
  Stream<T> asValueSeededStream(T seedValue) {
    StreamSubscription<T>? subscription;

    final stream = doOnDone((){
       // When source stream done, cancel [ConnectableStream] subscription
       subscription?.cancel();
    })
    .publishValueSeeded(seedValue);
    
    // Intructs the [ConnectableStream to begin emiting items]
    subscription = stream.connect();

    return stream;
  }

 Stream<T> asReplayStream({int? maxSize}) {
    StreamSubscription<T>? subscription;

    final stream = doOnDone((){
       // When source stream done, cancel [ConnectableStream] subscription
       subscription?.cancel();
    })
    .publishReplay(maxSize: maxSize);
    
    // Intructs the [ConnectableStream to begin emiting items]
    subscription = stream.connect();

    return stream;
  }

  
  
 Stream<T> asBroadcast() {
    StreamSubscription<T>? subscription;

    final stream = doOnDone((){
       // When source stream done, cancel [ConnectableStream] subscription
       subscription?.cancel();
    })
    .publish();
    
    // Intructs the [ConnectableStream to begin emiting items]
    subscription = stream.connect();

    return stream;
  }

}

rullyalves avatar Aug 04 '21 19:08 rullyalves

asBroadcastValueStream would be very much appreciated

lukepighetti avatar Oct 15 '21 21:10 lukepighetti

It can be done, I would like to open a PR, but I would like the opinion of the repository maintainers as well.

rullyalves avatar Oct 15 '21 22:10 rullyalves

Guys, I came there from many years of experience in rxjs and java's reactor project... and this lib is so strange to me... BehaviorSubject which is canceled after last subscription - what is a purpose of a seed then? I usually use the BehaviorSubject as a value container, to notiffy all listeners - now and whenever - that value is changed, but also the current value. Now the question is on the listener side, whether this subject is still valid or canceled. What if canceled because there are no listeners any more, what then I supposed to do? Lie down and cry?

Azbesciak avatar Jul 05 '22 18:07 Azbesciak