FrankKai.github.io
FrankKai.github.io copied to clipboard
Rxjs那些事儿
- of
- from
- map
- timer
- interval
- Observable
- Subject
- BehaviorSubject(与Subject区别、在medusa中的应用)
- Rxjs流最简流程(创建流、处理流、启动流、停止流)
- Rxjs在React框架中使用
- Rxjs方式改写react-request-queue
of
将参数转化为一个可观察序列。
每个参数都会作为下一个通知。
示例
发射数据10,20,30
import { of } from 'rxjs';
of(10,20,30).subscribe(
next => console.log('next:', next),
err => console.log('error:', err),
() => console.log('the end'),
)
发射数组[]10,20,30]
import { of } from 'rxjs';
of([1, 2, 3])
.subscribe(
next => console.log('next:', next),
err => console.log('error:', err),
() => console.log('the end'),
);
// Outputs
// next: [1, 2, 3]
// the end
from
将数组,类数组,Promise,可迭代对象等等,转化为Observable。
一句话总结:将万物转化为Observable
例子
同步流
import { from } from 'rxjs';
const array = [10, 20, 30];
const result = from(array);
console.log('start');
// 同步订阅
result.subscribe((x) => console.log(x));
console.log('end');
// Logs:
// 10
// 20
// 30
异步流
import { from, asyncScheduler } from 'rxjs';
console.log('start');
const array = [10, 20, 30];
const result = from(array, asyncScheduler);
result.subscribe(x => console.log(x));
console.log('end');
// Logs:
// start
// end
// 10
// 20
// 30
map
对observable发出的值,都经过map函数的处理,并且作为结果返回。
对每个值都乘以10
import { of, map } from 'rxjs';
of(1, 2, 3)
.pipe(map((num) => num * 10))
.subscribe(
(next) => console.log('next:', next),
(err) => console.log('error:', err),
() => console.log('the end')
);
// next:10
// next:20
// next:30
// end
timer
timer observable很适合在代码中创建延时,或者是与其他特定的值做race。 delay默认情况下是毫秒。
每隔2s,执行一次回调函数(0~2秒内不触发)
import { timer } from 'rxjs';
timer(0, 2000).subscribe((n) => console.log('timer', n));
interval
可用于准时返回递增数字。
例子
每秒+1,加到3秒
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
const numbers = interval(1000);
const takeFourNumbers = numbers.pipe(take(4));
takeFourNumbers.subscribe(x => console.log('Next: ', x));
// Logs:
// Next: 0
// Next: 1
// Next: 2
// Next: 3
每隔2s,执行一次回调函数(0~2秒内触发)
import { interval } from 'rxjs';
interval(2000).subscribe((n) => console.log('timer', n));
Observable
通过Observable创建一个流,并且通过subscriber.next控制输出。
import { Observable } from 'rxjs';
const stream$ = new Observable((subscriber) => {
setTimeout(() => {
subscriber.next([1, 2, 3]);
}, 500);
setTimeout(() => {
subscriber.next({ a: 1000 });
}, 1000);
setTimeout(() => {
subscriber.next('end');
}, 3000);
setTimeout(() => {
subscriber.complete();
}, 4000);
});
// 启动流
stream$.subscribe({
complete: () => console.log('done'),
next: (v) => console.log(v),
error: () => console.log('error'),
});
Subject
What is a Subject? An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observer
是一种特殊类型的Observable,可以将消息同时广播给多个Observer,也就是多播。 而普通Observable则对应一个Observer,是单播。 可以理解为最简发布订阅。
import { Subject } from 'rxjs';
// 创建subject
const subject = new Subject();
// 订阅一个observer
subject.subscribe((v) => console.log('stream 1', v));
// 再订阅一个observer
subject.subscribe((v) => console.log('stream 2', v));
// 延时1s再订阅一个observer
setTimeout(() => {
subject.subscribe((v) => console.log('stream 3', v));
}, 1000);
// 产生数据1
subject.next(1);
// 产生数据2
subject.next(2);
// 延时3s产生数据3
setTimeout(() => {
subject.next(3);
}, 3000);
// output
// stream 1 1 //立即输出
// stream 2 1 //立即输出
// stream 1 2 //立即输出
// stream 2 2 //立即输出
// stream 1 3 //3s后输出
// stream 2 3 //3s后输出
// stream 3 3 //3s后输出
BehaviorSubject(与Subject区别、在medusa中的应用)
Subject的变体,需要初始值,并在订阅时发出其当前值。
注意:与Subject不同的是,需要初始值,而且subject.next会在所有订阅函数中执行(无论前后位置)。
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(123);
subject.subscribe(console.log);
subject.subscribe(console.log);
subject.next(456);
subject.subscribe(console.log);
subject.next(789);
// 123
// 123
// 456
// 456
// 456
// 789
// 789
// 789
Subject只会被位于subject.next之前订阅函数处理。
const subject = new Subject();
subject.subscribe(console.log);
subject.subscribe(console.log);
subject.next(101112); // 注意这里,只打印了2次
subject.subscribe(console.log);
subject.next(131415);
// 101112
// 101112
// 131415
// 131415
// 131415
tuya的微前端框架medusa,有用到BehaviorSubject这个类,用于主子应用做通信。
源码地址:https://github.com/tuya/medusa/blob/main/packages/medusa/src/packages/store.ts
发送数据
import { dispatch } from 'mmed/client';
dispatch("Hello World", false, 'foo')
接收数据
import { subscribe } from 'mmed/client';
useEffect(() => {
const stream$ = subscribe((data) => {
console.log(data) // Hello World
}, 'foo');
return () => {
stream$.unsubscribe();
};
}, [])
Rxjs流最简流程(创建流、处理流、启动流、停止流)
分为4步:
- 创建流
- 处理流(或者说消费流)
- 启动流
- 停止流
import { Observable } from "rxjs";
// 创建流 - Observable
const stream$ = new Observable(subscriber => {
setTimeout(() => {
subscriber.next([1, 2, 3]);
}, 500);
});
// 处理流(消费流产生的数据)- Observer
const observer = {
complete: () => console.log("done"),
next: v => console.log(v),
error: () => console.log("error")
};
// 启动流 - Subscription
const subscription = stream$.subscribe(observer);
setTimeout(() => {
// 停止流
subscription.unsubscribe();
}, 1000);
Rxjs在React框架中使用
这是一个例子,模拟了如何使用Rxjs,改造 前端请求接口后返回的数据,与prop数组做拼接 这种场景。
传统方式
import * as React from 'react';
const GreetSomeone = ({ greet = 'Hello' }) => {
const [greeting, setGreeting] = React.useState('');
const [name, setName] = React.useState('');
React.useEffect(() => {
setTimeout(() => {
setName('World');
}, 3000);
}, []);
React.useEffect(() => {
setGreeting(`${greet}, ${name}!`);
}, [greet, name]);
return <p>{greeting}</p>;
};
export default GreetSomeone;
rxjs方式
import * as React from 'react';
import { combineLatest, from, of, Observable, BehaviorSubject } from 'rxjs';
import { catchError, map, startWith } from 'rxjs/operators';
const GreetSomeone = ({ greet = 'Hello' }) => {
const [greeting, setGreeting] = React.useState('');
// 创建greet流
const greet$ = React.useRef(new BehaviorSubject(greet));
React.useEffect(() => {
greet$.current.next(greet);
}, [greet]);
React.useEffect(() => {
console.log('创建流');
// 创建name流
const name$ = from(
// 模拟远程搜索数据
new Observable((subscriber) => {
setTimeout(() => {
subscriber.next('World');
}, 3000);
})
).pipe(
startWith('_____'),
catchError(() => of('Mololongo'))
);
// 创建greet和name合并流
const greeting$ = combineLatest(greet$.current, name$).pipe(
map(([greet, name]) => `${greet}, ${name}!`)
);
// 启动合并流
const subscription = greeting$.subscribe((value) => {
setGreeting(value);
});
return () => {
subscription.unsubscribe();
};
}, []);
return <p>{greeting}</p>;
};
export default GreetSomeone;
demo地址:https://stackblitz.com/edit/react-ts-jmk8wd?file=Hello.tsx