rxdart
rxdart copied to clipboard
BUG: non-broadcast Stream cannot be listened more than once when asyncMap is used
I attached a code snippet that crashes:
final st = Stream.fromIterable([0, 1]).asyncMap((_) async => Future.delayed(Duration(seconds: 1), () => _));
st.listen((event) => print("st: $event"), onError: (e) => print("st: error $e"), onDone: () => print("st: onDone"));
await Future.delayed(Duration(seconds: 3));
st.listen((event) => print("st2: $event"), onError: (e) => print("st2: error $e"), onDone: () => print("st2: onDone"));
Crash logs:
st: 0
st: 1
st: onDone
Unhandled exception:
Bad state: Stream has already been listened to.
#0 _StreamController._subscribe (dart:async/stream_controller.dart:676:7)
#1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:827:19)
#2 _StreamImpl.listen (dart:async/stream_impl.dart:471:9)
#3 main (file:///Users/user/FlutterProjects/sample/example/streams_example.dart:55:6)
<asynchronous suspension>
Expected behaviour: A new stream should be created, and listened to.
Use Rx.defer(() => ..., reusable: true)
Hmm, I could have easily wrapped it inside a function too:
st() => Stream.fromIterable([0, 1]).asyncMap((_) async => Future.delayed(Duration(seconds: 1), () => _));
which works relatively similar to defer (expect it doesn't wait on subscription to create the stream).
That's not very relevant though. The question is, is it correct that using asyncMap
makes the stream non-reusable? Feels like a bug to me.
Because Stream.fromIterable([0, 1])
is single-subscription stream, but allows listening to it multiple time, see also https://github.com/ReactiveX/rxdart/pull/694
- https://github.com/dart-lang/sdk/commit/330759efc0e41033d7f630c40af2deeb2e569d68, Stream.multi(isBroadcast:false)
- AsyncMap only check
stream.isBroadcast
, it does not know about Stream.multi or not.
I see your point. To me it still feels like unexpected behaviour. Wondering if it's worth trying to ask the people working on dart streams to expose some way of identifying MultiStream
s?