代码之家  ›  专栏  ›  技术社区  ›  j will

rxjava1 stackoverflow异常,具有太多的可观察值

  •  8
  • j will  · 技术社区  · 6 年前

    我正在使用rxjava1进行一个项目,我有一个可观察的链,它偶尔会包含数千个合并或合并在一起的可观察数据。当发生这种情况时,将发生StackOverflow异常,我们将得到如下结果:

    java.lang.StackOverflowError
        at java.util.HashMap.putVal(HashMap.java:631)
        at java.util.HashMap.put(HashMap.java:612)
        at rx.internal.operators.OnSubscribeToMap$ToMapSubscriber.onNext(OnSubscribeToMap.java:127)
        at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
    

    stacktrace将持续数百行。我在github上看到的唯一相关帖子就是这个问题: https://github.com/ReactiveX/RxJava/issues/3035 . 但是我们提出的将可观测数据添加到列表中的解决方案是我们已经使用过的,而且不起作用。

    我能做些什么来防止这些stackoverflow异常?我需要做一些节流或背压?

    下面是当前代码的外观和导致堆栈溢出的示例:

    public Observable<Map<String, JsonObject>> extractTopLevelSummariesFromForms(JsonArray summaries, Func2<String, String, Observable<JsonObject>> summaryGatherer) {
        List<Observable<JsonObject>> summaryObservables = new LinkedList<>();
        summaries.stream()
                 .map(JsonUtil::safeJsonObject)
                 .filter(summary -> StringUtils.isNotEmpty(summary.getString(NAME))|| StringUtils.isNotEmpty(summary.getString(Form.TITLE)))
                 .forEach(summary -> {
                     if (StringUtils.isNotEmpty(summary.getString(TEXT)))
                         summaryObservables.add(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
                                                                           summary.getString(Summary.SHORT_NAME) + ".hidden",
                                                                           summary.getString(VALUE), summaryGatherer));
                     if (StringUtils.isNotEmpty(summary.getString(Form.TEXT)))
                         summaryObservables.add(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
                                                                           summary.getString(Summary.SHORT_NAME) + ".title",
                                                                           summary.getString(Summary.VALUE), summaryGatherer, true));
                 });
        return Observable.merge(Observable.from(summaryObservables))
                         .filter(summaryResult -> summaryResult != null)
                         .toMap(summaryResult -> summaryResult.getString(KEY), summaryResult -> summaryResult.getJsonObject(TEXT));
    }
    
     private Observable<JsonObject> gatherSummariesFromElement(String parentName, String parentType, String elementName, String summaryValue, Func2<String, String, Observable<JsonObject>> summaryGatherer, Set<String> visited, boolean isList) {
        if (visited.contains(elementName))
            return Observable.just(null);
        visited.add(elementName);
    
        Map<String, JsonObject> summariesMap = new HashMap<>();
    
        summariesMap.put(elementName, new JsonObject().put(Summary.SummaryValues.FORM, form).put(SUMMARY_TYPE, parentType));
        Set<String> variables = TextEngine.getVariables(summariesMap);
    
        Observable<JsonObject> elementSummaryObservable = Observable.just(getSummaryEntry(elementName, form, parentType, isList));
    
        if (variables != null && !variables.isEmpty()) {
            elementSummaryObservable = elementSummaryObservable.mergeWith(Observable.from(variables).flatMap(variable -> {
                if (StringUtils.contains(variable, ".") && StringUtils.equals(parentName, StringUtils.split(variable, ".")[0]))
                    return Observable.just(null);
                else
                    return summaryGatherer.call(parentName, variable).flatMap(variableEntry -> {
                        if (variableEntry == null)
                            return Observable.just(null);
                        else
                            return gatherSummariesFromElement(parentName, variableEntry.getString(SOURCE_TYPE), variable, variableEntry.getString(FORM), summaryGatherer, visited, variableEntry.getBoolean(Summary.SummaryValues.IS_LIST, false));
                    });
            }));
        }
        return elementSummaryObservable;
    }
    

    我已经试过运行 Schedulers.computation() 除网络请求外,计划程序正在运行 Schedulers.io() 调度程序和我仍然有堆栈溢出:

    Exception in thread "pool-26-thread-2" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)
    at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.StackOverflowError
    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
    
    1 回复  |  直到 6 年前
        1
  •  4
  •   Eugene Popovich    6 年前
    1. 有一个递归调用 gatherSummariesFromElement 可能会非常深入
    2. merge(observate.from(summaryobservates))调用看起来很奇怪,您应该只使用observate.merge(summaryobservates)
    3. 而不是使用 Observable.just(null) 然后过滤空值 Observable.empty()
    4. 建造 summaryObservables 看起来有点过分了。您可以构造有效摘要的列表,而不是在平面图中处理它们
    5. 替换 GatherSummariesFromElement 使用元素创建的递归列表,然后从该列表创建可观察的

    γ

    public Observable<Map<String, JsonObject>> extractTopLevelSummariesFromForms(JsonArray summaries, Func2<String, String, JsonObject> summaryGatherer) {
        List<JsonObject> validSummaries = new LinkedList<>();
        summaries.stream()
                .map(JsonUtil::safeJsonObject)
                .filter(summary -> StringUtils.isNotEmpty(summary.getString(NAME)) || StringUtils.isNotEmpty(summary.getString(Form.TITLE)))
                .forEach(validSummaries::add);
        Set<String> visited = new HashSet<>();
        return Observable.from(validSummaries)
                .flatMap(summary -> {
                    if (StringUtils.isNotEmpty(summary.getString(TEXT)))
                        Observable.from(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
                                summary.getString(Summary.SHORT_NAME) + ".hidden",
                                summary.getString(VALUE), visited, summaryGatherer)));
                    if (StringUtils.isNotEmpty(summary.getString(Form.TEXT)))
                        Observable.from(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
                                summary.getString(Summary.SHORT_NAME) + ".title",
                                summary.getString(Summary.VALUE), summaryGatherer, visited,true)));
                })
                .toMap(summaryResult -> summaryResult.getString(KEY), summaryResult -> summaryResult.getJsonObject(TEXT));
    }
    
    private List<JsonObject> gatherSummariesFromElement(String parentName, String parentType, String elementName, String summaryValue, Func2<String, String, JsonObject> summaryGatherer, Set<String> visited, boolean isList) {
        if (visited.contains(elementName))
            return Collections.emptyList();
        visited.add(elementName);
        List<JsonObject> result = new ArrayList<>()
    
        Map<String, JsonObject> summariesMap = new HashMap<>();
    
        summariesMap.put(elementName, new JsonObject().put(Summary.SummaryValues.FORM, form).put(SUMMARY_TYPE, parentType));
        Set<String> variables = TextEngine.getVariables(summariesMap);
    
        result.add(getSummaryEntry(elementName, form, parentType, isList));
    
        if (variables != null && !variables.isEmpty()) {
            for (String variable : variables) {
                if (StringUtils.contains(variable, ".") && StringUtils.equals(parentName, StringUtils.split(variable, ".")[0])) {
                    // do nothing
                } else {
                    JsonObject variableEntry = summaryGatherer.call(parentName, variable)
                    if (variableEntry != null) {
                        result.addAll(gatherSummariesFromElement(parentName, variableEntry.getString(SOURCE_TYPE), variable, variableEntry.getString(FORM), summaryGatherer, visited, variableEntry.getBoolean(Summary.SummaryValues.IS_LIST, false));
                    }
                }
            }
        }
        return result;
    }