bingoogolapple.github.io
bingoogolapple.github.io copied to clipboard
RxJava2
RxJava2
RxJava2 学习资料推荐
Rxjava2.0新特性
HelloWorld
private void helloWorld() {
Schedulers.computation().scheduleDirect(() -> {
Logger.d("在 computation 线程订阅"); // RxComputationThreadPool-1
Observable.create((ObservableOnSubscribe<Integer>) emitter -> { // RxCachedThreadScheduler-1【受 A 影响】
try {
if (!emitter.isDisposed()) { // Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候
Thread.sleep(3000);
Logger.d("onNext");
emitter.onNext(0);
}
if (!emitter.isDisposed()) {
Thread.sleep(3000);
Logger.d("onComplete");
emitter.onComplete(); // 在一个正确运行的事件序列中, onComplete() 和 onError() 有且只有一个,并且是事件序列中的最后一个
}
} catch (Exception e) {
if (!emitter.isDisposed()) {
Logger.d("onError");
emitter.onError(e);
}
}
}).doOnNext(data -> Logger.d("doOnNext1")) // RxCachedThreadScheduler-1【受 A 影响】
.doOnComplete(() -> Logger.d("doOnComplete1")) // RxCachedThreadScheduler-1【受 A 影响】
.doOnError(throwable -> Logger.d("doOnError1")) // RxCachedThreadScheduler-1【受 A 影响】
.map(data -> { // RxCachedThreadScheduler-1【受 A 影响】
Logger.d("map1");
return data + 1;
})
.flatMap(data -> { // RxCachedThreadScheduler-1【受 A 影响】
Logger.d("flatMap1");
return Observable.just(data);
// return Observable.just(data)
// .observeOn(Schedulers.newThread()); // 【B newThread】如果后续还有 observeOn,则影响两个 observeOn 之间操作符的执行线程「flatMap 内部执行 observeOn 来切换线程也会外部的操作符执行线程,但不会影响外部的 doOnComplete」
})
.observeOn(Schedulers.newThread()) // 【B newThread】如果后续还有 observeOn,则影响两个 observeOn 之间操作符的执行线程
.doOnNext(data -> Logger.d("doOnNext2")) // RxNewThreadScheduler-2【受 B 影响】
.doOnComplete(() -> Logger.d("doOnComplete2")) // RxNewThreadScheduler-2【受 B 影响】
.doOnError(throwable -> Logger.d("doOnError2")) // RxNewThreadScheduler-2【受 B 影响】
.flatMap(integer -> { // RxNewThreadScheduler-2【受 B 影响】
Logger.d("flatMap2");
return Observable.just(Long.valueOf(integer));
})
.map(data -> { // RxNewThreadScheduler-2【受 B 影响】
Logger.d("map2");
return data + 1;
})
.doOnSubscribe(disposable -> Logger.d("订阅成功 doOnSubscribe4")) // RxCachedThreadScheduler-1【在 A 处的 subscribeOn 之前,受 A 影响】
.subscribeOn(Schedulers.io()) // 【A io】第一次执行,影响「被观察者」以及「被观察者和第一个 observeOn」之间操作符的执行线程
.doOnSubscribe(disposable -> Logger.d("订阅成功 doOnSubscribe3")) // RxNewThreadScheduler-1【在 D 处的 subscribeOn 之前,受 D 影响】
.doOnNext(data -> Logger.d("doOnNext3")) // RxNewThreadScheduler-2【受 B 影响】
.doOnComplete(() -> Logger.d("doOnComplete3")) // RxNewThreadScheduler-2【受 B 影响】
.doOnError(throwable -> Logger.d("doOnError3")) // RxNewThreadScheduler-2【受 B 影响】
.flatMap(integer -> { // RxNewThreadScheduler-2【受 B 影响】
Logger.d("flatMap3");
return Observable.just(Double.valueOf(integer));
})
.map(data -> { // RxNewThreadScheduler-2【受 B 影响】
Logger.d("map3");
return data + 1;
})
.observeOn(AndroidSchedulers.mainThread()) // 【C main】如果后续没有 observeOn,则影响后续所有操作符的执行线程(包括 Observer 中的所有方法的执行线程)
.doOnNext(data -> Logger.d("doOnNext4")) // main【受 C 影响】
.doOnComplete(() -> Logger.d("doOnComplete4")) // main【受 C 影响】
.doOnError(throwable -> Logger.d("doOnError4")) // main【受 C 影响】
.map(data -> { // main【受 C 影响】
Logger.d("map4");
return data + 1;
})
.compose(bindToLifecycle()) // 如果 Activity 销毁时被观察者还没有发射 onComplete 或 onError,会回调「compose 之后的 doOnComplete」和「Observer 的 onComplete 方法」
.doOnNext(data -> Logger.d("doOnNext5")) // main【受 C 影响】
.doOnComplete(() -> Logger.d("doOnComplete5")) // main【受 C 影响】
.doOnError(throwable -> Logger.d("doOnError5")) // main【受 C 影响】
.map(data -> { // main【受 C 影响】
Logger.d("map5");
return data + 1;
})
.flatMap(data -> { // main【受 C 影响】
Logger.d("flatMap4");
return Observable.just("转换了 " + data + " 次 map 操作");
})
.doOnSubscribe(disposable -> Logger.d("订阅成功 doOnSubscribe2")) // RxNewThreadScheduler-1【在 subscribeOn 之前,受 D 影响】
.subscribeOn(Schedulers.newThread()) // 【D newThread】第二次执行,只影响两个 subscribeOn 之间的 doOnSubscribe
.doOnSubscribe(disposable -> Logger.d("订阅成功 doOnSubscribe1")) // RxComputationThreadPool-1【在 subscribeOn 之后,与 Observer 的 onSubscribe 方法一样】
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) { // RxComputationThreadPool-1
// 在 subscribe 刚开始,但事件还未发送之前被调用,可以用于做一些准备工作,如果对准备工作的线程有要求,可能该方法就不适用做准备工作
// 因为该方法总是在 subscribe 所发生的线程被调用,而不能指定线程(例如本例就不适合在该方法中显示对话框,因为是在 computation 中 subscribe 的)
Logger.d("订阅成功 onSubscribe");
}
@Override
public void onNext(String item) { // main【受 C 影响】
Logger.d("onNext: " + item);
}
@Override
public void onComplete() { // main【受 C 影响】
Logger.d("接收完所有数据");
}
@Override
public void onError(Throwable throwable) { // main【受 C 影响】
Logger.d(throwable);
}
});
});
}
-
Observable
- 被观察者并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候
- 在一个正确运行的事件序列中, onComplete() 和 onError() 有且只有一个,并且是事件序列中的最后一个
- Observer 的 onSubscribe 方法
- 在 subscribe 刚开始,但事件还未发送之前被调用,可以用于做一些准备工作
- 该方法总是在 subscribe 所发生的线程被调用,而不能指定线程
- 如果对准备工作的线程有要求,可能该方法就不适用做准备工作,例如在 io 线程执行 subscribe 时,就不能在 onSubscribe 中显示 Dialog
- Observable.subscribe -> Observable.subscribeActual -> Observer.onSubscribe -> ObservableOnSubscribe.subscribe
-
observeOn
- 如果后续没有 observeOn,则影响后续「操作符」和 Observer 的「onNext」、「onComplete」、「onError」方法的执行线程
- 如果后续还有 observeOn,则影响两个 observeOn 之间操作符的执行线程
- flatMap 内部执行 observeOn 来切换线程也会外部的操作符执行线程,但不会影响外部的 doOnComplete
-
subscribeOn
- 第一次执行,影响「被观察者」以及「被观察者和第一个 observeOn」之间操作符的执行线程(包括 doOnSubscribe)
- 第二次执行,只影响两个 subscribeOn 之间的 doOnSubscribe
- 对 flatMap、switchMap 内部新创建的 Observable 执行 subscribeOn 会影响外部后续操作符的执行线程
-
doOnSubscribe
- 在 subscribeOn 之前调用时,受 subscribeOn 影响
- 在 subscribeOn 之后调用时,与 Observer 的 onSubscribe 方法一样,总是在 subscribe 所发生的线程被调用,而不能指定线程
-
map
- 对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化
-
flatMap
- 将上游 Observable 发送的数据集合变换为 Observable 集合,然后将这些 Observable 发射的数据平坦的放进一个单独的 Observable
- flatMap 并不保证事件的顺序,如果需要保证顺序则需要使用 concatMap
优化搜索
private PublishSubject<String> mKeywordPs;
private void optimizeSearch() {
mKeywordPs = PublishSubject.create();
mKeywordSv.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
@Override
public boolean onQueryTextSubmit(String query) {
return false;
}
@Override
public boolean onQueryTextChange(String keyword) {
mKeywordPs.onNext(keyword);
return true;
}
});
mKeywordPs.debounce(400, TimeUnit.MILLISECONDS) // debounce 默认是在 computation 线程的。发送一个延时消息给下游,如果在这段延时时间内没有收到新的请求,那么下游就会收到该消息;而如果在这段延时时间内收到来新的请求,那么就会取消之前的消息,并重新发送一个新的延时消息
.observeOn(AndroidSchedulers.mainThread()) // 这里手动将后续操作符切换到主线程,否则 filter 也是在 computation 线程的
.filter(keyword -> { // 只有返回 true 时,才会将事件发送给下游,否则就丢弃该事件
if (StringUtil.isNotEmpty(keyword)) {
return true;
} else {
mResultTv.setText("清空了关键字");
return false;
}
})
.switchMap(keyword -> getSearchObservable(keyword)) // 将上游 Observable 发送的数据集合变换为 Observable 集合,然后只发射这些 Observable 最近发射的数据「在该节点收到一个新的事件之后,如果之前收到的事件所产生的 Observable A 还没有发送事件给下游,那么下游就再也不会收到 Observable A」
.observeOn(AndroidSchedulers.mainThread())
.compose(bindToLifecycle())
.doOnError(throwable -> {
mResultTv.setText("错误:" + throwable.getMessage()); // 将 Observer 的 onError 中的错误处理放到 doOnError 中处理
})
.retryWhen(throwableObservable -> throwableObservable) // 处理 onError 时重订阅,避免发生一次错误后就再也搜索不到结果。Observer 的 onError 将不会再被回调。如果不做延时操作,可以直接用 retry 操作符
.filter(result -> StringUtil.isNotEmpty(mKeywordSv.getQuery())) // 避免返回结果时,如果当前搜索框关键字为空则忽略此次搜索结果
.subscribe(result -> {
Logger.d("onNext:" + result);
mResultTv.setText(result);
});
}
private Observable<String> getSearchObservable(final String keyword) {
return Observable.create((ObservableOnSubscribe<String>) emitter -> {
try {
if (StringUtil.isEqual("q", keyword)) { // 搜索 q 时延时 3 秒返回结果
Thread.sleep(3000);
} else if (StringUtil.isEqual("e", keyword)) { // 搜索 q 时延时 3 秒返回网络异常
Thread.sleep(3000);
emitter.onError(new Exception("网络异常"));
} else { // 搜索其他关键字时延时 1 秒返回结果
Thread.sleep(1000);
}
if (!emitter.isDisposed()) {
emitter.onNext("关键词为:" + keyword);
emitter.onComplete();
}
} catch (Exception e) {
if (!emitter.isDisposed()) {
emitter.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
-
debounce
- 默认是在 computation 线程的,可以指定线程
- 发送一个延时消息给下游,如果在这段延时时间内没有收到新的请求,那么下游就会收到该消息;而如果在这段延时时间内收到来新的请求,那么就会取消之前的消息,并重新发送一个新的延时消息
-
filter
- 只有返回 true 时,才会将事件发送给下游,否则就丢弃该事件
-
switchMap
- 将上游 Observable 发送的数据集合变换为 Observable 集合,然后只发射这些 Observable 最近发射的数据「在该节点收到一个新的事件之后,如果之前收到的事件所产生的 Observable A 还没有发送事件给下游,那么下游就再也不会收到 Observable A」
-
retryWhen
- 上游通知 retryWhen 本次订阅流已经完成,询问其是否需要重订阅,该询问是以 onError 事件触发的
- 外层 retryWhen 时重头再来重试,flatMap、concatMap、switchMap 内部 retryWhen 时只有其内部创建的那个 Observable 重试
- Function 的输入是一个 Observable<Throwable>,我们可以结合 flatMap 根据上游发送的错误类型进行相应的处理
- 返回一个 ObservableSource>,如果该 ObservableSource> 返回 onComplete 或 onError,那么不会触发重订阅;如果发送 onNext,那么会触发重订阅,也就是说,它仅仅是作为一个是否要触发重订阅的通知,onNext 发送的是什么数据并不重要
吸顶搜索
public class Engine {
public static final String BASE_URL = "http://192.168.31.152:8080/";
private RxJavaApi mRxJavaApi;
private Engine() {
boolean isBuildDebug = AppManager.getInstance().isBuildDebug();
HttpLoggingInterceptor.Level logLevel = isBuildDebug ? HttpLoggingInterceptor.Level.BODY : HttpLoggingInterceptor.Level.NONE;
OkHttpClient client = new OkHttpClient().newBuilder()
.addInterceptor(new HttpLoggingInterceptor().setLevel(logLevel))
.addInterceptor(new HeaderInterceptor())
.connectTimeout(10000, TimeUnit.MILLISECONDS)
.readTimeout(10000, TimeUnit.MILLISECONDS)
.writeTimeout(10000, TimeUnit.MILLISECONDS)
.build();
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(BASE_URL)
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())) // 指定在 io 线程进行网络请求
.addConverterFactory(GsonConverterFactory.create())
.client(client)
.build();
mRxJavaApi = retrofit.create(RxJavaApi.class);
}
private static class SingletonHolder {
private static final Engine INSTANCE = new Engine();
}
public static Engine getInstance() {
return Engine.SingletonHolder.INSTANCE;
}
public static RxJavaApi getRxJavaApi() {
return getInstance().mRxJavaApi;
}
}
public interface RxJavaApi {
// 查询博客列表
@GET("api/blogs")
Observable<NetResult<List<Blog>>> findBlogList(@Query("keyword") String keyword);
// 查询分类列表
@GET("api/categorys")
Observable<NetResult<List<Category>>> getCategoryList();
}
mKeywordSv.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
@Override
public boolean onQueryTextSubmit(String query) {
return false;
}
@Override
public boolean onQueryTextChange(String keyword) {
mKeywordPs.onNext(keyword);
return true;
}
});
private PublishSubject<String> mKeywordPs;
private void initSearch() {
mKeywordPs = PublishSubject.create();
mKeywordPs.debounce(400, TimeUnit.MILLISECONDS) // debounce 默认是在 computation 线程的。发送一个延时消息给下游,如果在这段延时时间内没有收到新的请求,那么下游就会收到该消息;而如果在这段延时时间内收到来新的请求,那么就会取消之前的消息,并重新发送一个新的延时消息
.observeOn(AndroidSchedulers.mainThread()) // 这里手动将后续操作符切换到主线程,否则 filter 也是在 computation 线程的
.filter(keyword -> { // 只有返回 true 时,才会将事件发送给下游,否则就丢弃该事件
if (StringUtil.isNotEmpty(keyword)) {
return true;
} else {
mBlogAdapter.clear();
return false;
}
})
.switchMap(keyword -> { // 将上游 Observable 发送的数据集合变换为 Observable 集合,然后只发射这些 Observable 最近发射的数据「在该节点收到一个新的事件之后,如果之前收到的事件所产生的 Observable A 还没有发送事件给下游,那么下游就再也不会收到 Observable A」
/**
* combineLatest 和 zip 类似,都是组合两个 Observable 的数据为新的 Observable
* zip 当原始 Observable 中每一个都发射了一条数据时才发射数据
* combineLatest 当原始 Observable 中任何一个发射了一条数据时发射数据
*/
return Observable.combineLatest(
getCategoryListObservable(),
mBlogApi.findBlogList(keyword).flatMap(netResult -> Observable.just(netResult.data)),
(blogCategoryList, blogList) -> {
convertToCategoryArray(blogCategoryList);
return blogList;
}
);
})
.observeOn(AndroidSchedulers.mainThread())
.compose(bindToLifecycle())
.doOnError(throwable -> {
throwable.printStackTrace();
Logger.d("错误:" + throwable.getMessage()); // 将 Observer 的 onError 中的错误处理放到 doOnError 中处理
})
.retryWhen(throwableObservable -> throwableObservable) // 处理 onError 时重订阅,避免发生一次错误后就再也搜索不到结果。Observer 的 onError 将不会再被回调
.filter(result -> StringUtil.isNotEmpty(mKeywordSv.getQuery())) // 避免返回结果时,如果当前搜索框关键字为空则忽略此次搜索结果
.subscribe(result -> {
Logger.d("查询成功");
mBlogAdapter.setData(result);
});
}
// 获取发送分类列表的被观察者
private Observable<List<Category>> getCategoryListObservable() {
// switchIfEmpty 如果原始 Observable 正常终止后仍然没有发射任何数据,就使用备用的 Observable
return getCategoryListFromCache()
.switchIfEmpty(mBlogApi.getCategoryList().flatMap(netResult -> Observable.just(netResult.data)))
.doOnNext(categoryList -> convertToCategoryArray(categoryList));
// concat 操作符是接收若干个 Observables,发射数据是有序的,不会交叉。concat + takeUntil 实现二级缓存
// return Observable.concat(
// getCategoryListFromCache(),
// mBlogApi.getCategoryList().flatMap(netResult -> Observable.just(netResult.data))
// ).takeUntil(categoryList -> categoryList != null)
// .doOnNext(categoryList -> convertToCategoryArray(categoryList));
}
// 从缓存中获取分类列表
private Observable<List<Category>> getCategoryListFromCache() {
// defer 只有当订阅者订阅时才创建 Observable,为每个订阅者创建一个新的 Observable。内部通过 ObservableDefer 在订阅时调用 Callable 的 call 方法创建 Observable
return Observable.defer(() -> {
Logger.d("defer");
return CollectionUtil.isEmpty(mCategoryList) ? Observable.empty() : Observable.just(mCategoryList);
});
}
-
defer
- 只有当订阅者订阅时才创建 Observable,为每个订阅者创建一个新的 Observable
- 内部通过 ObservableDefer 在订阅时调用 Callable 的 call 方法创建 Observable
-
switchIfEmpty
- 如果原始 Observable 正常终止后仍然没有发射任何数据,就使用备用的 Observable
- 可用于实现二级缓存,两个 switchIfEmpty 结合着使用也可以实现三级缓存
-
concat
- 接收若干个 Observables,发射数据是有序的
- 只有需要数据的时候才会订阅所有的 Observable 数据源
- 只接受相同泛型的参数
-
takeUntil
- 满足某个条件时停止检索队列「与 concat 结合使用可实现二级、三级缓存」
- 值得注意的是,输出结果是包含当前满足条件的这个元素
- rxlifecycle 内部也是该操作符来绑定生命周期的
-
combineLatest
- 和 zip 类似,接受多个 Observable 以及一个函数作为参数,并且函数的签名为这些 Observable 发射的数据类型
- combineLatest 任意一个 Observable 发射数据之后,会去取其它 Observable 最近一次发射的数据,回调到函数当中,但是该函数回调的前提是所有的 Observable 都至少发射过一个数据项
- zip 当原始 Observable 中每一个都发射了一条数据时才发射数据
表单+Token图片上传【token 过期时自动获取新的 token】
public interface RxJavaApi {
// 添加博客
@POST("api/blogs")
Observable<NetResult<Blog>> addBlog(@Body Blog blog);
// 获取文件上传 token
@GET("api/file/token")
Observable<NetResult<UploadToken>> getUploadToken();
// 上传文件
@POST("api/file/upload")
Observable<NetResult<String>> upload(@Body RequestBody requestBody);
}
private String mCoverFilePath = "/sdcard/avatar.png";
private void addBlog() {
final Blog blog = new Blog();
blog.setCategoryId(1L);
blog.setTitle("Token + 图片上传 + 错误重试");
blog.setContent("我是内容");
Observable.defer(() -> UploadManager.getInstance().getUploadObservable(mCoverFilePath))
.switchMap(filePath -> {
mCoverFilePath = filePath;
blog.setCover(mCoverFilePath);
return Engine.getRxJavaApi().addBlog(blog);
}).compose(RxUtil.handleResultThreadLifecycleRetry(this))
.subscribe(result -> Logger.d("添加博客成功"), throwable -> Logger.d("添加博客失败"));
}
public class UploadManager {
private AtomicBoolean mRefreshing = new AtomicBoolean(false);
private static final String SP_KEY_TOKEN = "SP_KEY_TOKEN";
private static final String SP_KEY_EXPIRE_TIME = "SP_KEY_EXPIRE_TIME";
private UploadManager() {
}
private static class SingletonHolder {
private static final UploadManager INSTANCE = new UploadManager();
}
public static UploadManager getInstance() {
return UploadManager.SingletonHolder.INSTANCE;
}
public static String getToken() {
return SPUtil.getString(SP_KEY_TOKEN);
}
public static long getExpireTime() {
return SPUtil.getLong(SP_KEY_EXPIRE_TIME);
}
// 获取文件上传 Token 的 Observable
public Observable getImageTokenObservable() {
if (mRefreshing.compareAndSet(false, true)) {
Logger.d("没有请求,发起一次新的 Token 请求");
return Engine.getRxJavaApi()
.getUploadToken()
.compose(RxUtil.handleResult())
.doOnNext(uploadToken -> {
SPUtil.putString(SP_KEY_TOKEN, uploadToken.getToken());
SPUtil.putLong(SP_KEY_EXPIRE_TIME, uploadToken.getExpireTime());
mRefreshing.set(false);
})
.doOnError(throwable -> mRefreshing.set(false));
} else {
Logger.d("已经有 Token 请求,延迟 5 秒重试");
return Observable.timer(5000, TimeUnit.MILLISECONDS);
}
}
// 获取文件上传的 Observable
public Observable<String> getUploadObservable(String filePath) {
if (filePath != null && !filePath.startsWith("http://") && !filePath.startsWith("https://")) {
File file = new File(filePath);
RequestBody body = new MultipartBody.Builder().setType(MultipartBody.FORM)
.addFormDataPart("file", file.getName(), RequestBody.create(MediaType.parse("image/*"), file))
.build();
return Engine.getRxJavaApi().upload(body)
.compose(RxUtil.handleResult())
.map(fileName -> Engine.BASE_URL + "api/file/browse/" + fileName);
} else {
return Observable.just(filePath == null ? "" : filePath);
}
}
}
public class RxUtil {
private RxUtil() {
}
// 处理结果
public static <T> ObservableTransformer<NetResult<T>, T> handleResult() {
return observable -> observable.flatMap(result -> {
if (result.code == 0) {
return Observable.just(result.data);
} else {
return Observable.error(new ApiException(result.msg, result.code));
}
});
}
// 处理结果、主线程、生命周期绑定
public static <T> ObservableTransformer<NetResult<T>, T> handleResultMainThreadLifecycle(LifecycleProvider lifecycleProvider) {
return observable -> observable.compose(handleResult())
.observeOn(AndroidSchedulers.mainThread())
.compose(lifecycleProvider.bindToLifecycle());
}
// 处理结果、主线程、生命周期绑定、错误重试
public static <T> ObservableTransformer<NetResult<T>, T> handleResultThreadLifecycleRetry(LifecycleProvider lifecycleProvider) {
return observable -> observable.compose(handleResult())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
private int mRetryCount;
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(throwable -> {
if (throwable instanceof IOException || throwable instanceof SocketException) { // 网络异常重试3次
mRetryCount++;
if (mRetryCount > 3) {
Logger.d("错误超过3次");
return Observable.error(throwable);
} else {
Logger.d("错误" + mRetryCount + "次");
return Observable.timer(mRetryCount * 1000, TimeUnit.MILLISECONDS);
}
} else if (throwable instanceof ApiException) {
if (((ApiException) throwable).getCode() == 401) {
return UploadManager.getInstance().getImageTokenObservable();
}
return Observable.error(throwable);
} else { // 未知异常直接返回发送 error 的 Observable
Logger.d("未知异常");
throwable.printStackTrace();
return Observable.error(throwable);
}
});
}
})
.compose(lifecycleProvider.bindToLifecycle());
}
}
- compose
- 操作符组合的复用利器
- 避免打断链式结构
- ObservableTransformer
- 实际上就是一个 Function<Observable<T>, Observable<R>>
- 可以通过它将一种类型的 Observable 转换成另一种类型的 Observable,和调用一系列的内联操作符一样
public static <T> ObservableTransformer<NetResult<T>, T> handleResultOrigin() {
return new ObservableTransformer<NetResult<T>, T>() {
@Override
public Observable<T> apply(Observable<NetResult<T>> observable) {
return observable.flatMap(new Function<NetResult<T>, Observable<T>>() {
@Override
public Observable<T> apply(NetResult<T> result) throws Exception {
if (result.code == 0) {
return Observable.just(result.data);
} else {
return Observable.error(new ApiException(result.msg, result.code));
}
}
});
}
};
}
自定义操作符
private void customOperator() {
Engine.getRxJavaApi().getUploadToken()
.delay(2000, TimeUnit.MILLISECONDS)
.lift(new NetResultOperator())
.doOnSubscribe(disposable -> showLoadingDialog("正在获取Token..."))
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(uploadToken -> {
dismissLoadingDialog();
Logger.d("获取成功:" + GsonUtil.toJson(uploadToken));
}, throwable -> {
dismissLoadingDialog();
Logger.d("获取文件上传 Token 失败" + throwable.getMessage());
});
}
private class NetResultOperator implements ObservableOperator<UploadToken, NetResult<UploadToken>> {
@Override
public Observer<? super NetResult<UploadToken>> apply(Observer<? super UploadToken> observer) throws Exception {
return new Observer<NetResult<UploadToken>>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
mDisposable = disposable;
observer.onSubscribe(mDisposable);
}
@Override
public void onNext(NetResult<UploadToken> netResult) {
if (!mDisposable.isDisposed()) {
observer.onNext(netResult.data);
}
}
@Override
public void onError(Throwable e) {
if (!mDisposable.isDisposed()) {
observer.onError(e);
}
}
@Override
public void onComplete() {
if (!mDisposable.isDisposed()) {
observer.onComplete();
}
}
};
}
}
- ObservableOperator
- 自定义 Operator 在发射任何数据之前都要使用 !mDisposable.isDisposed() 来检查 Observer 的状态,如果没有任何 Observer 订阅就没有必要去发射数据了
- lift
- 是对 Observer 进行操作的
- 可以将我们自定义的操作符和其它的操作符一起做链式调用
错误处理
// 初始化 RxJava 错误处理器
private void initRxJavaErrorHandler() {
RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
e = e.getCause();
}
if ((e instanceof IOException) || (e instanceof SocketException)) { // 没事,无关紧要的网络问题或 API 在取消时抛出的异常
return;
}
if (e instanceof InterruptedException) { // 没事,一些阻塞代码被 dispose 调用中断
return;
}
if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) { // 这可能是程序的一个bug
Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e);
return;
}
if (e instanceof IllegalStateException) { // 这是 RxJava 或自定义操作符的一个 bug
Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e);
return;
}
Logger.w("Undeliverable exception received, not sure what to do");
e.printStackTrace();
});
}
- RxJava2 一个重要的设计需求就是不能吞下任何的 Throwable 错误。这里的错误是指那些由于下游流的生命周期走到了尽头或下游流取消了即将发射错误的序列
- 这些错误被发送到 RxJavaPlugins.onError 处理器。该处理器可以通过 RxJavaPlugins.setErrorHandler 方法重载。如果没有重载,缺省状态下 RxJava 打印 Throwable 的堆栈轨迹到控制台并且调用当前线程的未捕获异常处理器
- 如果要避免调用未捕获异常处理器,使用 RxJava2(直接地或间接地)的最终程序应该设置一个空的处理器「RxJavaPlugins.setErrorHandler(e -> {});」
- RxJava2 中如果通过 RxJavaPlugins.setErrorHandler 方法设置了错误处理器,那么在 subscribe 时不指定 onError 的 Consumer 应用也不会崩溃,除非是在错误处理器中手动调用了 「Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e);」才会崩溃