代码之家  ›  专栏  ›  技术社区  ›  Piotr Aleksander Chmielowski

任意数量的Observable::Flatmap调用链

  •  0
  • Piotr Aleksander Chmielowski  · 技术社区  · 6 年前

    假设我有一些 Observable 有副作用的供应商 get 呼叫:

    Subject<String> firstSubject = PublishSubject.create();
    Supplier<Observable<String>> firstSupplier = () -> {
        System.out.println("side effect of first one");
        return firstSubject;
    };
    
    Subject<String> secondSubject = PublishSubject.create();
    Supplier<Observable<String>> secondSupplier = () -> {
        System.out.println("side effect of second one");
        return secondSubject;
    };
    
    Subject<String> thirdSubject = PublishSubject.create();
    Supplier<Observable<String>> thirdSupplier = () -> {
        System.out.println("side effect of third one");
        return thirdSubject;
    };
    

    现在我想用以下方式组合它们 得到 下一个供应商的 可观测的 从上一个发出值( onNext 被调用)。 我可以用以下代码来完成:

    firstSupplier.get()
            .flatMap(__ -> secondSupplier.get())
            .flatMap(__ -> thirdSupplier.get())
            .subscribe();
    // output: side effect of first one
    firstSubject.onNext("");
    // output: side effect of second one
    secondSubject.onNext("");
    // output: side effect of third one
    

    如何重写此代码以接受未知数量的供应商,例如- Collection<Supplier<Observable>> ?

    我已经回顾了 可观测的 (像 merge , concat 但是他们都在收集 ObservableSource 也就是说我必须打电话 得到 对我所有的供应商都很热心。然而,在我的情况下,很重要的一点是要称之为懒惰-只有在之前 可观测的 发出值。

    1 回复  |  直到 6 年前
        1
  •  2
  •   akarnokd    6 年前

    编辑2:

    我完全忘记了 Observable.flatMap 调用映射器,即使 maxConcurrency 设置为1并将生成的 Observable 稍后运行。

    我希望下面的设置按预期工作(也就是说,它没有指定接下来应该发生什么 onNext 对受试者)。

    Subject<String> firstSubject = PublishSubject.create();
    Supplier<Observable<String>> firstSupplier = () -> {
        System.out.println("side effect of first one");
        return firstSubject;
    };
    
    Subject<String> secondSubject = PublishSubject.create();
    Supplier<Observable<String>> secondSupplier = () -> {
        System.out.println("side effect of second one");
        return secondSubject;
    };
    
    Subject<String> thirdSubject = PublishSubject.create();
    Supplier<Observable<String>> thirdSupplier = () -> {
        System.out.println("side effect of third one");
        return thirdSubject;
    };
    
    Collection<Supplier<Observable<String>>> collection =
        Arrays.asList(firstSupplier, secondSupplier, thirdSupplier);
    
    Observable.fromIterable(collection)
    .concatMap(supplier -> supplier.get().take(1))
    .subscribe();
    
    System.out.println("// output: side effect of first one");
    firstSubject.onNext("");
    System.out.println("// output: side effect of second one");
    secondSubject.onNext("");
    System.out.println("// output: side effect of third one");
    

    印刷品:

    side effect of first one
    // output: side effect of first one
    side effect of second one
    // output: side effect of second one
    side effect of third one
    // output: side effect of third one