RxJava提供了Reactive Programming for Java,个人在Android开发中用的相当多,尤其线程切换和链式的数据处理,给码农们提供了极大的便利。在线程切换方面,之前一直用subscribeOn和observeOn配合,不过最近工作需要对线程更加细致的考量,比如zipWith的线程执行由谁决定,于是再重新编码理一遍。
Code
package com.opticalix.theme.zepp;
import com.opticalix.base.BaseRunner;
import com.opticalix.theme.synchronize.PrefixThreadFactory;
import com.opticalix.util.Logger;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
import java.util.concurrent.Executors;
public class RxRunner implements BaseRunner {
private final Scheduler mSchedulerIO;
private final Scheduler mSchedulerIO2;
private final Scheduler mSchedulerDisplay;
private final Scheduler mSchedulerCompute;
private final Scheduler mSchedulerCompute2;
public RxRunner() {
mSchedulerDisplay = Schedulers.from(Executors.newSingleThreadExecutor(new PrefixThreadFactory("DISPLAY")));
PrefixThreadFactory ioFactory = new PrefixThreadFactory("IO");
mSchedulerIO = Schedulers.from(Executors.newSingleThreadExecutor(ioFactory));
mSchedulerIO2 = Schedulers.from(Executors.newSingleThreadExecutor(ioFactory));
PrefixThreadFactory computeFactory = new PrefixThreadFactory("COMPUTE");
mSchedulerCompute = Schedulers.from(Executors.newSingleThreadExecutor(computeFactory));
mSchedulerCompute2 = Schedulers.from(Executors.newSingleThreadExecutor(computeFactory));
}
@Override
public void run(String[] args) {
Observable<Integer> src1 = getIntegerObservable(1);
Observable<Integer> src2 = getIntegerObservable(2).subscribeOn(mSchedulerIO2);
src1
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
printThreadInfo("map1");
return "map-1, num=" + integer;
}
})
.map(new Func1<String, String>() {
@Override
public String call(String s) {
printThreadInfo("map2");
return "map-2, s=" + s;
}
})
.zipWith(src2, new Func2<String, Integer, String>() {
@Override
public String call(String s, Integer integer) {
printThreadInfo("zipWith");
return "zipWith s=" + s + ", num=" + integer;
}
})
//第一个subscribeOn指定整个流程的主要IO操作线程
.subscribeOn(mSchedulerIO)
.doOnSubscribe(new Action0() {
@Override
public void call() {
printThreadInfo("doOnSubscribe");
}
})
//不能替代第一个subscribeOn,但是它的作用在于指定doOnSubscribe操作的线程
.subscribeOn(mSchedulerCompute)
.doOnSubscribe(new Action0() {
@Override
public void call() {
printThreadInfo("doOnSubscribe");
}
})
.subscribeOn(mSchedulerCompute2)
//指定最终subscriber的线程
.observeOn(mSchedulerDisplay)
.subscribe(new Subscriber<String>() {
@Override
public void onStart() {
//onStart线程不能指定,始终在subscribe的调用线程
printThreadInfo("onStart");
}
@Override
public void onCompleted() {
printThreadInfo("onCompleted");
System.exit(0);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onNext(String s) {
printThreadInfo("onNext, s=" + s);
}
});
}
private Observable<Integer> getIntegerObservable(int num) {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onStart();
subscriber.onNext(num);
subscriber.onCompleted();
printThreadInfo("create, num=" + num);
}
});
}
private void printThreadInfo(String tag) {
String name = Thread.currentThread().getName();
Logger.p("threadName=[%s], tag=%s", name, tag);
}
}
其中ThreadFactory类如下:
package com.opticalix.theme.synchronize;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class PrefixThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String prefix;
public PrefixThreadFactory(String prefix) {
this.prefix = prefix + "-";
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, this.prefix + threadNumber.getAndIncrement());
return thread;
}
}
剩下的BaseRunner和Logger只是项目工具类,可以忽略。
执行结果如下:
threadName=[main], tag=onStart
threadName=[COMPUTE-1], tag=doOnSubscribe
threadName=[COMPUTE-2], tag=doOnSubscribe
threadName=[IO-1], tag=map1
threadName=[IO-1], tag=map2
threadName=[IO-1], tag=create, num=1
threadName=[IO-2], tag=zipWith
threadName=[DISPLAY-1], tag=onNext, s=zipWith s=map-2, s=map-1, num=1, num=2
threadName=[IO-2], tag=create, num=2
threadName=[DISPLAY-1], tag=onCompleted
分析
- doOnSubscribe执行顺序最靠前,不属于该链式处理过程,而如名所述在subscribe时就调用。它的执行线程取决于后面的第一个subscribeOn。如果它后面没有指定就使用当前线程。
- subscribeOn控制observable的创建和转换过程时的线程。subscribeOn在一条链式过程中只第一次指定有用。注意每一个创建的observable都要对应指定一个subscribeOn。
- observeOn可以指定多次,可控制其后的过程(从当前调用起到下一个observeOn)的线程。但是用多个observeOn要小心每个的范围。另外observeOn不能完全替代subscribeOn,因为create过程的线程只由第一个subscribeOn指定。
- zipWith,执行在zipWith参数中的observable的subscribeOn的线程。
- onStart的线程无法指定。
BestPractice
由于多个observeOn需要小心控制线程控制范围,所以个人倾向于demo code所示的用法,在链最后使用subscribeOn & observeOn。
小心多个创建的情形(每个observable都要指定subscribeOn)。
如果使用doOnSubscribe,它的线程取决于其后第一个subscribeOn,但在这之前要调用subscribeOn指定一个线程作为其他数据处理的主要线程。