rxjava sample操作符使用from操作符之后怎么每次发射数据时延迟一段时间

深入浅出RxJava(二:操作符) -
深入浅出RxJava(二:操作符)
在第一篇blog中,我介绍了RxJava的一些基础知识,同时也介绍了map()操作符。当然如果你并没有意愿去使用RxJava我一点都不诧异,毕竟才接触了这么点。看完这篇blog,我相信你肯定想立即在你的项目中使用RxJava了,这篇blog将介绍许多RxJava中的操作符,RxJava的强大性就来自于它所定义的操作符。
首先先看一个例子:
假设我有这样一个方法:
这个方法根据输入的字符串返回一个网站的url列表(,搜索引擎)
[java]&view
Observable&List&String&&&query(String&text);&&&
现在我希望构建一个健壮系统,它可以查询字符串并且显示结果。根据上一篇blog的内容,我们可能会写出下面的代码:
[java]&view
query(&Hello,&world!&)&&
&&&&.subscribe(urls&-&&{&&
&&&&&&&&for&(String&url&:&urls)&{&&
&&&&&&&&&&&&System.out.println(url);&&
&&&&&&&&}&&
这种代码当然是不能容忍的,因为上面的代码使我们丧失了变化数据流的能力。一旦我们想要更改每一个URL,只能在Subscriber中来做。我们竟然没有使用如此酷的map()操作符!!!
当然,我可以使用map操作符,map的输入是urls列表,处理的时候还是要for each遍历,一样很蛋疼。
万幸,还有Observable.from()方法,它接收一个集合作为输入,然后每次输出一个元素给subscriber:
[java]&view
Observable.from(&url1&,&&url2&,&&url3&)&&
&&&&.subscribe(url&-&&System.out.println(url));&&
我们来把这个方法使用到刚才的场景:
[java]&view
query(&Hello,&world!&)&&
&&&&.subscribe(urls&-&&{&&
&&&&&&&&Observable.from(urls)&&
&&&&&&&&&&&&.subscribe(url&-&&System.out.println(url));&&
虽然去掉了for each循环,但是代码依然看起来很乱。多个嵌套的subscription不仅看起来很丑,难以修改,更严重的是它会破坏某些我们现在还没有讲到的RxJava的特性。
救星来了,他就是flatMap()。
Observable.flatMap()接收一个Observable的输出作为输入,同时输出另外一个Observable。直接看代码:
[java]&view
query(&Hello,&world!&)&&
&&&&.flatMap(new&Func1&List&String&,&Observable&String&&()&{&&
&&&&&&&&@Override&&
&&&&&&&&public&Observable&String&&call(List&String&&urls)&{&&
&&&&&&&&&&&&return&Observable.from(urls);&&
&&&&&&&&}&&
&&&&.subscribe(url&-&&System.out.println(url));&&
这里我贴出了整个的函数代码,以方便你了解发生了什么,使用lambda可以大大简化代码长度:
[java]&view
query(&Hello,&world!&)&&
&&&&.flatMap(urls&-&&Observable.from(urls))&&
&&&&.subscribe(url&-&&System.out.println(url));&&
flatMap()是不是看起来很奇怪?为什么它要返回另外一个Observable呢?理解flatMap的关键点在于,flatMap输出的新的Observable正是我们在Subscriber想要接收的。现在Subscriber不再收到List&String&,而是收到一些列单个的字符串,就像Observable.from()的输出一样。
这部分也是我当初学RxJava的时候最难理解的部分,一旦我突然领悟了,RxJava的很多疑问也就一并解决了。
还可以更好
flatMap()实在不能更赞了,它可以返回任何它想返回的Observable对象。
比如下面的方法:
[java]&view
//&返回网站的标题,如果404了就返回null&&
Observable&String&&getTitle(String&URL);&&
接着前面的例子,现在我不想打印URL了,而是要打印收到的每个网站的标题。问题来了,我的方法每次只能传入一个URL,并且返回值不是一个String,而是一个输出String的Observabl对象。使用flatMap()可以简单的解决这个问题。
[java]&view
query(&Hello,&world!&)&&
&&&&.flatMap(urls&-&&Observable.from(urls))&&
&&&&.flatMap(new&Func1&String,&Observable&String&&()&{&&
&&&&&&&&@Override&&
&&&&&&&&public&Observable&String&&call(String&url)&{&&
&&&&&&&&&&&&return&getTitle(url);&&
&&&&&&&&}&&
&&&&.subscribe(title&-&&System.out.println(title));&&
使用lambda:
[java]&view
query(&Hello,&world!&)&&
&&&&.flatMap(urls&-&&Observable.from(urls))&&
&&&&.flatMap(url&-&&getTitle(url))&&
&&&&.subscribe(title&-&&System.out.println(title));&&
是不是感觉很不可思议?我竟然能将多个独立的返回Observable对象的方法组合在一起!帅呆了!
不止这些,我还将两个API的调用组合到一个链式调用中了。我们可以将任意多个API调用链接起来。大家应该都应该知道同步所有的API调用,然后将所有API调用的回调结果组合成需要展示的数据是一件多么蛋疼的事情。这里我们成功的避免了callback hell(多层嵌套的回调,导致代码难以阅读维护)。现在所有的逻辑都包装成了这种简单的响应式调用。
丰富的操作符
目前为止,我们已经接触了两个操作符,RxJava中还有更多的操作符,那么我们如何使用其他的操作符来改进我们的代码呢?
getTitle()返回null如果url不存在。我们不想输出&null&,那么我们可以从返回的title列表中过滤掉null值!
[java]&view
query(&Hello,&world!&)&&
&&&&.flatMap(urls&-&&Observable.from(urls))&&
&&&&.flatMap(url&-&&getTitle(url))&&
&&&&.filter(title&-&&title&!=&null)&&
&&&&.subscribe(title&-&&System.out.println(title));&&
filter()输出和输入相同的元素,并且会过滤掉那些不满足检查条件的。
如果我们只想要最多5个结果:
[java]&view
query(&Hello,&world!&)&&
&&&&.flatMap(urls&-&&Observable.from(urls))&&
&&&&.flatMap(url&-&&getTitle(url))&&
&&&&.filter(title&-&&title&!=&null)&&
&&&&.take(5)&&
&&&&.subscribe(title&-&&System.out.println(title));&&
take()输出最多指定数量的结果。
如果我们想在打印之前,把每个标题保存到磁盘:
[java]&view
query(&Hello,&world!&)&&
&&&&.flatMap(urls&-&&Observable.from(urls))&&
&&&&.flatMap(url&-&&getTitle(url))&&
&&&&.filter(title&-&&title&!=&null)&&
&&&&.take(5)&&
&&&&.doOnNext(title&-&&saveTitle(title))&&
&&&&.subscribe(title&-&&System.out.println(title));&&
doOnNext()允许我们在每次输出一个元素之前做一些额外的事情,比如这里的保存标题。
看到这里操作数据流是多么简单了么。你可以添加任意多的操作,并且不会搞乱你的代码。
RxJava包含了大量的操作符。操作符的数量是有点吓人,但是很值得你去挨个看一下,这样你可以知道有哪些操作符可以使用。弄懂这些操作符可能会花一些时间,但是一旦弄懂了,你就完全掌握了RxJava的威力。
你甚至可以编写自定义的操作符!这篇blog不打算将自定义操作符,如果你想的话,清自行Google吧。
感觉如何?
好吧,你是一个怀疑主义者,并且还很难被说服,那为什么你要关心这些操作符呢?
因为操作符可以让你对数据流做任何操作。
将一系列的操作符链接起来就可以完成复杂的逻辑。代码被分解成一系列可以组合的片段。这就是响应式函数编程的魅力。用的越多,就会越多的改变你的编程思维。
另外,RxJava也使我们处理数据的方式变得更简单。在最后一个例子里,我们调用了两个API,对API返回的数据进行了处理,然后保存到磁盘。但是我们的Subscriber并不知道这些,它只是认为自己在接收一个Observable&String&对象。良好的封装性也带来了编码的便利!
在第三部分中,我将介绍RxJava的另外一些很酷的特性,比如错误处理和并发,这些特性并不会直接用来处理数据。
转自&http://blog.csdn.net/lzyzsd/article/details/
更多相关文章
Selenium中文参考手册
Selenium深入浅出之二
原文链接 RxJava正在Android开发者中变的越来越流行.唯一的问题就是上手不容易,尤其是大部分人之前都是使用命令式编程语言.但是一旦你弄明白了,你就会发现RxJava真是太棒了. 这里仅仅是帮助你了解RxJava,整个系列共有四篇文章,希望你看完这四篇文章之后能够了解RxJava背后的思想, ...
:ECMAScript 6已经正式发布了,作为它最重要的方言,Javascript也即将迎来语法上的重大变革,InfoQ特开设&深入浅出ES6&专栏,来看一下ES6将给我们带来哪些新内容.本专栏文章来自Mozilla Web开发者博客,由作者授权翻译并发布. 我们如何遍历数组中的 ...
[]Docker是PaaS供应商dotCloud开源的一个基于LXC 的高级容器引擎,源代码托管在 GitHub 上, 基于Go语言开发并遵从Apache 2.0协议开源.Docker提供了一种在安全.可重复的环境中自动部署软件的方式,它的出现拉开了基于云计算平台发布产品方式的变革序幕.为了更好的促 ...
前言: 在本文中讲述了正则表达式中的组与向后引用,先前向后查看,条件测试,单词边界,选择符等表达式及例子,并分析了正则引擎在执行匹配时的内部机理.
本文是Jan Goyvaerts为RegexBuddy写的教程的译文,版权归原作者所有,欢迎转载.但是为了尊重原作者和译者的劳动,请注明出处!谢谢 ...
&style name=&AppTheme& parent=&q ...
最近在调试一个算法,想通过改变算法的参数看看结果有什么变化. 碰到一个麻烦的事情是,从磁盘加载.构建数据需要15分钟.这就比较讨厌了,也就是说我每次调一个参数前都要等15分钟启动时间? 于是我就想,能不能开一个dat ...
在win7下安装MyEclipse10.安装完成之后运行注册机,总是提示classnotfond显示找不到com.sun.java.swing.plaf.nimbus.NimbusLookAndFeel这个包. 查看 ...
题目: Description John von Neumann, b. Dec. 28, 1903, d. Feb. 8, 1957, was a Hungarian-American mathematician
什么也不多说了,只有俩字: 加油
友情链接:
管理员邮箱:info@RxJava操作符的应用场景 - 简书
RxJava操作符的应用场景
在项目中使用了RxJava了,真切感受到其强大,对比AsyncTask,简直是云泥之别。在项目替换Android原生的AsyncTask以及handler成使用RxJava生出感慨,实在是爽,代码逻辑清晰极了。
注:本篇博文适合已有一定RxJava基础知识阅读。
1,from操作符
这一个操作符我用的最多,那么它最佳的应用场景是什么呢?
当你有一个堆数据(Future、Iterable和数组)需要处理,且处理逻辑相同时。这样说或许有些抽象,打个比喻:相信大家都配过钥匙,假如我要配几把同一个门的钥匙,那么制造这几把钥匙的流程都是一模一样的。而更简洁的概括就是-----有重复性操作时就用它就准没错了。
2,map操作符
map名词是地图的意思,那么这样理解这个操作符就很想不通这个操作符的 应用场景了,但我们敲代码的不可能不熟悉这一个数据结构.
在我使用这个操作符的理解中,map就是变换需要操作的数据内容或者结构的意思。所以其使用场景显而易见:当原始数据不能满足我们的需求,但我们却需要依赖这一原始数据去获取满足我们需求的数据时,那么就用它,准没错。比如:我有一个图片url集合,但我的需求却是依次获取到bitmap,显示在ImageView中,那么用该操作符就能解决问题了
官方文档原文:transform the items emitted by an Observable by applying a function to each item.我的理解:通过使用map中的方法对Observable中发射出来的所有数据进行变换.(翻译如有错误,望指正)
ReativeX map操作符的流程示意图
ps:有兴趣的话更希望大家看官方文档,更加原汁原味
3,filter操作符
filter-----过滤数据.这一个操作符浅显易懂,根据一些条件过滤掉不需要的数据.比如说:*我有一个图片url集合,我的需求是依次获取到bitmap,一些像素不清晰的,我希望排除掉,不显示在ImageView,而清晰的才显示在ImageView中.
综合三个操作符的伪代码
Observable
.from(filePathStrS)// String[] filePathStrS :本地图片路径数组
.map(new Func1&String, Bitmap&() {
public Bitmap call(String s) {
return Utils.getBitmapFromFile(photoPath + s);//通过map将String变换成Bitmap
.filter(new Func1&Bitmap, Boolean&() {
public Boolean call(Bitmap bitmap) {
return bitmap !=//筛选掉空文件
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1&Bitmap&() {
public void call(Bitmap bitmap) {//主线程显示数据
questionBitmapList.set(questionInPicCount, bitmap);
QuestionShowAdapter.notifyDataSetChanged();
}, new Action1&Throwable&() {
public void call(Throwable throwable) {
throwable.printStackTrace();
}, new Action0() {
public void call() {
RxJava所学未深,后续如有学到新的操作符,将会更新如果不合理,望不吝指正.RxJava操作符列表 - 简书
RxJava操作符列表
创建操作符:12个【just】:将一个或几个对象转化为可观测源【from】:将一个迭代器、future、数组转化为可观测源【repeat】:不断重复一个对象或序列的可观测源【repeatWhen】:【create】:完全自己创建可观测源:通过写onSubscribe回调【 defer·延迟】:创建一个工厂,根据参数来为每个订阅者,生产可观测源。订阅者可获取最新版本的观测数据。【range·范围】:创建范围内整数组的可观测源【interval·间距】:相隔特定间距时间发射整数序列的可观测源【timer·定时器】:特定时间后发射的可观测源【empty·空】:无数据的可观测源,立即调用onComplete【error】:立即调用onError【never】:永不结束:测试时有用转化操作符:8项【map·映射】:应用一个函数转化·数据【】:
将多个数据转化为多个可观测源或迭代器,压入一个可观测源中
concatMap:连接而非合并:即不改变顺序【·切换映射】:类似于flatMap,但是新的数据将覆盖之前的数据,切换到当前数据的新可观测源【·浏览】:将前一个结果和数据作为转换函数参数【·分组】:通过key将数据分组,化为一批新的可观测源【·缓存】:缓存一批数据作为元素发射出去【·窗口】:将数据根据窗口拆分为多个可观测源,发射【·抛、投】:转化为特定的类型过滤操作符:23个【·过滤】:过滤条件【·】:条件:最后n个元素【·】:条件:最后一个元素【】:最后一个元素或为空时的默认值【·】:最后n个元素作为list,发射【·略过·跳跃】:略过前面n个元素【】:略过最后n个元素【·取】:取前n个元素【】:取第一个或满足条件的第一个。无时,first会报错,takefirst会发射empty可观测源【】:带默认值的取第一个或满足条件的第一个元素【】:取第n个元素【】:【】:组合操作符:未完待续。谢谢!您还可以使用以下方式登录
当前位置:&>&&>&&>& > RxJava操作符(07-辅助操作)
rxjava 操作符 RxJava操作符(07-辅助操作)
1. Delaydelay的意思就是延迟,这个操作符会延迟一段指定的时间再发射Observable的数据。 RxJava的实现是 delay和delaySubscription。delay:让原始Observable在发射每项数据之前都暂停一段指定的时间段,结果是Observable发射的数据项在时间上整体延后一段时间注意:delay不会平移onError通知,它会立即将这个通知传递给订阅者,同时丢弃任何待发射的onNext通知。但是它会平移一个onCompleted通知。delaySubscription:和delay不同的是,delaySubscription是延迟订阅原始Observable,这样也能达到数据延迟发射的效果示例代码:Observable obs = Observable.create(new Observable.OnSubscribe() {
public void call(Subscriber subscriber) {
for(int i =0;i&5;i++){
subscriber.onError(new Throwable(&VALUE TO MAX&));
subscriber.onNext(i);
subscriber.onCompleted();
}}).putation());SimpleDateFormat sdf = new SimpleDateFormat(&HH:mm:ss&);/* * Delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。 * 效果是Observable发射的数据项在时间上向前整体平移了一个增量 * * 注意:delay不会平移onError通知,它会立即将这个通知传递给订阅者,同时丢弃任何待发射的onNext通知。 * 然而它会平移一个onCompleted通知。 */Log.v(TAG, &delay start:& + sdf.format(new Date()));obs.delay(2, TimeUnit.SECONDS)
.subscribe(new Subscriber() {
public void onCompleted() {
Log.v(TAG, &delay onCompleted& + sdf.format(new Date()));
public void onError(Throwable e) {
Log.v(TAG, &delay onError&+e.getMessage());
public void onNext(Integer integer) {
Log.v(TAG, &delay onNext:& + sdf.format(new Date())+&-&&+integer);
});/* * delaySubscription:延迟订阅原始Observable */Log.v(TAG, &delaySubscription start:& + sdf.format(new Date()));obs.delaySubscription(2, TimeUnit.SECONDS)
.subscribe(new Subscriber() {
public void onCompleted() {
Log.v(TAG, &delaySubscription onCompleted& + sdf.format(new Date()));
public void onError(Throwable e) {
Log.v(TAG, &delaySubscription onError&+e.getMessage());
public void onNext(Integer integer) {
Log.v(TAG, &delaySubscription onNext:& + sdf.format(new Date())+&-&&+integer);
});输出:delay start:01:02:15delay onErrorVALUE TO MAXdelaySubscription start:01:02:15delaySubscription onNext:01:02:17-&0delaySubscription onNext:01:02:17-&1delaySubscription onNext:01:02:17-&2delaySubscription onErrorVALUE TO MAX分析:原始Observable会发射3个整数,然后发送onError通知。delay操作符会让每个发射的数据延迟2s发射出去,但由于原始Observable在2s之内发射了onError消息,而delay不会延迟onError通知,会立即传递给观察者,所以马上就结束了。而delaySubscription是延迟订阅,这个更好理解,就是原始Observable该怎么发射消息还是怎么发射,因为只有订阅之后才会开始发射消息,所以延迟2s。2. DoDo系列操作符就是为原始Observable的生命周期事件注册一个回调,当Observable的某个事件发生时就会调用这些回调。RxJava实现了很多doxxx操作符:doOnEach:为 Observable注册这样一个回调,当Observable没发射一项数据就会调用它一次,包括onNext、onError和 onCompleted doOnNext:只有执行onNext的时候会被调用 doOnSubscribe: 当观察者(Sunscriber)订阅Observable时就会被调用 doOnUnsubscribe: 当观察者取消订阅Observable时就会被调用;Observable通过onError或者onCompleted结束时,会反订阅所有的Subscriber doOnCompleted:当Observable 正常终止调用onCompleted时会被调用。 doOnError: 当Observable 异常终止调用onError时会被调用。 doOnTerminate: 当Observable 终止之前会被调用,无论是正常还是异常终止 finallyDo: 当Observable 终止之后会被调用,无论是正常还是异常终止。示例代码:Log.v(TAG,&doOnNext------------------------&);Observable.just(1, 2, 3)
//只有onNext的时候才会被触发
.doOnNext(new Action1() {
public void call(Integer item) {
Log.v(TAG,&--&doOneNext: & + item);
}).subscribe(new Subscriber() {
public void onNext(Integer item) {
Log.v(TAG,&Next: & + item);
public void onError(Throwable error) {
Log.v(TAG,&Error: & + error.getMessage());
public void onCompleted() {
Log.v(TAG,&Sequence complete.&);
}});Log.v(TAG,&doOnEach,doOnError------------------------&);Observable.just(1, 2, 3)
//Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted
.doOnEach(new Action1&() {
public void call(Notification notification) {
Log.v(TAG,&--&doOnEach: & +notification.getKind()+&:&+ notification.getValue());
if( (int)notification.getValue() & 1 ) {
throw new RuntimeException( &Item exceeds maximum value& );
//Observable异常终止调用onError时会被调用
.doOnError(new Action1() {
public void call(Throwable throwable) {
Log.v(TAG,&--&doOnError: &+throwable.getMessage() );
.subscribe(new Subscriber() {
public void onNext(Integer item) {
Log.v(TAG,&Next: & + item);
public void onError(Throwable error) {
Log.v(TAG,&Error: & + error.getMessage());
public void onCompleted() {
Log.v(TAG,&Sequence complete.&);
}});Log.v(TAG,&doxxx------------------------&);Observable.just(1, 2, 3)
.doOnCompleted(new Action0() {
public void call() {
Log.v(TAG,&--&doOnCompleted:正常完成onCompleted&);
//数据序列发送完毕回调
.doOnSubscribe(() -& Log.v(TAG,&--&doOnSubscribe:被订阅&))
//被订阅时回调
//反订阅(取消订阅)时回调。当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber
.doOnUnsubscribe(() -& Log.v(TAG,&--&doOnUnsubscribe:反订阅&))
//Observable终止之前会被调用,无论是正常还是异常终止
.doOnTerminate(() -& Log.v(TAG,&--&doOnTerminate:终止之前&))
//Observable终止之后会被调用,无论是正常还是异常终止
.finallyDo(() -& Log.v(TAG,&--&finallyDo:终止之后&))
.subscribe(new Subscriber() {
public void onNext(Integer item) {
Log.v(TAG,&Next: & + item);
public void onError(Throwable error) {
Log.v(TAG,&Error: & + error.getMessage());
public void onCompleted() {
Log.v(TAG,&Sequence complete.&);
}});输出:doOnNext&&&&&&&&&&doOneNext: 1Next: 1&&doOneNext: 2Next: 2&&doOneNext: 3Next: 3Sequence complete.doOnEach,doOnError&&&&&&&&&&doOnEach: OnNext:1Next: 1&&doOnEach: OnNext:2&&doOnEach: OnError:null&&doOnError: 2 exceptions occurred.Error: 2 exceptions occurred.doxxx&&&&&&&&&&doOnSubscribe:被订阅Next: 1Next: 2Next: 3&&doOnCompleted:正常完成onCompleted&&doOnTerminate:终止之前Sequence complete.&&doOnUnsubscribe:反订阅&&finallyDo:终止之后3. Materialize/Dematerializematerialize将来自原始Observable的通知(onNext/onError/onComplete)都转换为一个Notification对象,然后再按原来的顺序一次发射出去。 Dematerialize操作符是Materialize的逆向过程,它将Materialize转换的结果还原成它原本的形式( 将Notification对象还原成Observable的通知)示例代码:Observable obs = Observable.create(new Observable.OnSubscribe() {
public void call(Subscriber subscriber) {
for(int i = 0;i&3; i++){
subscriber.onNext(i);
subscriber.onCompleted();
}});Log.v(TAG, &materialize-----------&);obs.materialize()
.subscribe(new Subscriber<notification&() {
public void onCompleted() {
Log.v(TAG,&Sequence complete.&);
public void onError(Throwable e) {
Log.v(TAG,&onError:&+e.getMessage());
//将所有的消息封装成Notification后再发射出去
public void onNext(Notification integerNotification) {
Log.v(TAG,&onNext:&+integerNotification.getKind()+&:&+integerNotification.getValue());
});Log.v(TAG, &dematerialize-----------&);obs.materialize()
//将Notification逆转为普通消息发射
.dematerialize()
.subscribe(integer-&Log.v(TAG, &deMeterialize:&+integer));</notification输出:materialize&&&&onNext:OnNext:0onNext:OnNext:1onNext:OnNext:2onNext:OnCompleted:nullSequence complete.dematerialize&&&&deMeterialize:0deMeterialize:1deMeterialize:24. ObserveOn/SubscribeOn这两个操作符对于Android开发来说非常适用,因为Android中只能在主线程中修改UI,耗时操作不能在主线程中执行,所以我们经常会创建新的Thread去执行耗时操作,然后配合Handler修改UI,或者使用AsyncTask。RxJava中使用这两个操作符能够让我们非常方便的处理各种线程问题。SubscribeOn:指定Observable自身在哪个调度器上执行(即在那个线程上运行),如果Observable需要执行耗时操作,一般我们可以让其在新开的一个子线程上运行,好比AsyncTask的doInBackground方法。Observable。可以使用observeOn操作符指定Observable在哪个调度器上发送通知给观察者(调用观察者的onNext,onCompleted,onError方法)。一般我们可以指定在主线程中观察,这样就可以修改UI,相当于AsyncTask的onPreExecute 、onPrograssUpdate和onPostExecute 方法中执行关于RxJava的多线程调度器&Scheduler&,后面会有一篇博客详细介绍。示例代码:Observable obs = Observable.create(new Observable.OnSubscribe() {
public void call(Subscriber subscriber) {
Log.v(TAG, &on subscrib:& + Thread.currentThread().getName());
subscriber.onNext(1);
subscriber.onCompleted();
}});//在新建子线程中执行,在主线程中观察obs.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(i -&
Log.v(TAG, &mainThread-onNext:& + Thread.currentThread().getName()));obs.delaySubscription(2, TimeUnit.SECONDS)
.putation())
//用于计算任务,如事件循环或和回调处理
.observeOn(Schedulers.immediate())
//在当前线程立即开始执行任务
.subscribe(i -&
Log.v(TAG, &immediate-onNext:& + Thread.currentThread().getName()));输出:on subscrib:RxNewThreadScheduler-4mainThread-onNext:mainon subscrib:RxComputationScheduler-1immediate-onNext:RxComputationScheduler-15. TimeIntervalTimeInterval操作符拦截原始Observable发射的数据项,替换为两个连续发射物之间流逝的时间长度。 也就是说这个使用这个操作符后发射的不再是原始数据,而是原始数据发射的时间间隔。新的Observable的第一个发射物表示的是在观察者订阅原始Observable到原始Observable发射它的第一项数据之间流逝的时间长度。 不存在与原始Observable发射最后一项数据和发射onCompleted通知之间时长对应的发射物。timeInterval默认在immediate调度器上执行,你可以通过传参数修改。示例代码:Observable.create(new Observable.OnSubscribe() {
public void call(Subscriber subscriber) {
for (int i = 0; i &= 3; i++) {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
subscriber.onNext(i);
subscriber.onCompleted();
}}).subscribeOn(Schedulers.newThread())
.timeInterval()
.subscribe(new Subscriber<timeinterval&() {
public void onCompleted() {
Log.v(TAG, &onCompleted&);
public void onError(Throwable e) {
Log.v(TAG, &onError:&+e.getMessage());
public void onNext(TimeInterval integerTimeInterval) {
Log.v(TAG, &onNext:&+integerTimeInterval.getValue()+
&-&+integerTimeInterval.getIntervalInMilliseconds());
});</timeinterval输出:onNext:0-1010onNext:1-1002onNext:2-1000onNext:3-1001onCompleted6. Timeout如果原始Observable过了指定的一段时长没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable,或者继续一个备用的Observable。RxJava中的实现的Timeout操作符有好几个变体:timeout(long,TimeUnit): 第一个变体接受一个时长参数,每当原始Observable发射了一项数据,timeout就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout就抛出TimeoutException,以一个错误通知终止Observable。 这个timeout默认在computation调度器上执行,你可以通过参数指定其它的调度器。 timeout(long,TimeUnit,Observable): 这个版本的timeout在超时时会切换到使用一个你指定的备用的Observable,而不是发错误通知。它也默认在computation调度器上执行。 timeout(Func1):这个版本的timeout使用一个函数针对原始Observable的每一项返回一个Observable,如果当这个Observable终止时原始Observable还没有发射另一项数据,就会认为是超时了,timeout就抛出TimeoutException,以一个错误通知终止Observable。 timeout(Func1,Observable): 这个版本的timeout同时指定超时时长和备用的Observable。它默认在immediate调度器上执行 timeout(Func0,Func1):这个版本的time除了给每一项设置超时,还可以单独给第一项设置一个超时。它默认在immediate调度器上执行。 timeout(Func0,Func1,Observable): 同上,但是同时可以指定一个备用的Observable。它默认在immediate调度器上执行。示例代码:Observable obs = Observable.create(new Observable.OnSubscribe() {
public void call(Subscriber subscriber) {
for (int i = 0; i &= 3; i++) {
Thread.sleep(i * 100);
} catch (InterruptedException e) {
e.printStackTrace();
subscriber.onNext(i);
subscriber.onCompleted();
}});//发射数据时间间隔超过200ms超时obs.timeout(200, TimeUnit.MILLISECONDS)
.subscribe(new Subscriber() {
public void onCompleted() {
Log.v(TAG, &onCompleted&);
public void onError(Throwable e) {
Log.v(TAG, &onError:&+e);
public void onNext(Integer integer) {
Log.v(TAG, &onNext:&+integer);
}});//发射数据时间间隔超过200ms超时,超时后开启备用Observableobs.timeout(200, TimeUnit.MILLISECONDS, Observable.just(10,20))
.subscribe(new Subscriber() {
public void onCompleted() {
Log.v(TAG, &onCompleted&);
public void onError(Throwable e) {
Log.v(TAG, &onError:&+e);
public void onNext(Integer integer) {
Log.v(TAG, &onNext:&+integer);
});输出:onNext:0onNext:1onError:java.util.concurrent.TimeoutExceptiononNext:0onNext:1onNext:10onNext:20onCompleted7. Timestamp它将一个发射T类型数据的Observable转换为一个发射类型为Timestamped的数据的Observable,每一项都包含数据的发射时间。也就是把Observable发射的数据重新包装了一下,将数据发射的时间打包一起发射出去,这样观察者不仅能得到数据,还能得到数据的发射时间。 timestamp默认在immediate调度器上执行,但是可以通过参数指定其它的调度器。示例代码:Observable.just(1,2,3)
.timestamp()
.subscribe(new Subscriber<timestamped&() {
public void onCompleted() {
Log.v(TAG, &onCompleted&);
public void onError(Throwable e) {
Log.v(TAG, &onError:&+e.getMessage());
public void onNext(Timestamped integerTimestamped) {
Log.v(TAG, &onNext:&+integerTimestamped.getValue()+
&,time:&+integerTimestamped.getTimestampMillis());
});</timestamped输出:onNext:1,time:2onNext:2,time:2onNext:3,time:2onCompleted8. UsingUsing操作符指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。using操作符接受三个参数:一个用于 创建一次性资源的工厂函数 一个用于创建Observable的工厂函数 一个用于释放资源的函数当一个观察者订阅using返回的Observable时,using将会使用Observable工厂函数创建观察者要观察的Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止),using使用第三个函数释放它创建的资源。示例代码:class MyObject{
public void release(){
Log.v(TAG, &object resource released&);
}}/** * Using操作符指示Observable创建一个只在它的生命周期内存在的资源, * 当Observable终止时这个资源会被自动释放。 */private void op_Using(TextView textView){
SimpleDateFormat sdf = new SimpleDateFormat(&HH:mm:ss&);
Observable obs = Observable.using(
//一个用于 创建一次性资源的工厂函数
new Func0() {
public MyObject call() {
return new MyObject();
//一个用于创建Observable的工厂函数,这个函数返回的Observable就是最终被观察的Observable
, new Func1&() {
public Observable call(MyObject obj) {
//创建一个Observable,3s之后发射一个简单的数字0
return Observable.timer(3000,TimeUnit.MILLISECONDS);
//一个用于释放资源的函数,当Func2返回的Observable执行完毕之后会被调用
,new Action1(){
public void call(MyObject o) {
o.release();
Subscriber subscriber = new Subscriber() {
public void onCompleted() {
Log.v(TAG, &onCompleted:& + sdf.format(new Date()));
public void onError(Throwable e) {
Log.v(TAG, &onError:&+e.getMessage());
public void onNext(Long l) {
Log.v(TAG, &onNext:&+l);
Log.v(TAG, &start:& + sdf.format(new Date()));
obs.subscribe(subscriber);}输出:start:04:47:26onNext:0onCompleted:04:47:29object resource released9. To将Observable转换为另一个对象或数据结构。它们中的一些会阻塞直到Observable终止,然后生成一个等价的对象或数据结构;另一些返回一个发射那个对象或数据结构的Observable。简而言之就是,将原始Observable转化为一个发射另一个对象或者数据结构的Observable,如果原Observable发射完他的数据需要一段时间,使用To操作符得到的Observable将阻塞等待原Observable发射完后再将数据序列打包后发射出去。RxJava中实现了如下几种To操作符:toList:发射多项数据的Observable会为每一项数据调用onNext方法,用toList操作符让Observable将多项数据组合成一个List,然后调用一次onNext方法传递整个列表。如果原始Observable没有发射任何数据就调用了onCompleted,toList返回的Observable会在调用onCompleted之前发射一个空列表。如果原始Observable调用了onError,toList返回的Observable会立即调用它的观察者的onError方法。toMap: toMap收集原始Observable发射的所有数据项到一个Map(默认是HashMap)然后发射这个Map。你可以提供一个用于生成Map的Key的函数,还可以提供一个函数转换数据项到Map存储的值(默认数据项本身就是值)。toMultiMap:toMultiMap类似于toMap,不同的是,它生成的这个Map同时还是一个ArrayList(默认是这样,你可以传递一个可选的工厂方法修改这个行为)。toSortedList:toSortedList类似于toList,不同的是,它会对产生的列表排序,默认是自然升序,如果发射的数据项没有实现Comparable接口,会抛出一个异常。然而,你也可以传递一个函数作为用于比较两个数据项,这是toSortedList不会使用Comparable接口。toFuture:toFuture操作符只能用于BlockingObservable(首先必须把原始的Observable转换为一个BlockingObservable。可以使用这两个操作符:BlockingObservable.from或the Observable.toBlocking)。这个操作符将Observable转换为一个返回单个数据项的Future,如果原始Observable发射多个数据项,Future会收到一个IllegalArgumentException;如果原始Observable没有发射任何数据,Future会收到一个NoSuchElementException。如果你想将发射多个数据项的Observable转换为Future,可以这样用:myObservable.toList().toBlocking().toFuture()。toIterable:只能用于BlockingObservable。这个操作符将Observable转换为一个Iterable,你可以通过它迭代原始Observable发射的数据集。示例代码:SimpleDateFormat sdf = new SimpleDateFormat(&HH:mm:ss&);//toList:阻塞等待原Observable发射完毕后,将发射的数据转换成List发射出去Log.v(TAG, &toList start:& + sdf.format(new Date()));Observable.interval(1000,TimeUnit.MILLISECONDS)
.subscribe(new Subscriber<list&() {
public void onCompleted() {
Log.v(TAG, &onCompleted&);
public void onError(Throwable e) {
Log.v(TAG, &onError:& + e.getMessage());
public void onNext(List longs) {
Log.v(TAG, &onNext:& + longs+& -&&+ sdf.format(new Date()));
});Observable.just(2,4,1,3)
.delaySubscription(5, TimeUnit.SECONDS)
//延迟5s订阅
.toSortedList()
.subscribe(new Action1<list&() {
public void call(List integers) {
Log.v(TAG, &toSortedList onNext:& + integers);
});Observable.just(2,4,1,3)
.delaySubscription(7, TimeUnit.SECONDS)
//延迟5s订阅
.toMultimap(new Func1() {
//生成map的key
public String call(Integer integer) {
return integer % 2 == 0 ? &偶& : &奇&;
}, new Func1() {
//转换原始数据项到Map存储的值(默认数据项本身就是值)
public String call(Integer integer) {
return integer%2==0?&偶&+integer : &奇&+
.subscribe(new Action1<map&&() {
public void call(Map& stringCollectionMap) {
Collection o = stringCollectionMap.get(&偶&);
Collection j = stringCollectionMap.get(&奇&);
Log.v(TAG, &toMultimap onNext:& + o);
Log.v(TAG, &toMultimap onNext:& + j);
});</map</list</list输出:toList start:08:46:14onNext:[0, 1, 2] -&08:46:17onCompletedtoSortedList onNext:[1, 2, 3, 4]toMultimap onNext:[偶2, 偶4]toMultimap onNext:[奇1, 奇3]有问题请留言,有帮助请点赞(^__^)源码下载:/openXu/RxJavaTest就爱阅读网友整理上传,为您提供最全的知识大全,期待您的分享,转载请注明出处。
欢迎转载:
推荐:    

我要回帖

更多关于 rxjava 操作符详解 的文章

 

随机推荐