代码之家  ›  专栏  ›  技术社区  ›  Sebastian Edelmeier

反应式链接请求

  •  1
  • Sebastian Edelmeier  · 技术社区  · 7 年前

    我不熟悉反应式编程,只停留在一个可能很简单的点上。 我有两种方法返回观察值。

    GetQueues(): Observable<Queue[]>{...}
    

    GetMessageCount(queue: Queue): Observable<number>{...}
    

    现在我想用一个具有以下签名的方法来链接这些

    GetAllQueueMessageCount(): Observable<number>{...}
    

    可以想象,我想调用第一个方法来读取队列列表,迭代结果,然后为每个队列调用第二个方法。

    我可以想象以下情况,但正如您所见,这并没有返回签名所期望的结果:

    public GetAllQueueMessageCount(): Observable<number> {
        var messageCount = 0;
        this.GetQueues()
            .subscribe(queues => {
                var queueCountRequests = [];
                queues.forEach((queue) => {
                    queueCountRequests.push(this.GetQueueMessageCount(queue));
                });
                Observable.forkJoin(queueCountRequests)
                    .subscribe(t => {
                        t.forEach(
                            count => messageCount = messageCount + (count as number));
                    });
            }, error => Observable.throw(error));
    }
    

    我所有使用flatMap的尝试都得到了相同的结果。

    2 回复  |  直到 7 年前
        1
  •  2
  •   Richard Matsen    7 年前

    我想这就是你要找的结构。

    由于您返回的是可观察的,一般规则是使用 map() 或者它的一种形式,例如 flatMap() 而不是 subscribe() ,并在每个级别使用回报。

    const GetAllQueueMessageCount(): Observable<number> {
      return this.GetQueues()
        .map(queues => {
          const queueCountRequests = queues
            .map(queue => this.GetQueueMessageCount(workflowId, queue.Name) );
          return Observable.forkJoin(queueCountRequests)
            .map(
              results: number[] => results.reduce((sum,num) => sum + num, 0))
              error => Observable.throw(error)
             )
    }
    
        2
  •  1
  •   Olaf Horstmann    7 年前

    与您的问题相关的两条一般规则:

    1. 如果可以避免,请不要手动使用subscribe,在您的情况下,这很可能是可以避免的。

    2. 避免像这样的客户端计算,因为发送多个REST调用(假设您正在这样做)对连接和服务器都非常重要,并且通常需要更长的时间,因为可以执行的并行请求数量是有限的所以我强烈建议为这样的东西实现一个单独的endoint。

    这就是说,我将如何编写该流


    public GetAllQueueMessageCount(): Observable<number> {
        return this.GetQueues()
            .mergeAll() // split up the array into seperate emissions
            .concatMap(queue => this.GetQueueMessageCount(workflowId, queue.Name))
            .toArray() // optional if you want one single result-emit use it, if you want a new emission for every queueMessageCount, remove it
            .map(msgCounts => msgCounts.reduce((sum,num) => sum + num, 0));
    }