rxjs
RFC: A new way of "piping"
Given the move away from lift: #7201 #7202
We can make our observable even more compatible with other observables by migrating us away from the pipe method. This is not something that would happen immediately, rather over a very, very long period. The idea is to increase compatibility and improve ergonomics.
Problem: It's annoying to have to from(x) types in order to use our operators with them
There's a common issue where people want to convert other things to observables in order to use our operators. Whether it's an array or a promise, there are times where you need to call from, and the only reason you're doing it is that you want that .pipe method and an observable to operate on.
Problem: pipe is a method
- It's not tree shakable
- It's unlikely to exist on "other observables".
The second one is more of a problem, honestly. If people implement code with the expectation that pipe exists as a method on whatever observable they get, then even while we've made sure our operators will work with any "same-shaped" observable, that custom code will break, because it's expecting pipe to be a method.
Proposal
A new rx function that accepts an ObservableInput as the first argument, followed by a rest parameter of operators:
export function rx<A, B>(source: ObservableInput<A>, a: OperatorFunction<A, B>): Observable<B>;
export function rx<A, B, C>(source: ObservableInput<A>, a: OperatorFunction<A, B>, b: OperatorFunction<B, C>): Observable<C>;
export function rx<A, B, C, D>(source: ObservableInput<A>, a: OperatorFunction<A, B>, b: OperatorFunction<B, C>, c: OperatorFunction<C, D>): Observable<D>;
// ... and so on
export function rx<T>(source: ObservableInput<T>, ...operators: OperatorFunction<T, any>[]): Observable<unknown> {
  return pipeArray(operators)(from(source));
}
(Technically, rx is just a standard functional pipe with some specific types; it could be the same pipe method we expose under the hood.)
The usage would be as such:
import { rx, map, delay } from 'rxjs'
rx(source$, map(n => n + n)).subscribe(console.log)
// or
rx(somePromise, delay(1000)).subscribe(console.log)
// or
rx(someAsyncIterable, map(n => n + n)).subscribe(console.log)
// ...and so on.
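For readers who want to see the mechanics, here is a minimal sketch of the proposed shape. It deliberately does not import rxjs: `MiniObservable`, `MiniOperator`, `MiniInput`, `toObservable`, and `mapOp` are hypothetical stand-ins for `Observable`, `OperatorFunction`, `ObservableInput`, `from`, and `map`, and only arrays and other mini observables are accepted as input. It is an illustration of the idea, not the library's implementation.

```typescript
// Hypothetical stand-ins, NOT the rxjs types: just enough to run the idea.
interface MiniObservable<T> {
  subscribe(next: (value: T) => void): void;
}
type MiniOperator<A, B> = (source: MiniObservable<A>) => MiniObservable<B>;
type MiniInput<T> = MiniObservable<T> | T[];

// Plays the role of `from`: observables pass through, arrays get wrapped.
function toObservable<T>(input: MiniInput<T>): MiniObservable<T> {
  if (!Array.isArray(input)) {
    return input;
  }
  const values: T[] = input;
  return {
    subscribe(next) {
      values.forEach((value) => next(value));
    },
  };
}

// Plays the role of `map`.
function mapOp<A, B>(project: (value: A) => B): MiniOperator<A, B> {
  return (source) => ({
    subscribe(next) {
      source.subscribe((value) => next(project(value)));
    },
  });
}

// Overloads keep the chain typed; the implementation is a left-to-right reduce
// over the operators, starting from the converted source.
function rx<A>(source: MiniInput<A>): MiniObservable<A>;
function rx<A, B>(source: MiniInput<A>, a: MiniOperator<A, B>): MiniObservable<B>;
function rx<A, B, C>(source: MiniInput<A>, a: MiniOperator<A, B>, b: MiniOperator<B, C>): MiniObservable<C>;
function rx(source: MiniInput<any>, ...operators: MiniOperator<any, any>[]): MiniObservable<any> {
  return operators.reduce<MiniObservable<any>>(
    (acc, operator) => operator(acc),
    toObservable(source),
  );
}

// A plain array is accepted directly; no explicit conversion step is needed.
const doubled: number[] = [];
rx([1, 2, 3], mapOp((n: number) => n + n)).subscribe((v) => doubled.push(v));
// doubled is now [2, 4, 6]
```

Calling `rx(source)` with no operators only performs the conversion, which is the behavior the bonus section below relies on.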
Bonus? from could migrate to just be rx as well
For a long time people have complained about from being a horrible name for an exported function. (Although I do like import { from } from './from' in our codebase 😄). This would mean that people could technically just do: rx(somePromise) and it's the same as from(somePromise).
I like the approach!
It's not tree shakable
💯
Seems great!
Is "rx" the final proposed name? I know it'd be a breaking change, but what if...
import { pipe, flow } from "rxjs";
// Pipes a value into operators
const derived$ = pipe(source$, ...operators)
// Or any other name really. This is what the standalone "pipe" is at the moment.
const customOperator = flow(...operators)
Would it be too bad? 😬 Mentally, .pipe() works on an Observable, I've always found it strange that the standalone version does not!
- Why not call it pipe?
- If this is long-term, won't the pipeline operator materialize first?
pipe already exists.
pipeline a la Node.js then?
Not sure how you'd use rx without a pipe concept to be able to tree shake it, but I think this is a great way to solve interop; I like the ergonomics. rx() also has the benefit of wrapping the behavior in something unambiguous.
I like it. Ship it :shipit:
I know people would probably hate it, but have you considered a function that returns a function that would pipe the operator arguments and call the result with the observable passed.
rx(source$)(...operators)
One option is to simply make a function with 2 arguments (the second is an array):
rx(fromEvent(document.body, "mousemove"), [
debounceTime(2),
map(({ clientX }) => clientX),
take(2),
takeUntil(destroy$),
]);
I know people would probably hate it, but have you considered a function that returns a function that would pipe the operator arguments and call the result with the observable passed.
@BigAB why do you like this shape? Is there a use case?
Why not call it pipe?
@timdp Because it's not really just a pipe. It's applying from to the first argument.
Technically it's pipe(source, from, map(fn), ...)
@timdp Because it's not really just a pipe. It's applying from to the first argument.
Technically it's pipe(source, from, map(fn), ...)
Personally, I don't think that makes a huge difference, but I do see your point.
But if pipe is too ambiguous a name, then the same definitely applies to rx.
How about pipeFrom?
@benlesh
- how about composition?
const step2$ = rx(step1$, map(n => n + n))
const step3$ = rx(step2$, filter(n => n > 0))
step3$.subscribe(console.log)
would the above be the way to "wrap" an observable from the outside?
- how about creating custom operators? We can create them using new Observable(subscriber => { ... }) and (I think) nothing changes here. However, it's also possible to use pipe directly, e.g.:
import { pipe } from "rxjs";
import { tap } from "rxjs/operators";
function logWithTag(tag: string) {
return pipe(
tap(v => console.log(`logWithTag(${tag}): ${v}`)),
// put whatever operators you need here
);
}
// and then
source$.pipe(logWithTag('my-tag')).subscribe(whatever)
with pipe getting deprecated and eventually replaced by rx(source$, ...operators).subscribe(subscriber), wouldn't pipe-based composition become out of reach? We'd have to move it to:
function withLog<A>(source: ObservableInput<A>, tag: string) {
return rx(source,
tap(v => console.log(`logWithTag(${tag}): ${v}`)),
// put whatever operators you need here
);
}
which is doable, but moves away from classical functional composition, I'm afraid. What's your take on this?
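One way to look at the question: an operator is just a function from observable to observable, so composing operators does not depend on a pipe method existing on the source. The sketch below assumes a standalone composition helper stays available (called `composeOperators` here, a hypothetical name) and uses hypothetical stand-ins (`MiniObservable`, `MiniOperator`, `ofValues`, `tapOp`, and a simplified single-type `rx`) instead of rxjs itself.

```typescript
// Hypothetical stand-ins, NOT the rxjs types.
interface MiniObservable<T> {
  subscribe(next: (value: T) => void): void;
}
type MiniOperator<A, B> = (source: MiniObservable<A>) => MiniObservable<B>;

// Stand-in for `of`: emits the given values synchronously.
function ofValues<T>(...values: T[]): MiniObservable<T> {
  return {
    subscribe(next) {
      values.forEach((value) => next(value));
    },
  };
}

// Stand-in for `tap`: runs a side effect, then forwards the value unchanged.
function tapOp<T>(effect: (value: T) => void): MiniOperator<T, T> {
  return (source) => ({
    subscribe(next) {
      source.subscribe((value) => {
        effect(value);
        next(value);
      });
    },
  });
}

// Stand-in for the standalone `pipe`: builds one operator out of many, no source involved.
function composeOperators<T>(...operators: MiniOperator<T, T>[]): MiniOperator<T, T> {
  return (source) =>
    operators.reduce<MiniObservable<T>>((acc, operator) => operator(acc), source);
}

// Simplified stand-in for the proposed `rx` (single value type, observable sources only).
function rx<T>(source: MiniObservable<T>, ...operators: MiniOperator<T, T>[]): MiniObservable<T> {
  return composeOperators(...operators)(source);
}

const logLines: string[] = [];

// The custom operator is still written as a composition of operators.
function logWithTag<T>(tag: string): MiniOperator<T, T> {
  return composeOperators(
    tapOp((v: T) => {
      logLines.push(`logWithTag(${tag}): ${v}`);
    }),
    // put whatever operators you need here
  );
}

const received: number[] = [];
rx(ofValues(1, 2), logWithTag<number>('my-tag')).subscribe((v) => received.push(v));
// logLines is now ['logWithTag(my-tag): 1', 'logWithTag(my-tag): 2']
```

Under that assumption, `logWithTag` keeps its original shape and only the call site changes from `source$.pipe(...)` to `rx(source$, ...)`.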
I don't really like the syntax since it wouldn't be compatible with the pipeline operator (which would be a big benefit for RxJS) right?
Here is an example (in which you are even mentioned haha)
@LcsGa currently the TC39 pipeline operator is not the one as in the example.
The operators in RxJS would be usable in the F# pipe operator proposal, where every operator is a function that takes one argument and returns one value, which is passed to the next one.
TC39 went with the Hack syntax, where you need to explicitly pass in a "placeholder" that marks where the value is passed. So with TC39 pipes and current operators, it would look like:
interval(100)
|> take(5)(%)
|> map(v => v + 3)(%)
More about this discussion here https://github.com/ReactiveX/rxjs/discussions/7194
@voliva Thanks for the information, I wasn't aware of this!
@BigAB why do you like this shape? Is there a use case?
I like a clear separation between the subject of the call and the composition of functions.
Using an array as the second arg is pretty good and achieves the separation I want, I just sort of "didn't like it" 🤷, but it's not bad :
const obs$ = rx(source$, [
debounceTime(2),
map(({ clientX }) => clientX),
take(2),
takeUntil(destroy$),
])
pipe(...operators)(source$) is super great, because it doesn't redefine a well known pipe definition, but it sort of "distracts" having the source last, and the whole composing a new observable from source and pipe is so common in RxJS code, it feels like it should go first.
With it first it sort of looks like the F# pipe operator shape...
const obs$ = rx(source$)(
debounceTime(2),
map(({ clientX }) => clientX),
take(2),
takeUntil(destroy$),
)
I actually really like the idea of calling it pipeFrom or pipeThrough too:
const obs$ = pipeFrom(source$)(
debounceTime(2),
map(({ clientX }) => clientX),
take(2),
takeUntil(destroy$),
)
EDIT:
If it was pipeThrough and it was implemented something like
export const pipe = (...fns) => (input) => fns.reduce((r, fn) => fn(r), input);
export const pipeThrough = (source) => (...operators) => pipe(...operators)(source);
...you could use it on non-observables too 🤷 (and just check for Symbol.observable before doing anything Rx specific like lift stuff)
const result = pipeThrough(3)(
(n) => n*2,
(n) => n + 1,
(x) => `WHOA ${x}!`
)
// result === "WHOA 7!"
I don't know if I missed the "Bonus" section the first time I read it but, after reading it, yeah getting all the from() functionality in rx() makes me think that
rx(source$, [
debounceTime(2),
map(({ clientX }) => clientX),
take(2),
takeUntil(destroy$),
])
...is the better choice, because you get the great "turn anything into an Rx observable" from rx(whatever), but then you also separate the function composition effectively
CORE TEAM:
- The name needs to be bikeshedded
- why not just direct people to use pipe(from(x), ...operators)?
- maybe have pipe(first, ...operators) check the type of first and use from if it's not a function?
There's value here, we want to do this.
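To make the second idea concrete, here is a rough sketch of a pipe that inspects its first argument. Everything in it is an assumption for illustration: `MiniObservable`, `toObservable`, and `mapOp` are hypothetical stand-ins for `Observable`, `from`, and `map`, and the return type is left as `unknown` because precise typing would need a set of overloads.

```typescript
// Hypothetical stand-ins, NOT the rxjs types.
interface MiniObservable<T> {
  subscribe(next: (value: T) => void): void;
}
type AnyFn = (input: any) => any;

// Plays the role of `from` for arrays and mini observables.
function toObservable<T>(input: MiniObservable<T> | T[]): MiniObservable<T> {
  if (!Array.isArray(input)) {
    return input;
  }
  const values: T[] = input;
  return {
    subscribe(next) {
      values.forEach((value) => next(value));
    },
  };
}

// Plays the role of `map`.
function mapOp<A, B>(project: (value: A) => B): (source: MiniObservable<A>) => MiniObservable<B> {
  return (source) => ({
    subscribe(next) {
      source.subscribe((value) => next(project(value)));
    },
  });
}

function pipe(first: unknown, ...rest: AnyFn[]): unknown {
  if (typeof first === 'function') {
    // First argument is a function: keep today's behavior and return a composed function.
    const fns: AnyFn[] = [first as AnyFn, ...rest];
    return (input: unknown) => fns.reduce<unknown>((acc, fn) => fn(acc), input);
  }
  // Otherwise treat the first argument as a source and convert it before applying the rest.
  return rest.reduce<unknown>((acc, fn) => fn(acc), toObservable(first as any));
}

// Function first: a reusable composed function comes back.
const addOneThenDouble = pipe((n: number) => n + 1, (n: number) => n * 2) as (n: number) => number;
const composedResult = addOneThenDouble(3); // (3 + 1) * 2 = 8

// Source first: the operators are applied immediately.
const seen: number[] = [];
(pipe([1, 2, 3], mapOp((n: number) => n + n)) as MiniObservable<number>).subscribe((v) => seen.push(v));
// seen is now [2, 4, 6]
```

The casts at the call sites show the cost of the runtime check: one name now has two behaviors, and the type system has to be told which one applies.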
@benlesh Can we use from for this?
from(source$, [
debounceTime(2),
map(({ clientX }) => clientX),
take(2),
takeUntil(destroy$),
])
@demensky - do you think that's stretching the intention of from a little too far, or that it could create cognitive dissonance in users? It's def tidy, which is nice, but pipe(first, ...operators) is tidy too.
Extending from was also briefly discussed, but the consensus was that extending an existing operator would be somewhat confusing, and TypeScript support could be tricky.
Personally, I still like rx(source, ...operators).
- It's terse.
- It can replace from(source) in the long term with just rx(source). (And from is sort of a bad name we stuck with for historical reasons.)
- It's fairly descriptive, even though it seems like it's not. rx, R.X., Reactive Extensions. In rx(source, ...operators) you're taking some source and extending it with reactivity through those composed operators. I think it fits.
I'm still open to other names though. I'd just prefer that it's short and to the point.
Also something to think about, operators[] vs ...operators... This is formatted with Prettier:
Using operators[]
rx(mouseDowns$, [
exhaustMap((e) => {
const startX = e.clientX;
return rx(mouseMoves$, [
takeUntil(mouseups$),
map((e) => e.clientX - startX),
]);
}),
]).subscribe(handleDrag);
Using ...operators
rx(
mouseDowns$,
exhaustMap((e) => {
const startX = e.clientX;
return rx(
mouseMoves$,
takeUntil(mouseups$),
map((e) => e.clientX - startX)
);
})
).subscribe(handleDrag);
Honestly, and I think this is weird, the operators[] version is a little more readable when formatted due to indentation. It's also a bit cheaper, implementation-wise.
I would also expect operators[]. Formatting is one motivation, but with a general-purpose function like this, it's also not unlikely that it'll take an options object or another argument at some point.
(Personally, I'm still not sold on the rx name though. Why is it so first-class that it should occupy the name of the library itself? I'm not claiming pipeFrom would be any better but like you said, it needs bikeshedding.)
Let's call it lift?
Kidding… :trollface:
Do I understand correctly that this rx function will be required until there are versions of operators that are convenient to use with the pipeline operator? #7194
For example map will have a signature like this.
function map<T, R>(source$: ObservableInput<T>, project: (value: T, index: number) => R): Observable<R>;
It turns out that rx will only be for operators of external libraries that have not had time to update, right?
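A minimal sketch of what a source-first map with that signature might look like. It does not use rxjs: `MiniObservable`, `MiniInput`, and `toObservable` are hypothetical stand-ins for `Observable`, `ObservableInput`, and `from`, and only arrays and mini observables are accepted.

```typescript
// Hypothetical stand-ins, NOT the rxjs types.
interface MiniObservable<T> {
  subscribe(next: (value: T) => void): void;
}
type MiniInput<T> = MiniObservable<T> | T[];

// Plays the role of `from` for arrays and mini observables.
function toObservable<T>(input: MiniInput<T>): MiniObservable<T> {
  if (!Array.isArray(input)) {
    return input;
  }
  const values: T[] = input;
  return {
    subscribe(next) {
      values.forEach((value) => next(value));
    },
  };
}

// Source-first operator: the source is the first parameter, so no wrapper is needed to apply it.
function map<T, R>(source$: MiniInput<T>, project: (value: T, index: number) => R): MiniObservable<R> {
  const source = toObservable(source$);
  return {
    subscribe(next) {
      let index = 0; // each subscription counts from zero
      source.subscribe((value) => next(project(value, index++)));
    },
  };
}

// Without a pipeline operator the calls nest, innermost first.
const labelled: string[] = [];
map(map([1, 2, 3], (n) => n * 2), (n, i) => `${i}:${n}`).subscribe((v) => labelled.push(v));
// labelled is now ['0:2', '1:4', '2:6']
```

The nesting in the last call is what a pipeline operator would flatten; until then, a helper like rx does the same job for curried operators.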
CORE TEAM: How about r for the name?
The concern with rx as a name revolves around communication. It might be confusing for new users to discuss "Use the rx function" versus literally any other function in our library, which people often refer to as "rx".
Personally I would be in favor of rx rather than r, for fear of certain IDEs not allowing me to auto-complete/auto-import efficiently as I type if it's just a single character. What might happen is that other things starting with r are considered more relevant.
Here is the thing
What about making any observable callable, so that it is a function and an observable at the same time?
Sample:
source$(tap(console.log)).subscribe()
And
source$.subscribe()
Will both work
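A rough sketch of how that could work, as one possible reading of the suggestion. Observables in rxjs are class instances and are not callable, so everything here is hypothetical: `MiniObservable`, `MiniOperator`, `CallableObservable`, `makeCallable`, `ofValues`, and `mapOp` are made-up names for illustration.

```typescript
// Hypothetical stand-ins, NOT the rxjs types.
interface MiniObservable<T> {
  subscribe(next: (value: T) => void): void;
}
type MiniOperator<A, B> = (source: MiniObservable<A>) => MiniObservable<B>;

// A function object that also carries a subscribe method.
interface CallableObservable<T> {
  (...operators: MiniOperator<any, any>[]): CallableObservable<any>;
  subscribe(next: (value: T) => void): void;
}

// Stand-in for `of`: emits the given values synchronously.
function ofValues<T>(...values: T[]): MiniObservable<T> {
  return {
    subscribe(next) {
      values.forEach((value) => next(value));
    },
  };
}

// Stand-in for `map`.
function mapOp<A, B>(project: (value: A) => B): MiniOperator<A, B> {
  return (source) => ({
    subscribe(next) {
      source.subscribe((value) => next(project(value)));
    },
  });
}

// Wraps an observable in a function: calling it applies operators, and
// the attached subscribe method forwards to the wrapped source.
function makeCallable<T>(source: MiniObservable<T>): CallableObservable<T> {
  const callable = ((...operators: MiniOperator<any, any>[]) =>
    makeCallable(
      operators.reduce<MiniObservable<any>>((acc, operator) => operator(acc), source),
    )) as CallableObservable<T>;
  callable.subscribe = (next) => source.subscribe(next);
  return callable;
}

const source$ = makeCallable(ofValues(1, 2, 3));

const piped: number[] = [];
source$(mapOp((n: number) => n * 10)).subscribe((v) => piped.push(v));
// piped is now [10, 20, 30]

const direct: number[] = [];
source$.subscribe((v) => direct.push(v));
// direct is now [1, 2, 3]
```

Note that in this sketch the result of calling the observable loses its value type (it comes back as `any`); typing it precisely would need the same kind of overload ladder as rx.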
@BigAB why do you like this shape? Is there a use case?
I like a clear separation between the subject of the call and the composition of functions.
Using an array as the second arg is pretty good and achieves the separation I want, I just sort of "didn't like it" 🤷, but it's not bad :
const obs$ = rx(source$, [ debounceTime(2), map(({ clientX }) => clientX), take(2), takeUntil(destroy$), ])
I like this approach, but what would you say about this one?
const obs$ = rx(
  source$,
  compose([
    debounceTime(2),
    map(({ clientX }) => clientX),
    take(2),
    takeUntil(destroy$),
  ])
)