代码之家  ›  专栏  ›  技术社区  ›  Panch

RXJava-请求处理失败,indexoutofboundsException

  •  0
  • Panch  · 技术社区  · 6 年前

    我正在使用RXJava并遇到以下问题。

        threw exception [Request processing failed; nested exception is java.lang.IndexOutOfBoundsException: Index: 0, Size: 0] with root cause
    rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: [Ljava.lang.Object;.class
        at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:190) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:257) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:323) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext(OperatorOnErrorResumeNextViaFunction.java:154) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onNext(OperatorTimeoutBase.java:131) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:48) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:33) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.Observable.unsafeSubscribe(Observable.java:10151) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call(CachedThreadScheduler.java:228) ~[rxjava-1.2.0.jar!/:1.2.0]
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.2.0.jar!/:1.2.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_111]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_111]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_111]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_111]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
    

    下面是我试图用RxJava做的一个片段

        Observable<A> AObservable = Observable.fromCallable(() ->
                //External Service Call
        ).timeout(800, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .onErrorReturn(throwable -> {
                    LOGGER.warn(format("Server did not respond within %s ms for id=%s", 800, id));
                    return null;
                });
    
        Observable<B> BObservable = Observable.fromCallable(() ->
                //External Service Call
        ).timeout(800, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .onErrorReturn( throwable -> {
                    LOGGER.warn(format("Service did not respond within %s ms for id=%s", 800, Id));
                    return null;
                });
    
        // Build Default response
        Observable<C> CObservable = Observable.fromCallable(() ->
                // Build Default one
        ).subscribeOn(Schedulers.io());
    
    
        return Observable.zip(AObservable, BObservable,CObservable,
                (AResponse, BResponse, CResponse) -> {
    
            // Handle response and combine them
    
        }).toBlocking().first();
    

    我正在本地测试它,它运行良好,但是当我将它部署到 aws 我遇到了上述问题。另外,请注意,我并没有遇到所有ID的问题,只是遇到了一些ID。我对 RxJava 是否有人指出异步代码存在潜在问题?

    1 回复  |  直到 6 年前
        1
  •  0
  •   Dmitry    6 年前

    你可以用 switchIfEmpty 运算符返回请求超时时的默认值(因此返回空的Observable)。

    Observable<String> AObservable = Observable.fromCallable(() -> {
        Thread.sleep(100);
        return "response A";
    }).timeout(800, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .onErrorResumeNext((value) -> {
                return Observable.empty();
            });
    
    Observable<String> BObservable = Observable.fromCallable(() -> {
        Thread.sleep(1500);
        return "response B";
    }).timeout(800, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .onErrorResumeNext((value) -> {
                return Observable.empty();
            });
    
    Observable<String> CObservable = Observable.fromCallable(() -> "default response")
            .subscribeOn(Schedulers.io());
    
    String result = Observable.zip(AObservable, BObservable,
            (AResponse, BResponse) -> AResponse + " and " + BResponse)
            .switchIfEmpty(CObservable)
            .singleElement()
            .blockingGet();
    
    System.out.println(result);
    

    你可以改变 Thread.sleep 获取不同结果的参数。