rxjs-etc
rxjs-etc copied to clipboard
Observables and operators for RxJS
rxjs-etc
What is it?
A bunch of observables and operators for RxJS.
Why might you need it?
I created this package as a place to put additional RxJS observables, operators and methods. If you are looking for something that's not in the RxJS distribution, there might be something suitable in here - if you're lucky.
Install
Install the package using NPM:
npm install rxjs-etc --save
What's in it?
Observable
factories
-
combineLatestArray, concatArray, forkJoinArray, mergeArray, zipArray
A bunch of static methods that behave in a predictable manner when passed empty arrays. Some of these are now redundant, but some aren't.
To see how these methods behave, consult their tests.
-
combineLatestHigherOrderArray, combineLatestHigherOrderObject
Higher-order variants of
combineLatestArray
- that takesObservable<Observable<T>[]>
and returnsObservable<T[]>
- andcombineLatestObject
. -
combineLatestObject, forkJoinObject, zipObject
Like the array versions, but these take objects. Observable properties are combined using either
combineLatest
,forkJoin
orzip
. -
forkJoinConcurrent
Like
forkJoin
but only runs the specified number of observables concurrently. -
mergeHigherOrderArray
Higher-order variant of
mergeArray
- that takesObservable<Observable<T>[]>
and returnsObservable<T>
. -
toggle
Splits a notifier into two or more states and between which notifications are toggled.
-
traverse
Based on
expand
. Traverses a graph - with backpressure control - using either a notifier or a consumer. -
zipPadded
Works like
zipArray
, but if some sources complete whilst others continue to emit values, those the complete are 'padded' with the specifiedpadValue
(which defaults toundefined
). -
percolate
Runs a sequence of observables in order until an observable completes successfully.
Functions for use with pipe
or let
A bunch of functions that can be passed to the let
operator. Use them like this:
source.let(endWith("this is the end"))
They can also be used with pipe
, like this:
source.pipe(endWith("this is the end"))
-
bucketBy
Uses a hash function to put values from an observable stream into buckets - which are themselves observable streams. See
splitBy
. -
bufferRecent
Buffers the specified number of most-recent values.
-
concatIfEmpty
Like
defaultIfEmpty
, but it takes a default observable instead of a default value. -
concatMapEager
Like the RxJava
concatMapEager
operator. It accepts a concurrency and eagerly subscribes to its inner observables, buffering their values and then emitting them in theconcatMap
order. -
continueWith
Mirrors the source, but sends the last received value to a project function and merges the
ObservableInput
that it returns. -
debounceAfter
Debounce the source observable, but only after the notifier emits a value.
-
debounceSync
Debounces synchronously emitted values from a source.
-
debounceTimeSubsequent
Debounce the source observable, but don't debounce the first
count
notifications - only the subsequent notifications. -
debounceTimeWithinReason
Like
debounceTime
, but with an additional duration to ensure some notifications are emitted for super-busy streams. -
delayUntil
Delays a source's value notifications until a signal is received from a notifier.
-
dispose
Like
finalize
, but calls a child subscription's callback for its parent's. -
endWith
Like
startWith
, but for the other end. -
equals
Like
filter
, but takes a value - rather than a function - and performs a reference equality check. -
guard
Applies the specified TypeScript guard to change the source observable's type and perform a runtime check. Emits an error notification if the guard rejects a value.
-
hasCompleted
Emits
true
when the source observable completes. -
indexElements
Like
map((value, index) => index)
when it's called without a selector. When called with a selector, it's just an alias formap
. -
inexorably
Like
finalize
(which is also exported as an alias), but passes the callback theNotification
that effected the teardown, orundefined
if explicitly unsubscribed. -
initial
Apply the operator to the source observable, but select only the initial
count
notifications - don't select the subsequent notifications. -
pairwiseStartWith
Like a combination of
startWith
andpairwise
, but with more specific typings. -
pluck
Like
pluck
, but it's type-safe and only lets you valid keys. And it returns the appropriate type. -
prioritize
When creating signals from a source observable - for use with operators that take a notifier, like
buffer
andwindow
- the order in which subscriptions are made is important.prioritize
can be used to ensure that the notifier subscribes to the source first. -
rateLimit
A rate limiter with pass through when waiting is not necessary.
-
refCountDelay
Can be used with a
ConnectableObservable
instead ofrefCount
. When the reference count drops to zero, it waits the specified duration and then if the reference count is zero, it unsubscribes. If the reference count is incremented within the duration, no unsubscription occurs. -
refCountForever
Somewhat like the change that was made to
shareReplay
in5.5.0.beta.4
. When first subscribed to, a subscription is made to the source, but the source is never explicitly unsubscribed from. Unsubscription from the source only occurs if the source completes or errors. -
refCountOn
Like
refCount
, but performs connections and unsubscriptions on the specified scheduler. -
reschedule
Emits values using the specified scheduler.
-
skipSync
Skips the initial, synchronously emitted values from a source.
-
splitBy
Splits an observable stream into two streams. Values that satisfy a predicate are fed into the first stream and values that don't are fed into the second. It's a (better) replacement for
partition
- which did not multicast the source. SeebucketBy
for the general case of splitting a stream into a specific number of 'buckets'. -
startWithTimeout
Like
startWith
but only emits the starting value if the source does not emit within the specified duration. -
subsequent
Apply the operator to the source observable, but don't select the first
count
notifications - only the subsequent notifications. -
takeSync
Takes the initial, synchronously emitted values from a source and then completes.
-
takeWhileInclusive
Like
takeWhile
, but the value that fails the predicate is taken. -
tapSubscribe
Like
tap
, but for subscriptions and unsubscriptions instead of notifications. -
tapWithIndex
Like
tap
, but it receives a tuple that includes the emitted value and the index. -
throttleAfter
Throttle the source observable, but only after the notifier emits a value.
-
unsubscribeOn
Like
subscribeOn
, but for unsubscription.
Utility functions
A bunch of utility functions that do what their names suggest:
-
isNulled/isNonNulled
isNulled
returnstrue
if a value isnull
orundefined
. -
isObservable
-
isScheduler