代码之家  ›  专栏  ›  技术社区  ›  Zufar Muhamadeev

rxjava2:使用串联的完备表并在IO线程中观察它们

  •  -3
  • Zufar Muhamadeev  · 技术社区  · 7 年前

    首先,我知道网络操作不应该从主线程调用。这就是为什么我在Scheduler上观察completables。io()!

    我在试着归纳两个完全的。两者都使用网络,这就是我订阅Scheduler的原因。io()。如果我使用concatWith(或and then),代码将失败,出现NetworkOnMainThreadException。以下是kotlin代码:

    val singleSubject = SingleSubject.create<String>(); 
    completalbe1.concatWith(completable2)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe({
                singleSubject.onSuccess("ok")
            }, { error ->
                Log.e(tag, error.message, error)//here i got exception
                singleSubject.onError(error) 
            })
    return singleSubject
    

    如果我在没有完整链接的情况下重写代码,一切都没问题。以下是工作代码:

    val singleSubject = SingleSubject.create<String>(); 
    completable1
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe({
                completable2
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .subscribe({
                        singleSubject.onSuccess("ok")
                    }, { error ->
                        Log.e(tag, error.message, error)
                        singleSubject.onError(error)
                    })
            }, {error ->
                Log.e(tag, error.message, error)
                singleSubject.onError(error)
            })
    return singleSubject
    

    我想知道为什么第一个片段不起作用,而第二个片段起作用?

    UPD1:这是stacktrace:

            android.os.NetworkOnMainThreadException
     at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1273)
     at libcore.io.BlockGuardOs.recvfrom(BlockGuardOs.java:249)
     at libcore.io.IoBridge.recvfrom(IoBridge.java:549)
     at java.net.PlainSocketImpl.read(PlainSocketImpl.java:481)
     at java.net.PlainSocketImpl.access$000(PlainSocketImpl.java:37)
     at java.net.PlainSocketImpl$PlainSocketInputStream.read(PlainSocketImpl.java:237)
     at okio.Okio$2.read(Okio.java:139)
     at okio.AsyncTimeout$2.read(AsyncTimeout.java:237)
     at okio.RealBufferedSource.exhausted(RealBufferedSource.java:56)
     at okhttp3.internal.connection.RealConnection.isHealthy(RealConnection.java:498)
     at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:133)
     at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
     at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
     at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
     at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
     at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
     at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:211)
     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
     at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
     at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
     at okhttp3.RealCall.execute(RealCall.java:69)
     at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
     at com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:41)
     at io.reactivex.Observable.subscribe(Observable.java:10955)
     at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
     at io.reactivex.Observable.subscribe(Observable.java:10955)
     at io.reactivex.internal.operators.observable.ObservableIgnoreElementsCompletable.subscribeActual(ObservableIgnoreElementsCompletable.java:31)
     at io.reactivex.Completable.subscribe(Completable.java:1664)
     at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.next(CompletableConcatArray.java:89)
     at io.reactivex.internal.operators.completable.CompletableConcatArray$ConcatInnerObserver.onComplete(CompletableConcatArray.java:65)
     at io.reactivex.internal.operators.completable.CompletableCreate$Emitter.onComplete(CompletableCreate.java:64)
     at com.catalyst.opti.AppManager$transferImage$1$subscribe$1.onStateChanged(AppManager.kt:323)
     at com.amazonaws.mobileconnectors.s3.transferutility.TransferStatusUpdater$1.run(TransferStatusUpdater.java:172)
     at android.os.Handler.handleCallback(Handler.java:742)
     at android.os.Handler.dispatchMessage(Handler.java:95)
     at android.os.Looper.loop(Looper.java:154)
     at android.app.ActivityThread.main(ActivityThread.java:5527)
     at java.lang.reflect.Method.invoke(Native Method)
     at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:739)
     at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:629)
    

    UPD2:

    completable1是一个将文件上载到AWS S3的函数:

    private fun transferImage(imageName: String, image: File): Completable {
        return Completable.create(object : CompletableOnSubscribe {
            override fun subscribe(e: CompletableEmitter) {
                val transferObserver = transferUtility.upload("some", imageName, image)
                transferObserver.setTransferListener(object : TransferListener {
                    override fun onProgressChanged(id: Int, bytesCurrent: Long, bytesTotal: Long) {
                        Log.i(tag, "bytesCurrent: $bytesCurrent, bytesTotal: $bytesTotal")
                    }
    
                    override fun onStateChanged(id: Int, state: TransferState?) {
                        if (state == TransferState.COMPLETED) {
                            e.onComplete()
                        }
                    }
    
                    override fun onError(id: Int, ex: java.lang.Exception) {
                        Log.d(tag, "error transfer s3: ${ex.message}", ex)
                        e.onError(ex)
                    }
                })
            }
        });
    }
    

    completable2是改装2调用:

    @POST("some")
        fun verifyLocation(@Header(AUTH_TOKEN_HEADER) authToken: String, @Body 
    verifyLocation: VerifyLocation): Completable
    
    1 回复  |  直到 5 年前
        1
  •  1
  •   akarnokd    7 年前

    我猜是吧 transferObserver.setTransferListener 调用主线程上的回调,然后主线程将订阅 completable2 也在主线上。你必须申请 subscribeOn(Schedulers.io()) 可完成2 ,就像你的另一个例子一样。

    val singleSubject = SingleSubject.create<String>(); 
    completalbe1.subscribeOn(Schedulers.io())
        .concatWith(completable2.subscribeOn(Schedulers.io())) // <-----------------------
        .observeOn(Schedulers.io())
        .subscribe({
            singleSubject.onSuccess("ok")
        }, { error ->
            Log.e(tag, error.message, error)//here i got exception
            singleSubject.onError(error) 
        })
    
    return singleSubject
    

    subscribeOn 影响订阅(副作用),但您的 completalbe1 当它调用时具有观察效果 onComplete 在主线程上。