Allow `.clear()`ing the stored value or error in a `BehaviorSubject`
When building with BehaviorSubject, the need arose to be able to clear the subject to reset it to the same state it has after having been instantiated without a seed.
This can be helpful if the subject holds an error and we want to retry the operation that caused it. During the retry, any new listeners still immediately receive the error value, while we want them to have to wait for the first value after the retry.
Related: #233
A simple fix for this is a re-implementation of BehaviorSubject that supports .clear():
/// Clears the subject and removes the last value or error.
void clear() => _wrapper
..value = EMPTY
..isValue = false
..errorAndStackTrace = null;
Full Code
```dart // ignore_for_file: implementation_imports, // ignore_for_file: avoid_equals_and_hash_code_on_mutable_classesimport 'dart:async';
import 'package:rxdart/src/rx.dart'; import 'package:rxdart/src/streams/value_stream.dart'; import 'package:rxdart/src/transformers/start_with.dart'; import 'package:rxdart/src/transformers/start_with_error.dart'; import 'package:rxdart/src/utils/empty.dart'; import 'package:rxdart/src/utils/error_and_stacktrace.dart'; import 'package:rxdart/src/utils/notification.dart'; import 'package:rxdart/subjects.dart';
/// A variant of [BehaviorSubject] that allows clearing the subject and removing /// the last value or error and resets it to the state as if it was just created /// without a seed value. /// /// Other than that, this works exactly like [BehaviorSubject]. class ClearableBehaviorSubject<T> extends Subject<T> implements ValueStream<T> { /// Constructs a [ClearableBehaviorSubject], optionally pass handlers for /// [onListen], [onCancel] and a flag to handle events [sync]. /// /// See also [StreamController.broadcast] factory ClearableBehaviorSubject({ void Function()? onListen, void Function()? onCancel, bool sync = false, }) { // ignore: close_sinks final controller = StreamController<T>.broadcast( onListen: onListen, onCancel: onCancel, sync: sync, );
final wrapper = _Wrapper<T>();
return ClearableBehaviorSubject<T>._(
controller,
Rx.defer<T>(_deferStream(wrapper, controller, sync), reusable: true),
wrapper,
);
}
/// Constructs a [ClearableBehaviorSubject], optionally pass handlers for /// [onListen], [onCancel] and a flag to handle events [sync]. /// /// [seedValue] becomes the current [value] and is emitted immediately. /// /// See also [StreamController.broadcast] factory ClearableBehaviorSubject.seeded( T seedValue, { void Function()? onListen, void Function()? onCancel, bool sync = false, }) { // ignore: close_sinks final controller = StreamController<T>.broadcast( onListen: onListen, onCancel: onCancel, sync: sync, );
final wrapper = _Wrapper<T>.seeded(seedValue);
return ClearableBehaviorSubject<T>._(
controller,
Rx.defer<T>(_deferStream(wrapper, controller, sync), reusable: true),
wrapper,
);
}
ClearableBehaviorSubject._( super.controller, super.stream, this._wrapper, );
final _Wrapper<T> _wrapper;
static Stream<T> Function() _deferStream<T>( _Wrapper<T> wrapper, StreamController<T> controller, bool sync, ) => () { final errorAndStackTrace = wrapper.errorAndStackTrace; if (errorAndStackTrace != null && !wrapper.isValue) { return controller.stream.transform( StartWithErrorStreamTransformer( errorAndStackTrace.error, errorAndStackTrace.stackTrace, ), ); }
final value = wrapper.value;
if (isNotEmpty(value) && wrapper.isValue) {
return controller.stream
.transform(StartWithStreamTransformer(value as T));
}
return controller.stream;
};
@override void onAdd(T event) => _wrapper.setValue(event);
@override void onAddError(Object error, [StackTrace? stackTrace]) => _wrapper.setError(error, stackTrace);
/// Clears the subject and removes the last value or error. void clear() => _wrapper.clear();
@override ValueStream<T> get stream => _Stream(this);
@override bool get hasValue => isNotEmpty(_wrapper.value);
@override T get value { final value = _wrapper.value; if (isNotEmpty(value)) { return value as T; } throw ValueStreamError.hasNoValue(); }
@override T? get valueOrNull => unbox(_wrapper.value);
/// Set and emit the new value. set value(T newValue) => add(newValue);
@override bool get hasError => _wrapper.errorAndStackTrace != null;
@override Object? get errorOrNull => _wrapper.errorAndStackTrace?.error;
@override Object get error { final errorAndSt = _wrapper.errorAndStackTrace; if (errorAndSt != null) { return errorAndSt.error; } throw ValueStreamError.hasNoError(); }
@override StackTrace? get stackTrace => _wrapper.errorAndStackTrace?.stackTrace;
@override StreamNotification<T>? get lastEventOrNull { // data event if (_wrapper.isValue) { return StreamNotification.data(_wrapper.value as T); }
// error event
final errorAndSt = _wrapper.errorAndStackTrace;
if (errorAndSt != null) {
return ErrorNotification(errorAndSt);
}
// no event
return null;
} }
class _Wrapper<T> { /// Non-seeded constructor _Wrapper() : isValue = false;
_Wrapper.seeded(T v) { setValue(v); }
bool isValue = false; Object? value = EMPTY; ErrorAndStackTrace? errorAndStackTrace;
void setValue(T event) { value = event; isValue = true; }
void setError(Object error, StackTrace? stackTrace) { errorAndStackTrace = ErrorAndStackTrace(error, stackTrace); isValue = false; }
void clear() { value = EMPTY; isValue = false; errorAndStackTrace = null; } }
class _Stream<T> extends Stream<T> implements ValueStream<T> { _Stream(this._subject);
final ClearableBehaviorSubject<T> _subject;
@override bool get isBroadcast => true;
// Override == and hashCode so that new streams returned by the same // subject are considered equal. // The subject returns a new stream each time it's queried, // but doesn't have to cache the result.
@override int get hashCode => _subject.hashCode ^ 0x35323532;
@override bool operator ==(Object other) { if (identical(this, other)) { return true; }
return other is _Stream && identical(other._subject, _subject);
}
@override StreamSubscription<T> listen( void Function(T event)? onData, { Function? onError, void Function()? onDone, bool? cancelOnError, }) => _subject.listen( onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError, );
@override Object get error => _subject.error;
@override Object? get errorOrNull => _subject.errorOrNull;
@override bool get hasError => _subject.hasError;
@override bool get hasValue => _subject.hasValue;
@override StackTrace? get stackTrace => _subject.stackTrace;
@override T get value => _subject.value;
@override T? get valueOrNull => _subject.valueOrNull;
@override StreamNotification<T>? get lastEventOrNull => _subject.lastEventOrNull; }
</details>
While it is useful in Flutter development, I don't see any ReactiveX implementations that have this method of BehaviorSubject.