Awesome-RxJava
Awesome-RxJava copied to clipboard
适用场景
我先来开个头~
create操作符适用于创建Observable对象,使用Retrofit可以直接生成Observable,但是对于不使用Retrofit的开发者咋办呢?下面的方法,可以在任意Http请求框架下生成Observable对象
异步请求可以这样写,PS:OkHttpProxy是我对OkHttp的一个简单封装,主要把回调用Handler弄到了UI线程,所以下面的代码不需要指定观察者所在线程
public static Observable<User> getUser() {
return Observable.create((Observable.OnSubscribe<User>) subscriber ->
OkHttpProxy.get().url(URL_USER).enqueue(new OkCallback<User>(new OkJsonParser<User>() {
}) {
@Override
public void onSuccess(int code, User user) {
subscriber.onNext(user);
subscriber.onCompleted();
}
@Override
public void onFailure(Throwable e) {
subscriber.onError(e);
}
}));
}
同步请求可以这样写,PS:由于是同步请求,所以需要指定subscribeOn()所在线程和observeOn所在线程,注意,这里的同步请求指的是网络请求会在发起请求的线程执行,所以我们需要指定Schedulers.io()开启工作线程
public static Observable<User> getUserSync() {
return Observable.create((Observable.OnSubscribe<User>) subscriber -> {
try {
subscriber.onNext(new OkJsonParser<User>() {
}.parseResponse(OkHttpProxy.get().url(URL_USER).execute()));
} catch (IOException e) {
subscriber.onError(e);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
通过上面简单的封装,你就可以把现有的Http框架改造为RxJava适用的框架,从而可以体验RxJava带来的便捷
返回不同的Observable模板: 比如一个Observable<A>和一个Observable<B>
Observable.zip(mRxUserPresenter.mineInfo(), mRxUserPresenter.stat(), (m, s) -> {
handleUserInfo(m);
handleUserStats(s);
return Observable.empty();
})
.subscribeOn(Schedulers.io())
.doOnSubscribe(() -> mContentProgressBar.setVisibility(View.VISIBLE))
.doOnTerminate(() -> mContentProgressBar.setVisibility(View.GONE))
.subscribe(r -> {},
this::handleException);
我是这么用的,有没有谁有更好的方案
我也来。RxJava经过封装后可以替代EventBus,实现组件跟组件之间的通讯。
@ZhaoKaiQiang 后面的线程切换组合,感觉参考 dan lew的会更骚气。 链接在此:http://blog.danlew.net/2015/03/02/dont-break-the-chain/ #我不是来搅局的- -!
@lincleejun 谢谢!!!!我要的东西!!!
mark
dont break the chain是不错的文章。 不打断RxJava的链式调用,主要就是使用两个 operator, compose和lift
结合Retrofit简直不能更好用
求指导 是否可以通用
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Logger.d("call 当前所在子线程=====");
insertData();//插入数据库的耗时操作,或者一些耗时操作
subscriber.onNext(null);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
dialog.show();//启动前先调用
Logger.d("doOnSubscribe 当前所在线程=====");
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Drawable>() {
@Override
public void onCompleted() {
Logger.d("onCompleted 当前所在线程=====");//任务执行完成
Intent intent = new Intent(this, MainActivity.class);//插入完成 跳转页面
startActivity(intent);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Drawable drawable) {
Logger.d("onNext 当前所在线程=====");
dialog.dismiss();
}
});
@jackTang11 我今天早上踏过的坑, 你在调用subscribe()
的时候,千万要注意自己是在什么线程里,因为它和你的doOnSubscribe
是在同一个线程执行的,也就是说,如果你subscribe()
在非主线程,那么在doOnSubscribe
做UI的操作就会crash...
@geminiwen 我看了 如果放在子线程 确实不行,还好我没有放在子线程
@ZhaoKaiQiang @lzyzsd @lincleejun @androidMVP 各位老大 帮忙看看 我写的代码是否有问题 ,以后我做耗时操作(比如,查询数据库,遍历对比,发送socket)就这么做,可以吗
@jackTang11 我觉的你这样的写法是有问题的 建议认真阅读 给 Android 开发者的 RxJava 详解 或者本工程的作者博客**大头鬼Bruce**关于RxJava的介绍以及其线程切换和Schedulers使用的相关内容吧,就目前你的代码中,最起码最后一个.subscribe()方法你所处理的事情应该切回(.observeOn(AndroidSchedulers.mainThread)
)UI线程去做吧 :smile: I‘m a Rx Newer
ps.使用方法的问题是不是建议另外open a new issue?
抱歉,来晚了。
@ZhaoKaiQiang 开了个好头,而且OkHttpProxy用起来酷极了。
RxJava提供了丰富的API用来创建Observable
,当然最灵活的还是使用Observable.create( )
方式来自定义发送事件。虽然使用方式灵活多变,但我们最好还是遵循Observable条约和Rx设计规范
,实现Observer
接口的实现类,允许使用.onNext( )
发送任意数量的事件,并且可选择性的添加.onCompleted( )
或.onError( )
操作符,但只能添加二者之一,这样表明Observable事件序列已经结束了,保证Observable事件序列的消费者,能够安全的执行清理操作。当然,一般情况下传入.subscribe( )
中的subscriber
会被封装成SafeSubscriber
,其中成员变量done
能够确保调用onCompleted( )
或.onError( )
后Subscriber
不再接受发送自.onNext( )
的事件,然后自己自动解除订阅关系。另外需要补充的是SafeSubscriber
并不能保证.onNext( )
发送事件的同步执行,.unsubscribe( )
的调用,也不能保证链上的操作符立即停止发送事件。
以下是我给出的基本使用方式,如有建议请指出。
Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(new Object());
/*如果不需要复用Observable,可以添加完成通知,但是对于一个SafeSubscriber来说,会触发unsubscribe*/
subscriber.onCompleted();
}
subscriber.add(new Subscription() {
/*需要做同步处理,保证线程安全,建议使用AtomicIntegerFieldUpdater*/
public volatile boolean isUnsubscribed;
@Override public void unsubscribe() {
isUnsubscribed = true;
/*解除订阅时触发,处理取消网络请求,注销监听事件,取消FutureTask异步回调,释放内存等操作*/
/*do some clear*/
}
@Override public boolean isUnsubscribed() {
return isUnsubscribed;
}
});
}
}).subscribe(new Subscriber<Object>() {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
}
@Override public void onNext(Object o) {
}
});
当然这种使用方式并不是万能的,大多数情况下还要根据具体业务逻辑以及线程要求对其进行改造。
subscriber.add(new Subscription() {
/*需要做同步处理,保证线程安全,建议使用AtomicIntegerFieldUpdater*/
public volatile boolean isUnsubscribed;
@Override public void unsubscribe() {
isUnsubscribed = true;
/*解除订阅时触发,处理取消网络请求,注销监听事件,取消FutureTask异步回调,释放内存等操作*/
/*do some clear*/
}
@Override public boolean isUnsubscribed() {
return isUnsubscribed;
}
});
这个Subscription如果两个线程同时调用unsubscribe,可能导致clear的工作执行了两遍。这段可以用Subscriptions.create
来简化一下,不需要自己处理isUnsubscribed。
尽量不要自己用lift
去实现新的操作。如果真的需要,建议先看一下Dávid Karnok写的Pitfalls of operator implementations系列(需翻墙),不然很难写出没有bug的操作来,特别是一些并发的bug,自己写的测试不一定能触发。
@zsxwing 像MainThreadSubscription 那样把isUnsubscribed做到同步,应该就能避免重复clear操作了吧。而且我看到BooleanSubscription也是这样做的,只不过没有限定Android主线程。
MainThreadSubscription
这个类之前一直存在于RxBinding中,不过最近被转移到了RxAndroid中。
Subscriptions.create
其实就是创建了个BooleanSubscription
@zsxwing 恩恩,get了。
@lincleejun 那篇文章非常好,谢谢!改良之后,可以这样写咯
public class JDAPI {
public Observable<ArrayList<FreshNews>> getFreshNews(final int page) {
return Observable.create(new Observable.OnSubscribe<ArrayList<FreshNews>>() {
@Override
public void call(Subscriber<? super ArrayList<FreshNews>> subscriber) {
try {
String url = FreshNews.getUrlFreshNews(page);
subscriber.onNext(new FreshNewsParser().parse(OkHttpProxy.get()
.url(url)
.execute()));
subscriber.onCompleted();
} catch (IOException e) {
subscriber.onError(e);
}
}
}).compose(this.<ArrayList<FreshNews>>applySchedulers());
}
private <T> Observable.Transformer<T, T> applySchedulers() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
}
我目前用到的一个场景, RecyclerView
和 Adapter
:
Fragment
作为 subscriber
, Adapter
作为 Observable
其中 Fragment
里的代码:
public static Action1 mAction1 = new Action1<Integer>() {
@Override
public void call(Integer integer) {
if(integer==-1) {
ToastUtils.showShort("点击了全部");
} else {
ToastUtils.showShort("点击了 " + integer);
}
}
};
通过实例化 Adapter
将 mAction1
传入, 在 Adapter
中:
holder.btn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
mObservable.just(-1).subscribe(Fragment.mAction1)
}
});
各位觉得这样做怎么样
@DaidongStudio
这么用当然无可后非。
ReactiveX 首页明晃晃的写了一句话 An API for asynchronous programming with observable streams
RxJava 介绍是这样 a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
概括一下关键词: 异步、事件驱动、可观察、流(序列和流可以理解成一种)。
好了,再来看你的运用场景,明显是主线程中两个组件进行通讯。
我更倾向于使用 throttleFirst(long windowDuration, TimeUnit unit)
实现事件过滤,假如一秒内用户连续点击三次 item 只回调事件处理者一次。
另外想说的是:使用一种解决方案总有原因,例如代码更简洁、易读和维护。或者程序性能更高、开销更小。这里推荐 Framework Fireside Chat 15:59 秒关于 Enum 的讨论。
一句话:我们要明白写下每一行代码的开销和带来的好处,斟酌之后再做决定。
以上仅个人粗浅理解,欢迎指正。
@johnwatsondev 你说的很对,RxJava就是通过组合那些简直不能更神奇的操作符来处理基于事件驱动的异步/同步数据流的。不过关于RxAndroid,很多人存在误解,它是为了Android而生的,但并不是Android下的Rx,我们使用的依然是RxJava,这是JakeWharton对RxAndroid的描述:
Android specific bindings for RxJava. This module adds the minimum classes to RxJava that make writing reactive components in Android applications easy and hassle-free. More specifically, it provides a Scheduler that schedules on the main UI thread or any given Handler.
他为Android开发提供了RxAndroidPlugins
,封装了适合Android主线程的HandlerScheduler
,并且开放了标记为@Beta
的.reset()
方法,这无疑为我们进行测试带来了很大的方便。关于RxJava为什么一直没有公开这个方法,我想在这个issue中benjchristensen给出了非常非常非常好的解释Make RxJavaPlugins.reset() public
最近人们经常提到如何解决这种快速点击事件带来的Backpressure,很多人提倡使用.throttleFirst(long windowDuration, TimeUnit unit)
,这无疑是一种很不错的解决方式,但同时也意味着我们要丢弃某些事件,甚至很难准确的给出windowDuration
,不过我们本不应该如此小心翼翼的使用操作符,应为这会拖慢我们的开发效率,当然关于学习和使用RxJava,还有很长的一段路要走。另外需要一提的是,根据@DaidongStudio给出的代码片段,这种static的方式,我很难下定论: (
@SmartDengg Sorry, 关于事件过滤那里当时没有仔细考证,已修正。谢谢指出!
另外关于同步的用法,扔物线解释的很到位,更多的为了扩展性。比如使用 RxBinding 把按钮点击事件转化为可观察事件,再进行去抖操作,或者用 flatMap
做逻辑处理。
个人对同步的用法持谨慎态度,和 事件回调
处理事件通讯相比,是否代码更易维护,资源开销更小,内存泄漏更易避免?
@SmartDengg , @johnwatsondev , 谢谢两位的讲解 .
关于说的 static
这个方法,主要是我在RxAndroid
所提供的 sample 中看到使用了static
, 但其实我也很难确定这样做是否是最优 - -! , 我目前也是尽量的尝试使用Rx 去实现某些功能, 所以考虑的还很欠缺。
个人对同步的用法持谨慎态度,和 事件回调 处理事件通讯相比,是否代码更易维护,资源开销更小,内存泄漏更易避免?
对于 @johnwatsondev 所提到的这一点我也是持有不确定的态度。
哪里聊到内存泄露了?内存泄露具体问题拿上代码我们可以来聊聊 推一下我这个内存泄露例子 https://github.com/BaronZ/MemoryLeaksDemo
@johnwatsondev 很抱歉,我不是很理解您所说的同步用法,能否举个例子,thanks :)
@SmartDengg 实在抱歉,可能我表述有问题。
异步
和 事件驱动
是响应式编程的两个特点。
上文中 同步
是指 @DaidongStudio 所贴代码的用法,观察者和订阅者在同一线程。是针对 异步
讲的,讲的太草率,应该叫 非异步
。实在不好意思。
本质是 事件驱动
的场景都可以用响应式编程,因为可以抽象成数据流。不过我认为响应式编程在 异步
+ 并发
方面才真正体现了它的核心威力。
以上仅个人看法,难免有误,欢迎指正。
@johnwatsondev 请指教,愿学习 哪里写的不好,你说出来,我也好学习学习,或者你有没有内存泄露的例子再补充的也可以。 佩服我这话就不要说了 @johnwatsondev 别把话删除了啊,我一直在刷新等你回复,如果有问题,真心准备学习,你这删除了就不好玩了-- 真心想学习而已。看你这话,肯定是觉得我的代码写得很有问题,强烈希望你到我的demo下开issue,看我出现了哪些低级问题,我们来交流 附上你原话: “@BaronZ 这样打广告真的合适吗?大概看了下那个 Demo ,我很佩服足下开源那个代码的勇气。”
@johnwatsondev 为不影响这个issue,我开了一个讨论我代码问题的issue,同时也把我评论合并为一条,地址如下,欢迎来讨论 https://github.com/BaronZ/MemoryLeaksDemo/issues/1
@johnwatsondev 嗯哼,你说的没错,消息驱动的确是响应式编程的必备特点。不够我还是想补充一下,响应式编程的最大威力除了Responsive(响应式)和Message-driven(消息驱动)的外,还具备以下特点:
- Resilient(弹性的):当系统中的部分组件或者数据出错时,仍能做出响应。不得不提的是RxJava对Error的传递机制简直太棒了,这不仅让我避免了回调地狱,而且也让我对系统进行容错处理变得更容易,因为我能够对这些error做出即时响应。
- Scalable(伸缩性):就如@zsxwing老师说的那样Backpressure Mechanisms的加入让RxJava的代码难度增加了好几个等级,但是多亏了这个,才让我们在解决Overload与Load Shedding等问题时轻松了许多。