本文将会介绍一些常用 RxJava 的操作符的用法。
创建Observable 操作符
前面的例子介绍了使用 Observable.create()
操作符来创建 Observable
,下面再介绍一下 RxJava 提供的其他方法。
create()
前面已有介绍
just()
just(T item1, ...)
创建 Observable
并自动调用 onNext()
发射数据,可以接受一个或者多个参数, just()
中传递的参数将在 Observer
的 onNext()
方法中接收到。
1 | Observable.just("Hello", "World") |
1 | I/RxJava: onNext : Hello |
defer()
defer(Callable<? extends ObservableSource<? extends T>> supplier)
当观察者订阅时,才创建 Observable。
1 | Observable.defer(new Callable<ObservableSource<String>>() { |
这里新建了一个 Consumer
对象来作为观察者。
fromArray()
fromArray(T... items)
接受一个数组参数,创建 Observable
并自动调用 onNext()
将数组中的数据逐一发送。
1 | Observable.fromArray(new String[]{"Hello","World"}) |
fromIterable()
fromIterable(Iterable<? extends T> source)
接受一个集合参数,创建 Observable
并将集合中的数据逐一发送。
1 | ArrayList<String> list = new ArrayList(); |
interval()
interval(long period, TimeUnit unit)
按照一个固定的时间间隔 period
来发射数据,可以作为一个定时器来使用。
在订阅方法中,可以获取到当前发射数据的次数。
1 | Observable.interval(2, TimeUnit.SECONDS) |
上面例子中当数据发射间隔是 2秒,发射5次后解除订阅关系,停止发射数据。
intervalRange
如果我们不做上面的限制,那么 interval 是可以无限次发射数据的,我们还可以通过另外一个 intervalRange 操作符来限制发射次数。
Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
,可以起始点,触发次数,延时,触发周期等。
1 | Observable.intervalRange(5,10,10,2,TimeUnit.SECONDS) |
range()
range(final int start, final int count)
创建一个被观察者并发射从 start
到 count
的整数序列给观察者。
1 | Observable.range(1,5) |
timer()
timer(long delay, TimeUnit unit)
创建一个 Observable 并在它在一个给定的延迟 delay
后发射一个特殊的值(0)给观察者。
1 | Observable.timer(5, TimeUnit.SECONDS) |
concat()
concat()
方法会将参数中的多个数据源合并,并按顺序发射。
1 | Observable source1 = Observable.create(new ObservableOnSubscribe<String>() { |
这里需要注意的是,如果前一个数据源发出 onError
,那么将会中断后面数据的发射。
可以和下面的 first 操作符结合使用。效果类似 amb 操作符的效果。
first()
再来介绍一下 first()
操作符,只发送符合条件的第一个事件,可以与前面的 contact
操作符结合使用。
1 | Observable.concat(source1, source2).subscribeOn(Schedulers.io()) |
如果 source1 和 source2 都有条件发射数据,那么就只发射 source1 的数据。如果只有 source2 有条件发射数据,那么就发送 source2 的数据。如果都不发射,就发送 first()
默认数据。
就是说顺序发射数据时,只要有一个 Observable
发射了数据,那么就不会发射后面的数据了。如果都不发射数据,那么就发送 first(default)
参数里面的默认数据。
调用 ‘onNext’ 和 ‘onError’ 都被认为发送了数据,会阻断后面数据的发送。
这个操作符做网络缓存的时候很有用。举个例子:依次检查 Disk 与 Network,如果 Disk 存在缓存,则不做网络请求,否则进行网络请求。
delay
delay 操作符可以将发射源发射的数据等待一个延迟后再发送给观察者。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
上面的例子中,被观察者发送数据后,10秒后观察者才收到数据。
map、flatmap 和 concatMap (变换)
这一组操作符提供数据的变换工作,就是把数据对象变换成其他类型的数据对象,它们都接受一个 Function
类型的参数。但是它们的用法上还是有区别的。
map
先来说一下 map()
操作的使用场景,如果在数据流的传递过程中,我们需要根据当前数据流对象转换或提取生成一种其他类型的数据流对象,可以使用 map()
。
它需要一个 Function
参数,Function
对象的两个参数是转换的源数据和目标数据类型。
比如我们有一个请求网络图片的场景,被订阅者发出的数据是原始的 byte
类型数据,在设置给 ImageView
前我们要转换成 Bitmap
类型的数据,那么就可以用这个操作符。
1 | Observable.create(new ObservableOnSubscribe<byte []>() { |
flatmap
flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
中同样需要一个 Function
对象作为参数,但是 Function
的目标数据类型变成了 Observable
。flatMap
一般用于输出一个 Observable
,而其随后的 subscribe
中的参数也跟 Observable
中的参数一样。
下面再提供一个使用场景,这个场景属于嵌套的网络请求,比如我们想先进行一次网络请求得到图片的url,然后根据url再进行网络请求得到图片,最后设置给 ImageView
,这种情况下由url到 Bitmap
的转换用 map
是无法实现的,可以使用 flatmap
。
1 | Observable.create(new ObservableOnSubscribe<String>() { |
concatMap
concatMap 和 flatmap 的功能是一样的,都提供了转换功能,只不过在转换后进行合并时 flatmap 用的是 merge,而 concatMap 用的是 concat,因此,concatMap 是有序的,可以保证最终的输出顺序和原序列保持一致。而 flatmap 则不一定,有可能出现交错。
下面通过例子来看一下:
先来看一下 flatmap 的输出顺序,为了模拟真实场景,我们在发送第二个数据是延迟了 100ms:
1 | Observable.fromArray(1,2,3,4,5) |
输出如下,可以看到第二个数据由于有延迟,在最后发送:
1 | 09:55:51.114 E/RxJava: 1 string |
下面来换做下测试:
1 | Observable.fromArray(1,2,3,4,5) |
输出,保持了输入序列输出数据:
1 | 04-17 09:59:07.844 E/RxJava: 1 string |
filter 和 distinct (过滤)
这一组操作符提供数据的过滤工作,filter
对不符合条件的数据进行过滤,distinct
提供去重的功能。最常用的用法之一是过滤 null 对象,它帮我们免去了在 onNext()
函数调用中再去检测 null 值。
filter
filter(Predicate<? super T> predicate)
接受 Predicate
对象参数,它的 test()
方法给出一个过滤条件,如果满足条件,则继续向下传递,如果不满足,则过滤掉。
1 | Observable.range(0,10) |
上面的示例会过滤掉奇数,把偶数打印出来。
如果需要报告错误的状态,可以在 test 方法里抛出 Exception 异常。
distinct
distinct()
过滤掉重复的数据项,过滤规则为:只允许还没有发射过的数据项通过。它还有重载的两个方法 distinct(Function<? super T, K> keySelector)
和 distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier)
1 | Observable.just(1, 2, 2, 2, 3, 4, 5, 5, 6, 6) |
mergeWith 和 concatWith (组合)
这一组操作符提供数据的组合工作。
mergeWith
mergeWith(ObservableSource<? extends T> other)
合并两个 Observable
,它们数据可能会交错发射。
1 | Observable.just(1, 2, 3, 4, 5) |
concatWith
concatWith(ObservableSource<? extends T> other)
合并两个 Observable
,它们数据会按顺序发射,一个 Observable
的数据发送完了另外一个才会发射。
1 | Observable.just(1, 2, 3, 4, 5) |
zipWith(聚合)
zipWith(ObservableSource<? extends U> other, BiFunction<? super T, ? super U, ? extends R> zipper)
将两个 Obversable
发射的数据,通过一个函数 BiFunction
的 apply()
方法对对应位置的数据处理后放到一个新的 Observable
中发射,所发射的数据个数与最少的 Observabel
中的一样。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
打印结果:
1 | I/RxJava: accept : 1one |
reduce、scan、reduceWith 和 scanWith
reduce 和 scan 操作符都可以把观察序列的本次操作结果作为参数传递给下一个Observable使用,可以用来实现累加操作。
但他们还是有区别的。
- reduce 是把所有的操作都操作完成之后最后只调用一次观察者,把数据一次性输出。
- scan 是每次操作之后都先把数据输出给观察者,然后再调用scan的回调方法进行第二次操作。
至于 reduceWith 和 scanWith 只是多了一个 Callbale 参数来指定为第一个数据源来使用。
下面通过例子来了解一下它们的用法:
reduce
1 | Observable.just(1, 3, 5) |
输出:
1 | E/Test: para1 = 1, para2 = 3 |
可见,调用了两次reduce的回调方法后,得到最后的总结果,最后调用观察者方法。reduce用来实现多个操作式的与(&&)和或(||)操作非常方便。
scan
1 | Observable.just(1, 3, 5) |
1 | E/Test: result = 1 |
使用scan和reduce的最后输出结果是一样的,但是却调用了三次观察者方法。
reduceWith 和 scanWith
1 | Observable.just(1, 3, 5) |
输出结果:
1 | E/Test: call |
1 | Observable.just(1, 3, 5) |
输出结果:
1 | E/Test: call |
amb、ambArray 和 ambWith
传递两个或多个 Observable 给 amb 或者 ambArray 时,它只发射其中首先发射数据(onNext)或通知(onError或onCompleted)的那个 Observable 的所有数据,而其他所有的 Observable 的将不会被执行直接丢弃。
多个 Observable 是按照顺序执行的。
ambWith的用法:Observable.ambArray(o1,o2)和o1.ambWith(o2)是等价的。
1 | Observable.ambArray(Observable.create(new ObservableOnSubscribe<String>() { |
输出:
1 | E/Test: onSubscribe |
另外,Observable.ambArray 的参数也可以是 Observable.ambArray:
1 | Observable.ambArray(Observable.create(new ObservableOnSubscribe<String>() { |
应用场景,可以用在数据结果的输出是多个不确定场景时,比如:我们需要获取一个图片,它的获取场景有三个:1.参数直接给,2.本地缓存,3.网络获取。这种情况就可以按照这个顺序依次获取,直到某个场景获取到图片。
是不是和 first() 操作符有点类似呢?和 concat 与 first 结合使用类似,但还是有区别的,在于触发发射的条件。
take、 takeLast、takeUntil 和 takeWhile
take
take(long count)
观察者只接受被观察者发出的前 count
个数据。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
takeLast
takeLast(int count)
观察者只接受被观察者发出的后面 count
个数据。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
takeUntil
takeUntil(Predicate<? super T> stopPredicate)
当条件满足是停止发射数据,takeUntil(ObservableSource<U> other)
当 other 发射第一个数据后即停止第一个 Observable
数据的发射。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
takeWhile
takeWhile(Predicate<? super T> predicate)
当满足条件是才会发射数据,遇到不满足条件的情况,就中断退出发射。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
sample
sample(long period, TimeUnit unit)
相当于采样操作,它会定时地扫描被观察者发送的数据,并接收被观察者最近发射的数据。
1 | Observable.interval(2, TimeUnit.SECONDS) |
1 | I/RxJava: accept : 0 |
skip 和 skipLast
skip
skip(long count)
用于过滤被观察者发送的前 n 项数据。
1 | Observable.interval(1, TimeUnit.SECONDS) |
1 | I/RxJava: accept : 6 |
skipLast
skipLast(int count)
用于过滤最后 n 项数据。
repeat、repeatUntil 和 repeatWhen
这组操作符提供在调用 onCompleted()
事件后提供重复调用 Observable
事件的操作。
repeat
repeat
提供重复调用 Observable
事件的操作。
- repeat():无限次重复(Long.MAX_VALUE)
- repeat(long times):重复 times 次
repeatUntil
repeatUntil(BooleanSupplier stop)
重复调用 Observable
事件的操作直到 stop 条件满足。
repeatWhen
repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)
当满足一定条件重复调用 Observable
事件的操作。
retry、retryUntil 和 retryWhen
这组操作符提供在调用 onError()
事件后提供重新调用 Observable
事件的操作。
时间节点处理操作
特定的时间节点处理方法:
- doOnEach:发射数据的时候执行
- doAfterNext:数据发射成功后
- doOnNext:调用onNext方法时
- doOnComplete:调用onComplete方法时
- doOnError:调用onError时
- doFinally:onComplete,onError或者取消订阅关系后