对于RXJS来说是新的,但是我正在尝试将单个元素流映射到另一个生成数组的元素流
之后
所有内部/后续流都已完成/加载。然而,我的内在观察似乎没有执行。他们只是感冒了。
高级别,我需要执行HTTPPOST来上载文件列表(在两个不同的数组中,上载到两个不同的端点)。因为它们很大,所以我用5秒的延迟来模拟。请求需要并行执行,但一次只能同时执行X(此处为2)。所有这些都需要在管道内,管道只允许流继续
之后
所有岗位均已完成。
https://stackblitz.com/edit/rxjs-pnwa1b
import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
map(claim =>
merge(
from(first).pipe(map(photo => of(photo).pipe(delay(5000)))),
from(second).pipe(map(video => of(video).pipe(delay(5000))))
)
.pipe(
mergeAll(2)
)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(toArray())
.pipe(tap(val => console.log(`emit:${val}`)))
)
)
.pipe(
catchError(error => {
console.log("error");
return Observable.throw(error);
})
)
.subscribe(val => console.log(`final:${val}`));
内部订阅不会等到它们完成。使用forkjoin不允许限制并发上载。我怎样才能做到这一点?
更新:
@dmcgrandle的回答非常有帮助,并引导我进行下面似乎有效的更改:
import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable, throwError } from 'rxjs';
const single = "name";
const first = ["abc", "def"];
const second = of("ghi", "jkl", "mno");
of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
mergeMap(claim =>
merge(
from(first).pipe(map(photo => of(photo).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`))))),
from(second).pipe(map(video => of(video).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`)))))
)
),
mergeAll(2),
toArray()
)
.pipe(
catchError(error => {
console.log("error");
return throwError(error);
})
)
.subscribe(val => console.log(`final:${val}`));