rxdart icon indicating copy to clipboard operation
rxdart copied to clipboard

BUG: non-broadcast Stream cannot be listened more than once when asyncMap is used

Open AlexDochioiu opened this issue 1 year ago • 5 comments

I attached a code snippet that crashes:

  final st = Stream.fromIterable([0, 1]).asyncMap((_) async => Future.delayed(Duration(seconds: 1), () => _));
  st.listen((event) => print("st: $event"), onError: (e) => print("st: error $e"), onDone: () => print("st: onDone"));
  await Future.delayed(Duration(seconds: 3));
  st.listen((event) => print("st2: $event"), onError: (e) => print("st2: error $e"), onDone: () => print("st2: onDone"));

Crash logs:

st: 0
st: 1
st: onDone
Unhandled exception:
Bad state: Stream has already been listened to.
#0      _StreamController._subscribe (dart:async/stream_controller.dart:676:7)
#1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:827:19)
#2      _StreamImpl.listen (dart:async/stream_impl.dart:471:9)
#3      main (file:///Users/user/FlutterProjects/sample/example/streams_example.dart:55:6)
<asynchronous suspension>

Expected behaviour: A new stream should be created, and listened to.

AlexDochioiu avatar Apr 23 '23 08:04 AlexDochioiu

Use Rx.defer(() => ..., reusable: true)

hoc081098 avatar Apr 23 '23 08:04 hoc081098

Hmm, I could have easily wrapped it inside a function too: st() => Stream.fromIterable([0, 1]).asyncMap((_) async => Future.delayed(Duration(seconds: 1), () => _));

which works relatively similar to defer (expect it doesn't wait on subscription to create the stream).

That's not very relevant though. The question is, is it correct that using asyncMap makes the stream non-reusable? Feels like a bug to me.

AlexDochioiu avatar Apr 23 '23 08:04 AlexDochioiu

Because Stream.fromIterable([0, 1]) is single-subscription stream, but allows listening to it multiple time, see also https://github.com/ReactiveX/rxdart/pull/694

hoc081098 avatar Apr 23 '23 09:04 hoc081098

  • https://github.com/dart-lang/sdk/commit/330759efc0e41033d7f630c40af2deeb2e569d68, Stream.multi(isBroadcast:false)
  • AsyncMap only check stream.isBroadcast, it does not know about Stream.multi or not.

hoc081098 avatar Apr 23 '23 09:04 hoc081098

I see your point. To me it still feels like unexpected behaviour. Wondering if it's worth trying to ask the people working on dart streams to expose some way of identifying MultiStreams?

AlexDochioiu avatar Apr 23 '23 09:04 AlexDochioiu