概述
RxJava的优势这里不再赘述,可以参考我前面 RxJava 系列的博客。Retrofit 2 良好的扩展性使它非常方便的添加了 RxJava 的支持。
使用方法
参考我的博客Retrofit 2 使用指南 。
和上一篇博客中用内置的适配器不同的是:
方法声明:
1 | public interface RequestService { |
初始化请求:
1 | Retrofit retrofit = new Retrofit.Builder() |
方法执行:
1 | public void getDataRx(final CallBack<TestBean> callBack){ |
源码解析
Rxjava 相关代码结构:
可以看到代码量不是很多。RxJava2CallAdapterFactory
和 RxJava2CallAdapter
两个类分别对应上篇博客分析使用内置适配器时的 ExecutorCallAdapterFactory
和 ExecutorCallAdapterFactory.CallAdapter
类,是适配器工厂和适配器类。
RxJava2CallAdapterFactory
先来看一下适配器工厂类。
上面的Demo中通过 Retrofit.Builder()
的 addCallAdapterFactory(RxJava2CallAdapterFactory.create())
方法指定了适配器工厂。RxJava2CallAdapterFactory
提供了三个静态的方法生成适配器工厂,它们分别可以生成同步的 Observable
、异步的 Observable
以及可以制定线程调度器同步 Observable
。
不理解没关系,下面会详细介绍他们的区别。
先来看一下这三个方法:
1 | // 生成同步的 Observable 的适配器工厂 |
因此在动态代理类的 serviceMethod.callAdapter.adapt(okHttpCall)
其实执行的是 RxJava2CallAdapterFactory.get()
方法生成的 RxJava2CallAdapter
对象的 adapt()
方法。
我们先来看一下 RxJava2CallAdapterFactory.get()
方法:
1 | @Override |
这个方法就是生成来一个 RxJava2CallAdapter
对象。
RxJava2CallAdapter
先来看一下构造函数:
1 | RxJava2CallAdapter(Type responseType, @Nullable Scheduler scheduler, boolean isAsync, |
这里是一些条件的配置。
对于这个类重点关注它的 adapt()
方法。
1 | @Override public Object adapt(Call<R> call) { |
CallEnqueueObservable 和 CallExecuteObservable
从名字就可以看出来,这两个类分别处理异步和同步的Observable。
这里主要看一下这两个类的 subscribeActual()
方法。
CallEnqueueObservable.subscribeActual()
1 | @Override protected void subscribeActual(Observer<? super Response<T>> observer) { |
CallExecuteObservable.subscribeActual()
1 | @Override protected void subscribeActual(Observer<? super Response<T>> observer) { |
其实这里主要做了下面几件事:
- clone了原有的call,因为OkHttp.Call只能使用一次
- 设置了 CallDisposable,可用于解除订阅
- 调用 Call 的 execute() 或者 enqueue() 方法。
- 同步方法可能会调用onNext和onComplete,异步方法设置回调函数,在回调函数中调用这些方法。