代码之家  ›  专栏  ›  技术社区  ›  Regular User jack.the.ripper

在RXJS中,map不执行内部mapped observable

  •  1
  • Regular User jack.the.ripper  · 技术社区  · 6 年前

    对于RXJS来说是新的,但是我正在尝试将单个元素流映射到另一个生成数组的元素流 之后 所有内部/后续流都已完成/加载。然而,我的内在观察似乎没有执行。他们只是感冒了。

    高级别,我需要执行HTTPPOST来上载文件列表(在两个不同的数组中,上载到两个不同的端点)。因为它们很大,所以我用5秒的延迟来模拟。请求需要并行执行,但一次只能同时执行X(此处为2)。所有这些都需要在管道内,管道只允许流继续 之后 所有岗位均已完成。

    https://stackblitz.com/edit/rxjs-pnwa1b

    import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
    import { interval, merge, forkJoin, of, from, range, Observable } from 'rxjs';
    
    const single = "name";
    
    const first = ["abc", "def"];
    
    const second = of("ghi", "jkl", "mno");
    
    of(single)
    .pipe(tap(val => console.log(`emit:${val}`)))
    .pipe(
      map(claim => 
        merge(
          from(first).pipe(map(photo => of(photo).pipe(delay(5000)))),
          from(second).pipe(map(video => of(video).pipe(delay(5000))))
        )
        .pipe(
          mergeAll(2)
        )
        .pipe(tap(val => console.log(`emit:${val}`)))
        .pipe(toArray())
        .pipe(tap(val => console.log(`emit:${val}`)))
      )
    )
    .pipe(
    catchError(error => {
      console.log("error");
      return Observable.throw(error);
    })
    )
    .subscribe(val => console.log(`final:${val}`));
    

    内部订阅不会等到它们完成。使用forkjoin不允许限制并发上载。我怎样才能做到这一点?

    更新:

    @dmcgrandle的回答非常有帮助,并引导我进行下面似乎有效的更改:

    import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
    import { interval, merge, forkJoin, of, from, range, Observable, throwError } from 'rxjs';
    
    const single = "name";
    
    const first = ["abc", "def"];
    
    const second = of("ghi", "jkl", "mno");
    
    of(single)
    .pipe(tap(val => console.log(`emit:${val}`)))
    .pipe(
      mergeMap(claim =>
        merge(
          from(first).pipe(map(photo => of(photo).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`))))),
          from(second).pipe(map(video => of(video).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`)))))
        )
      ),
      mergeAll(2),
      toArray()
    )
    .pipe(
    catchError(error => {
      console.log("error");
      return throwError(error);
    })
    )
    .subscribe(val => console.log(`final:${val}`));
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   dmcgrandle    6 年前

    如果我正确理解你,那么我认为这是一个解决办法。你的问题是第一个 map 它不会执行内部订阅,而是将流转换为可见流,这似乎不是您想要的。相反,我用了 mergeMap 那里。

    里面 from 我用concatmap强制 first second 按顺序发生,在另一个开始之前等待一个完成。我也成立了 postToEndpoint 返回可观测数据以更接近实际代码的函数。

    StackBlitz 演示

    代码:

    import { mergeMap, concatMap, delay, tap, catchError, toArray } from 'rxjs/operators';
    import { merge, of, from, concat, throwError } from 'rxjs';
    
    const single = "name";
    
    const first = ["abc", "def"];
    
    const second = of("ghi", "jkl", "mno");
    
    const postToEndpoint1$ = photo => of(photo).pipe(
      tap(data => console.log('start of postTo1 for photo:', photo)),
      delay(5000),
      tap(data => console.log('end of postTo1 for photo:', photo))
    );
    
    const postToEndpoint2$ = video => of(video).pipe(
      tap(data => console.log('start of postTo2 for video:', video)),
      delay(5000),
      tap(data => console.log('end of postTo2 for video:', video))
    );
    
    of(single).pipe(
      tap(val => console.log(`initial emit:${val}`)),
      mergeMap(claim => 
        merge(
          from(first).pipe(concatMap(postToEndpoint1$)),
          from(second).pipe(concatMap(postToEndpoint2$))
          )
      ),
      toArray(),
      catchError(error => {
        console.log("error");
        return throwError(error);
      })
    ).subscribe(val => console.log(`final:`, val));
    

    我希望这有帮助。

    推荐文章