RxJava 源码阅读(2)— 线程调度
发布于:2016-04-08 21:28 最后更新于:2016-04-08 21:28

摘要: RxJava 提供了非常方便的线程切换功能,这篇文章将探讨 RxJava 的线程调度是如何实现的。

强烈推荐 《给 Android 开发者的 RxJava 详解》 这篇文章,本文的部分图片来源于此,感谢原作者!

注:本文研究的 RxJava 源码版本是 1.1.1,使用的根据代码自动生成类图的工具为 Eclipse 插件 AmaterasUML。

SubscribeOn 与 ObserveOn

RxJava 通过Observable 中的 subscribeOn() 和 observeOn() 方法实现线程切换。

subscribeOn() : 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe被激活时所处的线程,或者叫做事件产生的线程。
observeOn() : 指定 Subscriber 所运行在的线程,或者叫做事件消费的线程。

示例

Observable.create(new Observable.OnSubscribe<String>() {
o@Override
opublic void call(Subscriber<? super String> s) {
os.onNext("Hello, world!");
os.onCompleted();
o}
o})
o.subscribeOn(Schedulers.io())// 指定 subscribe() 发生在 IO 线程
o.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
o.subscribe(new Subscriber<String>() {
o@Override
opublic void onNext(String s) {
oLog.d(tag, s);
o}

o@Override
opublic void onCompleted() {
o}

o@Override
opublic void onError(Throwable e) {
o}
o});

SubscribeOn

原理

事件产生的线程就是初始 Observable 的 OnSubscribe 的 call 方法被调用时所在线程。

subscribeOn 方法返回一个新的 Observable。订阅(subscribe)发生时,新的 Observable 的 OnSubscribe 的 call 方法会被调用,这个方法将会在新的线程中(由调度器 Scheduler 指定)调用初始的 Observable 中的 OnSubscribe 的 call 方法。

原理图

SubscribeOn.png

流程

初始的 Observable 调用 subscribeOn 方法,接着该方法调用 create 方法,参数是 OperatorSubscribeOn。

public final Observable<T> subscribeOn(Scheduler scheduler) {
        return create(new OperatorSubscribeOn<T>(this, scheduler));
}

OperatorSubscribeOn 是一个操作符,它实现的是 OnSubscribe 接口。OperatorSubscribeOn 持有初始Observable 和调度器 Scheduler 的引用,并实现了 OnSubscribe 的 call 方法。

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    @Override
    public void call(final Subscriber<? super T> subscriber) {
o//……
    }
}

create 方法内创建了一个新的 Observable,这个新的 Observable 中的 OnSubscribe 对象就是操作符 OperatorSubscribeOn 的实例。

public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
}

SubscribeOn 方法将返回这个新的 Observable。

之后的 subscribe 方法的是这个新的 Observable 的方法,传入的参数是一个 subscriber。subscribe 方法会去调用这个新的 Observable 中的 onSubscribe 的 call 方法,也就是 OperatorSubscribeOn 的 call 方法,参数是就是原来的 subscriber。

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    //……
ohook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
o//……
}

OperatorSubscribeOn 从调度器 scheduler 获取一个 worker,worker 知道将要在哪个线程上调度任务。call 方法中创建了一个新的任务交给 worker 调度。这个任务中根据方法传入的 subscriber 创建了一个新的 subscriber,这个新的 subscriber 相当于原来的 subscriber 的代理,onNext、onError、onComplete 方法都直接交给原来的 subscriber 来做,但是重写了一个 setProducer 方法(不知道做什么的,大概与 backpressure 有关,暂时不用管)。任务中用初始的 Observable 来订阅(unsafeSubscribe)这个新的 subscriber。unsafeSubscribe 和 subscribe 差不多,区别在它没有考虑错误处理等。unsafeSubscribe 会调用初始的 Observable 中的 OnSubscribe 对象的 call 方法,这个方法将在 worker 指定的线程中执行,也就是事件将在 worker 指定的线程中产生。

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        //……
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
}

ObserveOn

原理

事件消费的线程就是目标 Subscriber 的 OnNext 等方法被调用时所在的线程。

ObserveOn 方法返回一个新的 Observable,并创建一个新的 Subscriber 把目标 Subscriber 包装起来。 订阅(subscribe)发生时,新的 Observable 的 OnSubscribe 的 call 方法会被调用,这个方法接着调用初始 Observable 中的 OnSubscribe 的 call 方法,调用的参数是新的 subscriber。当新的 Subscriber 中的方法被调用时,会转为在新的线程中(由调度器 Scheduler 指定)调用目标 Subscriber 中对应的方法。

原理图

ObserveOn.png

流程

初始的 Observable 调用 observeOn 方法,接着该方法调用 lift 方法(与 subscribeOn 不太一样),参数是 OperatorSubscribeOn。

public final Observable<T> observeOn(Scheduler scheduler) {
        //……
oreturn lift(new OperatorObserveOn<T>(scheduler, false));
}

OperatorObserveOn 也是一个操作符,但是它实现的是 Operator 接口。OperatorSubscribeOn 持有调度器Scheduler 的引用,并实现了 Operator 接口的 call 方法。

public final class OperatorObserveOn<T> implements Operator<T, T> {

    private final Scheduler scheduler;

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
         //……
    }
}

与 SubscribeOn 类似,lift 方法中同样是创建了一个新的 Observable,在新的 Observable 的 OnSubscribe 的 call 方法中,调用初始的 Observable 中的 OnSubscribe 的 call 方法,方法参数是调用 OperatorObserveOn 的 call 方法返回的 subscriber,这个 subscriber 的类型是 ObserveOnSubscriber。

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                     Subscriber<? super T> st = hook.onLift(operator).call(o);
                     st.onStart();
                     onSubscribe.call(st);
        });
}



public final class OperatorObserveOn<T> implements Operator<T, T> {
    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
            parent.init();
            return parent;
}

ObserveOnSubscriber 相当于在原来的 Subscriber 上做了一层包装,它持有一个 worker 引用,它在 worker 中调用原来的 subscriber 的方法。

private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;

        @Override
        public void onNext(final T t) {
            //……
            schedule();
        }

        @Override
        public void onCompleted() {
            //……
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
            //……
            schedule();
        }

        protected void schedule() {
            recursiveScheduler.schedule(this);
        }

        @Override
        public void call() {
              //……
              localChild.onNext(localOn.getValue(v));
              //……
        }

}

当 subscribe() 方法被调用时,新的 Observable 的 OnSubscribe 的 call 方法被调用,该方法会调用初始 Observable 的 OnSubscribe 的 call 方法,参数为新的 subscriber。调用新的 subscriber 中的方法时,会转为在指定的线程中调用原来的 subscriber 的方法,也就是事件在指定的线程中消费。

多个SubscribeOn和ObserveOn混合使用

图中共有 5 处含有对事件的操作。
由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。

所以当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。

multi.png

Scheduler

调度器 Scheduler,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。

Schedulers

Schedulers.png

Schedulers 是一个创建 Scheduler 的工厂,提供了多种调度器(Scheduler)。

  • Schedulers.immediate() : 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread() : 总是启用新线程,并在新线程执行操作。
  • Schedulers.io() : I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation() : 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

Scheduler 是一个抽象类,主要负责创建 worker,具体的任务由 worker 完成。 抽象类 Scheduler.Worker 代表了一个工作序列,它包含了具体的调度方法。Worker 实现了接口 Subscription 的 unsubscribe() 方法,用来在取消订阅时对资源进行回收。

我们选 Schedulers.newThread() 和 Schedulers.io() 两个例子进行说明。

Schedulers.newThread()

Schedulers.newThread() 返回了一个 NewThreadScheduler 对象。

NewThreadScheduler.png

NewThreadScheduler 继承了 Scheduler,并重写了 createWorker() 方法,在该方法中创建了一个NewThreadWorker。NewThreadWork 继承了 Scheduler.Worker,重写了 schedule() 方法。

NewThreadScheduler 每次通过 createWorker 创建一个 NewThreadWorker,任务由交给新的 worker 完成,并不需要做额外的操作。

NewThreadWorker.png

NewThreadWorker 每次创建一个数量为 1 的线程池。

ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);

ScheduledExecutorService 是 java.util.concurrent 包中提供的类,它可以定时或者周期性地执行任务。

NewThreadWorker 中的 schedule 方法都通过调用 ScheduledExecutorService 的方法完成。 这样的话 NewThreadWorker 的代码应该不多,但是它又考虑新线程中的任务在执行之前被取消的问题。在 Java 7+,只需要开启 RemoveOnCancel 策略即可,任务被取消时会立即从任务队列移除,避免内存泄漏。对于不支持这个策略的 Java 版本,NewThreadWorker 会再创建一个新的线程,这个线程是 static 的,由所有 NewThreadWorker 共享,它将周期性地调用 exec.purge();将每个 NewThreadWorker 中还未执行但已经被取消的任务移除。

Schedulers.io()

Schedulers.io() 返回的是一个 CachedThreadScheduler 对象。

IOWorker.png

类图如上所示,EventLoopWorker、CachedWrokerPool、ThreadWorker 都是 CachedThreadScheduler 的内部类。

CachedThreadScheduler 除了用来创建 EventLoopWorker 外,还持有一个 CachedWorkerPool。

CachedWorkerPool 管理一个 ThreadWorker 队列,这些 Threadworker 可以重用。ThreadWorker 继承自 NewThreadWorker,添加了一个过期时间的属性。CachedWorkerPool 另外还有一个 evictorService 线程,用于周期性地将过期的 ThreadWorker 从队列中移除。

EventLoopWorker 每次从 CachedWorkerPool 中取出一个 ThreadWorker。CachedThreadScheduler 的 createWorker 方法创建的是 EventLoopWorker,但实际的调度由 EventLoopWorker 从 CachedWorkerPool 获取的 ThreadWorker 来完成。

AndroidSchedulers

AndroidSchedulers 在 RxAndroid 的 rx.android.schedulers 包中,这个包只有 AndroidSchedulers.java 和 HandlerScheduler.java 两个文件。

AndroidSchedulers 中只有一个公开的方法 mainThread(),它返回的是一个 HandlerScheduler 对象。在创建这个 Scheduler 对象时,new Handler(Looper.getMainLooper()) 被作为构造参数。

AndroidSchedulers.png

同样的,HandlerScheduler 通过 createWorker 方法创建一个新的 HandlerWorker,HandlerWorker 是HandlerScheduler 的内部类。在 HandlerWorker 的 schedule 方法中,将任务交给 handler 完成。

handler.postDelayed(scheduledAction, unit.toMillis(delayTime));
评论 登录后评论

没有评论