代码之家  ›  专栏  ›  技术社区  ›  Evan Carroll

RXJS并行队列与并发工作者?

  •  0
  • Evan Carroll  · 技术社区  · 5 年前

    假设我想下载10000个文件。我可以很容易地构建一个包含10000个文件的队列(如果能做得更好,我很乐意接受建议)。

    import request from 'request-promise-native';
    import {from} from 'rxjs';
    
    let reqs = [];
    for ( let i = 0; i < 10000; i++ ) {
      reqs.push(
        from(request(`http://bleh.com/${i}`))
      )
    };
    

    现在我有了一个可以观察到的rx.js数组,它是我从代表队列的承诺中创建的。现在,为了我想要的行为,我要发布

    • 对服务器的三个并发请求
    • 完成请求后,我希望发出一个新的请求。

    我可以为这个问题创建一个解决方案,但是根据 Rxjs queue 我从来没有用过,我想知道最合适的RXJS方法是什么。

    1 回复  |  直到 5 年前
        1
  •  1
  •   cartant    5 年前

    听起来你想要一个相当于 forkJoin 它支持调用者指定的最大并发订阅数。

    可以重新实施 福克林 使用 mergeMap 并且暴露 concurrent 参数, like this :

    import { from, Observable } from "rxjs";
    import { last, map, mergeMap, toArray } from "rxjs/operators";
    
    export function forkJoinConcurrent<T>(
      observables: Observable<T>[],
      concurrent: number
    ): Observable<T[]> {
      // Convert the array of observables to a higher-order observable:
      return from(observables).pipe(
        // Merge each of the observables in the higher-order observable
        // into a single stream:
        mergeMap((observable, observableIndex) => observable.pipe(
          // Like forkJoin, we're interested only in the last value:
          last(),
          // Combine the value with the index so that the stream of merged
          // values - which could be in any order - can be sorted to match
          // the order of the source observables:
          map(value => ({ index: observableIndex, value }))
        ), concurrent),
        // Convert the stream of last values to an array:
        toArray(),
        // Sort the array of value/index pairs by index - so the value
        // indices correspond to the source observable indices and then
        // map the pair to the value:
        map(pairs => pairs.sort((l, r) => l.index - r.index).map(pair => pair.value))
      );
    }