RxJava 源码阅读(1)— 关键类
发布于:2016-04-01 21:22 最后更新于:2016-04-01 21:22

摘要: 本文将介绍 RxJava 的实现中几个比较重要的类。

强烈推荐 《给 Android 开发者的 RxJava 详解》 这篇文章!

注:本文研究的 RxJava 源码版本是 1.1.1,使用的根据代码自动生成类图的工具为 Eclipse 插件 AmaterasUML。

RxJavaImportantClass.png

RxJava的源码主要可以分为三个部分:发布订阅(Observable相关)、操作符(Operator相关)、线程调度(Scheduler相关)

1. 发布订阅

Observable:可观察对象,持有一个OnSubscribe的引用,Observable.create方法用来创建一个Observable,入参为OnSubscribe,当Observable.subscribe(subscriber)被调用时,Observable会调用OnSubscribe的call方法。

OnSubscribe:Observable的create方法的入参,OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发。

Observer:观察者,它决定事件触发的时候将有怎样的行为:onNext,onCompleted,onError。

Subscriber:实现了Observer 的抽象类,对 Observer 接口进行了一些扩展:

  • onStart(): 在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置
  • isUnsubscribed(): 判断状态
  • unsubscribe(): 用于取消订阅,在这个方法被调用后,Subscriber 将不再接收事件

Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 过程中最终会被转换成 Subscriber 对象。

Subscription:定义了观察者与被观察者之间的订阅关系的接口,是subscribe方法的返回值,可查看订阅状态或者用于取消订阅。

Function和Action将方法包装起来,通过参数传递。它们都只有一个方法 call(),区别是Function包装有返回值的方法,Action包装无返回值的方法。

Action0 无参无返回值,由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted()的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。

Action1 无返回值,但有一个参数,由于onNext(T obj)和onError(Throwable error)也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。

2. 操作符

RxJava中的操作符可以分为两类。

第一类操作符实现了Observable.OnSubscribe接口,在使用时会调用Observable的create(onsubscirbe),如OperatorSubscribeOn。

Observable中的subscribeOn():

public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
      return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

第二类操作符实现了Observable.Operator接口,在使用时会调用Observable的lift(operator),如OperatorMap。

Observable中的map():

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

3. 线程调度

subscribeOn和observeOn两个操作符对应的两个类OperatorSubscribeOn和OperatorObserveOn,这两个操作符都持有一个Scheduler的引用。

Schedulers是一个创建Scheduler的工厂,可以提供多种调度器(Scheduler)。

Scheduler是一个抽象类,抽象类Scheduler.Worker代表了一个工作序列,worker.schedule()包含了具体的调度方法。以NewThreadScheduler为例,NewThreadScheduler继承了Scheduler,并重写了createWorker()方法,在该方法中创建了一个NewThreadWorker。NewThreadWork继承了Scheduler.Worker,重写了schedule()方法。

4. 其他

其他种类的Obervable:在rx.observables包中,包含了一些具有其他附加功能的Observable,如BlockingObservable、GroupedObservable等,可以通过操作符将Observable转化成其他的Observable。

其他种类的Observer:在rx.observers包中,也是附加了功能。

插件:在rx.plugins中。

Subject:代表既是可观察者,又是观察者的类型。

Single:一种特殊的Observable,只发出一个事件。

Subscription:一些Subscription的特定实现。

评论 登录后评论

没有评论