代码之家  ›  专栏  ›  技术社区  ›  bakua

RxJava主题在不正确的调度程序上发出

  •  3
  • bakua  · 技术社区  · 8 年前

    我有一门课,我是单身学生:

    public class SessionStore {
        Subject<Session, Session> subject;
    
        public SessionStore() {
           subject = new SerializedSubject<>(BehaviorSubject.create(new Session());
        }
    
        public void set(Session session) {
            subject.onNext(session);
        }
    
        public Observable<UserSession> observe() {
            return subject.distinctUntilChanged();
        }
    }
    

    在活动I中,观察会话并对每个更改执行网络操作:

    private Subscription init() {
        return sessionStore
                .observe()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(new Func1<Session, Observable<Object>>() {
                    @Override
                    public Observable<Object> call(Session session) {
                        return retrofitService.getAThing();
                    }
                })
                .subscribe(...);
    }
    

    当我订阅会话存储时,主题会发出 io() 立即 BehaviourSubject 并且订户执行 mainThread() .

    当我打电话时,问题就来了 sessionStore.set(new AnotherSession()) 而已经订阅了它。IMO:这应该执行中定义的流 init() io() subject.onNext() 被调用。导致 NetworkOnMainThreadException 当我在做网络操作时 flatMap() .

    我是否理解错误的主题?我这样滥用它们吗?那么,正确的解决方案是什么呢?

    Observable.fromEmitter() 在里面 observe() 方法,但令人惊讶的是,输出结果完全相同。

    3 回复  |  直到 8 年前
        1
  •  7
  •   Sergej Isbrecht    8 年前

    请看一下书中的以下部分: Reactive Programming with RxJava '

    默认情况下,对主题调用onNext()将直接传播到所有观察者的onNext()回调方法。这些方法共享相同的名称并不奇怪。在某种程度上,对Subject调用onText()会间接调用每个订阅者的onText()。

    如果从线程1调用主题的onNext,它将从线程1向订阅方调用onText。将讨论onSubscribe。

    所以首先要做的是: 订阅将发生在哪个线程上:

    retrofitService.getAThing()
    

    我只是猜测,并说它是调用线程。这将是observeOn中描述的线程,即Android UI循环。

    请查看示例代码和输出:

    class SessionStore {
        private Subject<String, String> subject;
    
        public SessionStore() {
            subject = BehaviorSubject.create("wurst").toSerialized();
        }
    
        public void set(String session) {
            subject.onNext(session);
        }
    
        public Observable<String> observe() {
            return subject
                    .asObservable()
                    .doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread()))
                    .distinctUntilChanged();
        }
    }
    
    @Test
    public void name() throws Exception {
        // init
        SessionStore sessionStore = new SessionStore();
    
        TestSubscriber testSubscriber = new TestSubscriber();
        Subscription subscribe = sessionStore
                .observe()
                .flatMap(s -> {
                    return Observable.fromCallable(() -> {
                        System.out.println("flatMap Thread:: " + Thread.currentThread());
                        return s;
                    }).subscribeOn(Schedulers.io());
                })
                .doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread()))
                .observeOn(Schedulers.newThread()) // imagine AndroidScheduler here
                .subscribe(testSubscriber); // Do UI-Stuff in subscribe
    
        new Thread(() -> {
            System.out.println("set on Thread:: " + Thread.currentThread());
            sessionStore.set("123");
        }).start();
    
        new Thread(() -> {
            System.out.println("set on Thread:: " + Thread.currentThread());
            sessionStore.set("345");
        }).start();
    
        boolean b = testSubscriber.awaitValueCount(3, 3_000, TimeUnit.MILLISECONDS);
    
        Assert.assertTrue(b);
    }
    

    输出:

    Receiving value on Thread:: Thread[main,5,main]
    flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    set on Thread:: Thread[Thread-1,5,main]
    set on Thread:: Thread[Thread-0,5,main]
    Receiving value on Thread:: Thread[Thread-1,5,main]
    flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    Receiving value on Thread:: Thread[Thread-1,5,main]
    flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    After flatMap Thread:: Thread[RxIoScheduler-2,5,main]
    
        2
  •  1
  •   R. Zagórski Krishnraj Anadkat    8 年前

    当您调用操作符时,它会影响整个下游。如果您致电:

    .observeOn(AndroidSchedulers.mainThread())
    

    在不正确的位置,流的其余部分在指定线程上执行。

    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    

    在流的最末端:

    private Subscription init() {
        return sessionStore
                .observe()
                .flatMap(new Func1<Session, Observable<Object>>() {
                    @Override
                    public Observable<Object> call(Session session) {
                        return retrofitService.getAThing();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(...);
    }
    
        3
  •  0
  •   JohnWowUs    8 年前

    我想你忘记了 Subject 也是一个观察者,所以为了得到 onNext 要在io线程上运行,请尝试:

    public class SessionStore {
        Subject<Session, Session> subject;
    
        public UserSessionStore() {
           subject = new SerializedSubject<>(BehaviorSubject.create(new Session())).observeOn(Schedulers.io());
        }
    
        public void set(Session session) {
            subject.onNext(session);
        }
    
        public Observable<UserSession> observe() {
            return subject.distinctUntilChanged();
        }
    }