请看一下书中的以下部分:
Reactive Programming with RxJava
'
默认情况下,对主题调用onNext()将直接传播到所有观察者的onNext()回调方法。这些方法共享相同的名称并不奇怪。在某种程度上,对Subject调用onText()会间接调用每个订阅者的onText()。
如果从线程1调用主题的onNext,它将从线程1向订阅方调用onText。将讨论onSubscribe。
所以首先要做的是:
订阅将发生在哪个线程上:
retrofitService.getAThing()
我只是猜测,并说它是调用线程。这将是observeOn中描述的线程,即Android UI循环。
请查看示例代码和输出:
class SessionStore {
private Subject<String, String> subject;
public SessionStore() {
subject = BehaviorSubject.create("wurst").toSerialized();
}
public void set(String session) {
subject.onNext(session);
}
public Observable<String> observe() {
return subject
.asObservable()
.doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread()))
.distinctUntilChanged();
}
}
@Test
public void name() throws Exception {
// init
SessionStore sessionStore = new SessionStore();
TestSubscriber testSubscriber = new TestSubscriber();
Subscription subscribe = sessionStore
.observe()
.flatMap(s -> {
return Observable.fromCallable(() -> {
System.out.println("flatMap Thread:: " + Thread.currentThread());
return s;
}).subscribeOn(Schedulers.io());
})
.doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread()))
.observeOn(Schedulers.newThread()) // imagine AndroidScheduler here
.subscribe(testSubscriber); // Do UI-Stuff in subscribe
new Thread(() -> {
System.out.println("set on Thread:: " + Thread.currentThread());
sessionStore.set("123");
}).start();
new Thread(() -> {
System.out.println("set on Thread:: " + Thread.currentThread());
sessionStore.set("345");
}).start();
boolean b = testSubscriber.awaitValueCount(3, 3_000, TimeUnit.MILLISECONDS);
Assert.assertTrue(b);
}
输出:
Receiving value on Thread:: Thread[main,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
set on Thread:: Thread[Thread-1,5,main]
set on Thread:: Thread[Thread-0,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
Receiving value on Thread:: Thread[Thread-1,5,main]
flatMap Thread:: Thread[RxIoScheduler-2,5,main]
After flatMap Thread:: Thread[RxIoScheduler-2,5,main]