深入操作符 操作符的实现原理?他是如何拦截事件,然后变换处理之后,最后传递到观察者手中的呢?
这里依然以 map()为例,看看map背后到底做了什么:
这个例子更好理解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 Observable<String> observable = Observable.create(new Observable .OnSubscribe<String>() { @Override public void call (Subscriber<? super String> subscriber) { subscriber.onNext("1" ); subscriber.onNext("2" ); subscriber.onNext("3" ); subscriber.onCompleted(); } }); Observable<Integer> observableProxy = observable.map(new Func1 <String, Integer>() { @Override public Integer call (String s) { System.out.println("转换工厂..." ); return Integer.valueOf(s); } }); Subscriber subscriber = new Subscriber <Integer>() { @Override public void onCompleted () { } @Override public void onError (Throwable e) { } @Override public void onNext (Integer integer) { System.out.println("我是真正的观察者-> " + integer); } }; observableProxy.subscribe(subscriber);
首先看一下这个map
方法内部是如何实现的:首先明确一点:map方法返回的是一个代理被观察者,所以其实是原始观察者订阅了代理被观察者,这是整个前提!!!
1 2 3 4 5 6 7 8 9 public final <R> Observable<R> map (Func1<? super T, ? extends R> func) { return create(new OnSubscribeMap <T, R>(this , func)); }
因为map 实现的功能是 将 a 转换为 b,a/b均是被观察者,所以map中又创建了一个 Observable 的对象;
map背后究竟做了什么?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 public final class OnSubscribeMap <T, R> implements OnSubscribe <R> { final Observable<T> source; final Func1<? super T, ? extends R > transformer; public OnSubscribeMap (Observable<T> source, Func1<? super T, ? extends R> transformer) { this .source = source; this .transformer = transformer; } @Override public void call (final Subscriber<? super R> o) { MapSubscriber<T, R> parent = new MapSubscriber <T, R>(o, transformer); o.add(parent); source.unsafeSubscribe(parent); } static final class MapSubscriber <T, R> extends Subscriber <T> { final Subscriber<? super R> actual; final Func1<? super T, ? extends R > mapper; boolean done; public MapSubscriber (Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) { this .actual = actual; this .mapper = mapper; } @Override public void onNext (T t) { R result; try { result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return ; } actual.onNext(result); } @Override public void onError (Throwable e) { if (done) { RxJavaHooks.onError(e); return ; } done = true ; actual.onError(e); } @Override public void onCompleted () { if (done) { return ; } actual.onCompleted(); } @Override public void setProducer (Producer p) { actual.setProducer(p); } } }
代理被观察者存在的意义:之前我在想,如果经过map转换的话,只是需要一个代理观察者就好了,为什么还需要一个代理被观察者,其实代理被观察者在这里有两个使命:
并不是所有的代码都是按照流式一下写下来的,可能是分别创建观察者,被观察者,然后订阅,这个时候map的转换需要产生一个代理被观察者
代理被观察者与原始观察者进行订阅,代理被观察者中的call方法是先执行的,那么它的call()方法内部实现了什么呢?就是原始被观察者和代理被观察者的订阅,因为代理被观察者是没有数据源的,只有在这里call()中进行订阅,原始被观察者才会发出数据,给代理被观察者用func1转换方法处理,处理后再发送原始观察者;
问题1:这个代理观察者的观察者参数是什么时候传进来的?
答:不要心急,记住总的原则是:观察者是在订阅的时候传入进来的.我们可以这样理解,本来我们创建被观察者和观察者,然后订阅.现在多了一个 map 操作,经过map操作后,被观察者就变了,变成了这个代理被观察者,这时候我们忘掉原始的观察者吧.原始的观察者只负责产生原始的事件源,何况这个代理被观察者中还持有原始被观察者的引用.那么在这个代理被观察者的call方法中其实是使用原始被观察者的call发原始数据,只不过发之前先不给观察者,而是自己调用func1来把事件转换一下;
map(Func1 func1);
首先我们看在使用map时,需要传入一个 Func1 实例,其实 Func1 是一个接口,需要实现的方法是一个 call()方法,正是靠这个call 方法实现的转换;
接下来看 map 内部是怎么实现的,其创建了一个 Observable 实例并返回;这也很好理解,进行了 map 转换后还是一个被观察的对象;,
return create(new OnSubscribeMap<T, R>(this, func));
跟进去这个创建的流程中,前面也分析了创建一个 Observable 对象不能 new,而是使用一个 create()方法,并传入一个 OnSubscribe 的实例;同理 map 中创建的 Observable 对象也一样,只是这个传入的是 OnSubscribeMap 对象,这是 OnSubscribe 接口的一个实现类, OnSubscribeMap 需要两个参数,第一个参数是原始的被观察着,第二个参数是用来转换的 Func1 对象,之前也讲过,每个 被观察者中都持有一个 OnSubscribe,这个接口要求实现一个 call 方法,这个call方法是用来触发事件开始执行的,我们用了map新键的一个被观察者,对象被转换后,也需要一个新的触发机制,就在这个call中实现.
接下来看一下 OnSubscribeMap 中时如何实现 call() 方法的,在这个call方法中传入的是观察者,嗯,没错,传入的是观察者,这里新键了一个观察者,我们称之为代理观察者,
总结 这里面一共涉及到了四个对象
被观察者通过 map 转换后产生了新的 代理被观察者,并持有原始被观察者的引用与转换的方法
(map方法产生的)代理被观察者 与 原始观察者产生订阅(明面上的,你看得到)
通过上一步的订阅,代理被观察者就持有了原始观察者的引用
代理被观察者的call方法中创建一个代理观察者
代理观察者与原始被观察者进行订阅
通过上一步的订阅,原始观察者发送消息给 代理观察者,发来的是 String
代理观察者是代理被观察者的内部类,自然也可以访问代理被观察者的属性—转换方法
代理观察者在其onNext()
方法中获取到String
,然后通过转换方法将 String 装换为 int
代理观察者通过原始观察者的引用,将 int传给观察者(其实这时的代理观察者又有点被观察者的意思,它调用了原始观察者的 onNext(int) 方法)
整个流程结束;
本文作者:
Zachaxy
版权声明:
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。