代码之家  ›  专栏  ›  技术社区  ›  Expert wanna be

改造中的Rxjava观测和订阅

  •  9
  • Expert wanna be  · 技术社区  · 6 年前

    观察者 : 该方法只是将所有运算符的线程进一步更改为下游 ( https://medium.com/upday-devs/rxjava-subscribeon-vs-observeon-9af518ded53a )

    调用API时,我希望在IO线程上运行与服务器的通信,并希望在主线程上处理结果。

    我在许多教程中看到了下面的代码,毫无疑问它是正确的。 但我的理解是相反的,所以我想知道我误解了什么。

    requestInterface.callApi()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe())
    

    observeOn(AndroidSchedulers.mainThread())

    :observeOn将所有运算符的线程进一步更改为下游,但在示例中,实际调用的API函数高于observeOn?

    .subscribeOn(Schedulers.io())

    :奇怪的是,它需要在主线程上订阅,但在IO线程上订阅?

    请告诉我我误解了什么?

    4 回复  |  直到 6 年前
        1
  •  6
  •   Community Egal    4 年前

    基本的,我们会有

    Observable.subscribe(Observer);// => Observer observe Observable and Observable subscribe Observer
    

    实例

    requestInterface.callApi().subscribe(new Observer...); // requestInterface.callApi() <=> Observable
    

    http://reactivex.io/documentation/operators/subscribeon.html

    订阅

    • SubscribeOn运算符指定 可观察的 将开始操作,无论该操作员在操作员链中的哪个点被称为

    观察者 (影响2件事)

    • 它指示 可观察的 将通知发送到 观察员 在指定的调度程序上。

    • ObserveOn影响 可观察的 将在该运算符出现的位置使用

    实例

    registerUserReturnedObserverble()  // run on worker thread because subscribeOn(Schedulers.io()) (line 5)
    .andThen(loginReturnObserverble()) // run on worker thread because subscribeOn(Schedulers.io()) (line 5)
    .observeOn(AndroidSchedulers.mainThread())
    .andThen(getUserDataReturnObserverble()) // run on main thread because .observeOn(AndroidSchedulers.mainThread()) is above this operator (line 3)
    .subscribeOn(Schedulers.io())
    .subscribe(new Observer<Void>{
        // run on main thread because observeOn(AndroidSchedulers.mainThread()) 
    });
    
        2
  •  1
  •   Basi    6 年前
    • subscribeOn(Schedulers.io()) :这告诉可观察对象在后台线程上运行任务
    • observeOn(AndroidSchedulers.mainThread()) :这告诉观察者在android UI线程上接收数据,以便您可以采取任何与UI相关的操作。
        3
  •  0
  •   Basi    4 年前

    以下是一个示例:

          getCardsObservable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new rx.Observer<List<Card>>() {
                    @Override
                    public void onCompleted() {
    
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        listener.onError(e.getMessage());
                    }
    
                    @Override
                    public void onNext(List<Card> cards) {
                        listener.onSuccess(cards);
                    }
                });
    

    订阅--> 执行调用的线程类似于调用asynctask

    观察--> 用户界面线程将在其中观察到响应

    订阅--> 观察者回调

        4
  •  0
  •   Abhishek Luthra    4 年前

    以下情况指定了使用observeOn()和/或subscribeOn()时可能出现的所有不同情况。

    1. subscribeOn 影响 上游 运算符(subscribeOn以上的运算符)
    2. observeOn 影响 下游的 操作员(低于观察值的操作员)
    3. 如果您没有在RxJava中指定线程(如果您没有指定 订阅 , 观察者 数据将由当前调度程序/线程(通常是主线程)发出和处理。例如,下面链中的所有操作符都将由当前线程(Android的主线程)处理。
    Observable
    .just("big", "bigger", "biggest")    
    .map(String::length)    
    .filter { it > 6 }    
    .subscribe { length -> println("item length $length") }
    
    1. 如果只指定了subscribeOn,则所有运算符都将在该线程上执行

      在此处输入代码

    Observable
    .just("big", "bigger", "biggest") 
    .subscribeOn(Schedulers.io())    
    .map(String::length)    
    .filter { it > 6 }    
    .subscribe { length -> println("item length $length") }
    

    数据发射只是,映射和过滤操作符将按照上游操作符的指示在io调度程序上执行 订阅 .

    1. 要是…就好了 观察者 指定时,所有运算符将在当前线程上执行,并且仅在 观察者 将切换到 观察者
    Observable
    .just("big", "bigger", "biggest")   
    .map(String::length)
    .observeOn(Schedulers.computation())     
    .filter { it > 6 }    
    .subscribe { length -> println("item length $length") }
    

    只需发送数据,映射将在currentThread调度程序上执行。

    过滤器将按照下游操作员的指示在计算调度器上执行 观察者 .

    1. 如果两者都有 订阅 观察者 则所有低于 观察者 将切换到 观察者 并休息上述所有操作员 观察者 切换到由指定的线程 订阅 . 这在您指定的任何顺序中都适用 订阅 观察者
    Observable
    .just("big", "bigger", "biggest")   
    .subscribeOn(Schedulers.io()) 
    .map(String::length)
    .observeOn(Schedulers.computation())     
    .filter { it > 6 }    
    .subscribe { length -> println("item length $length") }
    

    根据上游操作员的指示,数据发射和映射操作员将在io调度程序上执行 订阅 .

    过滤器将按照下游操作员的指示在计算调度器上执行 观察者 .

    即使在observeOn之后调用subscribeOn,线程使用率也会相同。

    Observable
    .just("big", "bigger", "biggest")  
    .map(String::length)
    .observeOn(Schedulers.computation()) 
    .filter { it > 6 }   
    .subscribeOn(Schedulers.io()) 
    .subscribe { length -> println("item length $length") }