线程调度深入
一个基本线程调度的例子:事件在IO线程产生,然后再UI线程被消费;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava!"); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("completed!"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println(s); } });
|
subscribeOn()原理
subscribeOn()
用来指定Observable
在哪个线程中执行事件流,也就是指定Observable
中OnSubscribe
(计划表)的call()
方法在那个线程发射数据。下面通过源码分析subscribeOn()
是怎样实现线程的切换的。
1 2 3 4 5 6 7 8
| public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler)); }
|
subscribeOn()方法是 Observerble 中的方法,一旦调用了该方法,就会创建出一个新的 Observerble 对象;当然还是通过create(OnSubscribe)方法来创建Observerble ;
再来看一下新创建的这个Observerble 对象中的OnSubscribe的实现类内部是如何实现的;OperatorSubscribeOn是OnSubscribe的实现类,自然也要实现call方法来触发事件了.同时一旦换了新的Observerble ,那么最终的观察者订阅的自然也就是新的Observerble 了,这一点一定要明确;那么自然call方法中的参数也就持有了原始观察者的引用.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public final class OperatorSubscribeOn<T> implements Observable.OnSubscribe<T> {
final Scheduler scheduler; final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; }
@Override public void call(final Subscriber<? super T> subscriber) { final Scheduler.Worker inner = scheduler.createWorker(); subscriber.add(inner);
inner.schedule(new Action0() { @Override public void call() { final Thread thread = Thread.currentThread(); Subscriber<T> s = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } ... }; source.unsafeSubscribe(s); } }); } }
|
在call
方法中通过scheduler.createWorker().schedule(Action0)
完成线程的切换.
简单说:这里在subscribeOn()方法中新创建了一个Observable对象(代理Observable),于是发生了原始观察者与代理被观察者订阅的情况,于是代理被观察者中的call()方法被先执行,但是代理被观察者哪里有数据呢,还不是用老方法,又创建了一个代理观察者,然后让代理观察者与原始被观察者进行订阅,一旦发生订阅,数据就发出来了,数据发出来给了代理观察者,代理观察者的onNext()方法中有调用了原始观察者的onNext()方法;这不就解决了嘛,可是如何实现的线程切换呢?
提前说一下:这个Action0对象时作为参数传入一个Runnable实例中,然后将该runnable对象传入线程池,这样就实现了线程的切换,也就是说这个Action0()中的所有动作都是在新的线程池中执行的;
上述说说的一切动作都是在scheduler.createWorker().schedule(new Action0(XXX));都是在这个Action0()中发生的.
这里涉及到两个对象:Scheduler
和Worker
,究竟这是怎么实现的线程切换呢?
Scheduler
其实在subscribeOn(Scheduler scheduler)方法中传入的参数就是 Scheduler 对象;
由于RxJava中有多种调度器,我们就看一个简单的Schedulers.newThread()
,其他调度器的思路是一样的.
先看一下Schedulers
这个类,Schedulers
就是一个调度器的管理器,大管家;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public final class Schedulers { private final Scheduler computationScheduler; private final Scheduler ioScheduler; private final Scheduler newThreadScheduler; private static final Schedulers INSTANCE = new Schedulers(); private Schedulers() { RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook(); ... Scheduler nt = hook.getNewThreadScheduler(); if (nt != null) { newThreadScheduler = nt; } else { newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler(); } Scheduler c = hook.getComputationScheduler(); if (c != null) { computationScheduler = c; } else { computationScheduler = RxJavaSchedulersHook.createComputationScheduler(); }
Scheduler io = hook.getIOScheduler(); if (io != null) { ioScheduler = io; } else { ioScheduler = RxJavaSchedulersHook.createIoScheduler(); } } public static Scheduler newThread() { return INSTANCE.newThreadScheduler; } ... }
|
接着跟踪RxJavaSchedulersHook.createNewScheduler()
,看看newThreadScheduler
究竟是如何创建的?
我们发现无论是IO线程,Compute线程,还是NewThread线程调度器,都是RxJavaSchedulersHook.createXXX()
方法创建出来了,其内部是用工厂方法实现的.
最终会找到一个叫NewThreadScheduler
的类:
1 2 3 4 5 6 7 8 9 10
| public final class NewThreadScheduler extends Scheduler { private final ThreadFactory threadFactory; public NewThreadScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } @Override public Worker createWorker() { return new NewThreadWorker(threadFactory); } }
|
最终看到NewThreadScheduler
就是我们调用subscribeOn(Schedulers.newThread() )
传入的调度器对象,通过上面的分析,我们已经明白了 Scheduler
的产生原理
产生Scheduler
并不是最终目的,而是通过Scheduler
产生 Worker
,然后调用Worker.schedule(Action0)
实现线程的切换.
Worker
通过上面的分析,我们已经明白了 Scheduler
的产生原理,产生Scheduler
并不是最终目的,而是通过Scheduler
产生 Worker
,然后调用Worker.schedule(Action0)
实现线程的切换.
每个调度器对象都有一个createWorker
方法用于创建一个Worker
对象,而NewThreadScheduler
对应创建的Worker
是一个叫NewThreadWorker
的对象.
而在上面的分析中我们也看到了, OperatorSubscribeOn
类中调用了
final Scheduler.Worker inner = scheduler.createWorker()
方法来得到一个 Worker,然后又调用 inner.schedule(Action0)
实现线程的切换
接下来我们跟进schedule()
方法查看其内部的实现原理.同样,这里的 Worker
依然是以最简单的NewThreadWorker
为例.这里删减了部分代码,只留取对整体结构有用的部分.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class NewThreadWorker extends Scheduler.Worker implements Subscription { private final ScheduledExecutorService executor; public NewThreadWorker(ThreadFactory threadFactory) { ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); executor = exec; } @Override public Subscription schedule(final Action0 action) { return schedule(action, 0, null); } @Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { return scheduleActual(action, delayTime, unit); } public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { Action0 decoratedAction = schedulersHook.onSchedule(action); ScheduledAction run = new ScheduledAction(decoratedAction); Future<?> f; if (delayTime <= 0) { f = executor.submit(run); } else { f = executor.schedule(run, delayTime, unit); } run.add(f);
return run; } ... }
|
我们发现OperatorSubscribeOn
计划表中通过NewThreadWorker.schedule(Action0)
,将Action0
作为参数传入一个Runnable
的实现类:ScheduledAction
,然后将这个runnable放入到一个线程池中执行,这样就实现了线程的切换。
简单说:最原始的subscribeOn()
—调用了—-create(new OperatorSubscribeOn<T>(this, scheduler))
—-创建一个代理被观察者—->OperatorSubscribeOn()
中实现了call()
方法—->call()方法中调用了NewThreadWorker.schedule(Action0)
—-Action0
被包装称一个RUnnable
对象,然后schedule()
方法内部使用了线程池,创建一个新的线程,并将包装的Runnable
对象传递进去,这样就实现了线程的切换
步骤:
- 原始被观察者调用subscribeOn()方法准备切换线程,(这时候还没切换呢.)产生一个代理被观察者
- 原始订阅者订阅代理被观察者(明面代码上你能看得到的)
- 代理被观察者的
onSubscribe.call()
方法执行,提供了一个Runnable
对象,也就是线程已经被切换了
- 新线程中产生一个新的代理观察者,代理观察者订阅原始被观察者(接下来的动作也都是在新线程中执行)
- 原始被观察者发射数据,这个动作已经是在新线程中执行了
- 代理观察者收到数据,再将数据转发给原始观察者
看这张图,帮助理解
此处用到了多线程的知识,多线程这一块还需要总结整理;
多次subscribeOn()的情况
我们发现,每次使用subscribeOn
都会产生一个新的Observable
,并产生一个新的计划表OnSubscribe
,目标Subscriber最后订阅的将是最后一次subscribeOn
产生的新的Observable
。在每个新的OnSubscribe
的call
方法中都会有一个产生一个新的线程,在这个新线程中订阅上一级Observable
,并创建一个新的Subscriber
接受数据,最终原始Observable
将在第一个新线程中发射数据,然后传送给给下一个新的观察者,直到传送到目标观察者,所以多次调用subscribeOn
只有第一个起作用(这只是表面现象,其实每个subscribeOn
都切换了线程,只是最终目标Observable
是在第一个subscribeOn
产生的线程中发射数据的)
也就是说多次调用subscribeOn()
方法其实不是只有第一次方法其作用,而是每次都起作用,这里说的第一次起作用其实说的是最原始的数据发射是在第一次subscribeOn()指定的线程,只不过我们很少关注中间数据的处理过程而已;
一张图理解多订阅的过程:
下面是多次线程切换的伪代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| new Thread(){ @Override public void run() { Subscriber s1 = new Subscriber(); new Thread(){ @Override public void run() { Subscriber s2 = new Subscriber(); new Thread(){ @Override public void run() { Subscriber<T> s3 = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } ... }; Observable.subscribe(s3);
} }.start(); } }.start(); } }.start();
|
observeOn原理
还是需要进一步的整理
observeOn调用的是lift操作符。lift操作符创建了一个代理的Observable,用于接收原始Observable发射的数据,然后在Operator中对数据做一些处理后传递给目标Subscriber。observeOn一样创建了一个代理的Observable,并创建一个代理观察者接受上一级Observable的数据,代理观察者收到数据之后会开启一个线程,在新的线程中,调用下一级观察者的onNext、onCompete、onError方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, RxRingBuffer.SIZE); }
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) { return observeOn(scheduler, false, bufferSize); }
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); }
|
可以看到使用observeOn(Scheduler scheduler)方法时,也是传入了一个scheduler,这和subscribeOn()方法如出一辙,,随着不断深入的调用,其最终使用 lift()操作符创建了一个Observable 对象.这里先不管lift,接着上面的lift()中创建了一个OperatorObserveOn类,其源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| public final class OperatorObserveOn<T> implements Observable.Operator<T, T> { private final Scheduler scheduler; @Override public Subscriber<? super T> call(Subscriber<? super T> child) { if (scheduler instanceof ImmediateScheduler) { return child; } else if (scheduler instanceof TrampolineScheduler) { return child; } else { ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); parent.init(); return parent; } }
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { final Subscriber<? super T> child; final Scheduler.Worker recursiveScheduler; final NotificationLite<T> on; final Queue<Object> queue; @Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); } @Override public void onCompleted() { ... schedule(); } @Override public void onError(final Throwable e) { ... schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } @Override public void call() { long missed = 1L; long currentEmission = emitted; final Queue<Object> q = this.queue; final Subscriber<? super T> localChild = this.child; final NotificationLite<T> localOn = this.on; for (;;) { while (requestAmount != currentEmission) { ... localChild.onNext(localOn.getValue(v)); } } } } }
|
还记得subscribeOn()
时传入的Scheduler
吗,这个observeOn()
也传入了一个Scheduler
,和之前一样,通过这个scheduler产生一个Worker
,然后调用Worker.schedule(Action0)
实现线程的切换.与subscribeOn()
不同的是,这个线程切换时在代理观察者执行onNext()
中执行的,也就是说先把线程切换过去,然后代理观察者在执行的 actual.onNext()
方法.
我们可以参照多次subscribeOn()的图解示例,可以把第二次subscribeOn()替换成observeOn(),那么在产生的第二个代理观察者给原始观察者发消息时,本来是在其onNext()方法中直接调用原始观察者的onNext()的,但是由于有observeOn(),所以在执行onNext的时候进行了线程切换,然后在调用原始观察者的onNext()
总结
只要涉及到操作符,其实就是生成了一套代理的Subscriber
(观察者)、Observable
(被观察者)和OnSubscribe
(计划表)。Observable
最典型的特征就是链式调用,我们暂且将每一步操作称为一级。代理的OnSubscribe
中的call
方法就是让代理Subscriber
订阅上一级Observable
,直到订阅到原始Observable
发射数据,代理Subscriber
收到数据后,可能对数据做一些操作也有可能切换线程,然后将数据传送给下一级Subscriber
,直到目标观察者接收到数据,目标观察者在那个线程接受数据取决于上一个Subscriber
在哪一个线程调用目标观察者的方法。