一个例子弄清RxJava线程切换

RxJava提供了Reactive Programming for Java,个人在Android开发中用的相当多,尤其线程切换和链式的数据处理,给码农们提供了极大的便利。在线程切换方面,之前一直用subscribeOn和observeOn配合,不过最近工作需要对线程更加细致的考量,比如zipWith的线程执行由谁决定,于是再重新编码理一遍。

Code

package com.opticalix.theme.zepp;

import com.opticalix.base.BaseRunner;
import com.opticalix.theme.synchronize.PrefixThreadFactory;
import com.opticalix.util.Logger;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

import java.util.concurrent.Executors;

public class RxRunner implements BaseRunner {
    private final Scheduler mSchedulerIO;
    private final Scheduler mSchedulerIO2;
    private final Scheduler mSchedulerDisplay;
    private final Scheduler mSchedulerCompute;
    private final Scheduler mSchedulerCompute2;

    public RxRunner() {
        mSchedulerDisplay = Schedulers.from(Executors.newSingleThreadExecutor(new PrefixThreadFactory("DISPLAY")));
        PrefixThreadFactory ioFactory = new PrefixThreadFactory("IO");
        mSchedulerIO = Schedulers.from(Executors.newSingleThreadExecutor(ioFactory));
        mSchedulerIO2 = Schedulers.from(Executors.newSingleThreadExecutor(ioFactory));
        PrefixThreadFactory computeFactory = new PrefixThreadFactory("COMPUTE");
        mSchedulerCompute = Schedulers.from(Executors.newSingleThreadExecutor(computeFactory));
        mSchedulerCompute2 = Schedulers.from(Executors.newSingleThreadExecutor(computeFactory));
    }

    @Override
    public void run(String[] args) {
        Observable<Integer> src1 = getIntegerObservable(1);
        Observable<Integer> src2 = getIntegerObservable(2).subscribeOn(mSchedulerIO2);
        src1
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        printThreadInfo("map1");
                        return "map-1, num=" + integer;
                    }
                })
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        printThreadInfo("map2");
                        return "map-2, s=" + s;
                    }
                })
                .zipWith(src2, new Func2<String, Integer, String>() {
                    @Override
                    public String call(String s, Integer integer) {
                        printThreadInfo("zipWith");
                        return "zipWith s=" + s + ", num=" + integer;
                    }
                })
                //第一个subscribeOn指定整个流程的主要IO操作线程
                .subscribeOn(mSchedulerIO)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        printThreadInfo("doOnSubscribe");
                    }
                })
                //不能替代第一个subscribeOn,但是它的作用在于指定doOnSubscribe操作的线程
                .subscribeOn(mSchedulerCompute)
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        printThreadInfo("doOnSubscribe");
                    }
                })
                .subscribeOn(mSchedulerCompute2)
                //指定最终subscriber的线程
                .observeOn(mSchedulerDisplay)
                .subscribe(new Subscriber<String>() {

                    @Override
                    public void onStart() {
                        //onStart线程不能指定,始终在subscribe的调用线程
                        printThreadInfo("onStart");
                    }

                    @Override
                    public void onCompleted() {
                        printThreadInfo("onCompleted");
                        System.exit(0);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        throwable.printStackTrace();
                    }

                    @Override
                    public void onNext(String s) {
                        printThreadInfo("onNext, s=" + s);
                    }
                });
    }

    private Observable<Integer> getIntegerObservable(int num) {
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onStart();
                subscriber.onNext(num);
                subscriber.onCompleted();
                printThreadInfo("create, num=" + num);
            }
        });
    }

    private void printThreadInfo(String tag) {
        String name = Thread.currentThread().getName();
        Logger.p("threadName=[%s], tag=%s", name, tag);
    }

}

其中ThreadFactory类如下:

package com.opticalix.theme.synchronize;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class PrefixThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String prefix;

    public PrefixThreadFactory(String prefix) {
        this.prefix = prefix + "-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, this.prefix + threadNumber.getAndIncrement());
        return thread;
    }
}

剩下的BaseRunner和Logger只是项目工具类,可以忽略。
执行结果如下:

threadName=[main], tag=onStart
threadName=[COMPUTE-1], tag=doOnSubscribe
threadName=[COMPUTE-2], tag=doOnSubscribe
threadName=[IO-1], tag=map1
threadName=[IO-1], tag=map2
threadName=[IO-1], tag=create, num=1
threadName=[IO-2], tag=zipWith
threadName=[DISPLAY-1], tag=onNext, s=zipWith s=map-2, s=map-1, num=1, num=2
threadName=[IO-2], tag=create, num=2
threadName=[DISPLAY-1], tag=onCompleted

分析

  1. doOnSubscribe执行顺序最靠前,不属于该链式处理过程,而如名所述在subscribe时就调用。它的执行线程取决于后面的第一个subscribeOn。如果它后面没有指定就使用当前线程。
  2. subscribeOn控制observable的创建和转换过程时的线程。subscribeOn在一条链式过程中只第一次指定有用。注意每一个创建的observable都要对应指定一个subscribeOn。
  3. observeOn可以指定多次,可控制其后的过程(从当前调用起到下一个observeOn)的线程。但是用多个observeOn要小心每个的范围。另外observeOn不能完全替代subscribeOn,因为create过程的线程只由第一个subscribeOn指定。
  4. zipWith,执行在zipWith参数中的observable的subscribeOn的线程。
  5. onStart的线程无法指定。

BestPractice

由于多个observeOn需要小心控制线程控制范围,所以个人倾向于demo code所示的用法,在链最后使用subscribeOn & observeOn。
小心多个创建的情形(每个observable都要指定subscribeOn)。
如果使用doOnSubscribe,它的线程取决于其后第一个subscribeOn,但在这之前要调用subscribeOn指定一个线程作为其他数据处理的主要线程。