代码之家  ›  专栏  ›  技术社区  ›  Andrei Vajna II

类似于exhaustMap的运算符,但它从源中记住最后跳过的值并最终执行它

  •  2
  • Andrei Vajna II  · 技术社区  · 6 年前

    我需要一个类似于 exahustMap ,但它会记住上次跳过的可观察对象,并在当前可观察对象完成后执行。

    例如,考虑 exhaustMap :

    exhaustMap marble diagram

    在我的例子中,在发出蓝色值之后,后面会跟着三个值50。当然,在这种情况下 concatMap ,但如果3到5之间也有4,则不会反映在输出中。

    我已经设法编写了自己的运算符,类似于 排气图 实施:

    function exhaustLatestMap<T, R>(project: (value: T) => Subscribable<R>): OperatorFunction<T, R> {
        return source => new Observable<R>(observer => 
            source.subscribe(new ExhaustLatestMapOperatorSubscriber(observer, project)));
    }
    
    class ExhaustLatestMapOperatorSubscriber<T, R> implements Observer<T> {
    
        constructor(
            private observer: Subscriber<R>,
            private project: (value: T) => Subscribable<R>) { }
    
        innerSub: AnonymousSubscription = null;
        latestValue: T;
    
        next(value: T) {
            this.processNext(value);
        }
    
        error(err) {
            this.observer.error(err);
        }
    
        complete() {
            this.observer.complete();
        }
    
        private processNext(value: T) {
            this.latestValue = value;
            if (!this.innerSub) {
                this.innerSub = this.project(value).subscribe({
                    next: v => this.observer.next(v),
                    error: err => {
                        this.observer.error(err);
                        this.endInnerSub(value)
                    },
                    complete: () => {
                        this.endInnerSub(value);
                    }
                });
            }
        }
    
        private endInnerSub(value: T) {
            this.innerSub.unsubscribe();
            this.innerSub = null;
            if (this.latestValue !== value) {
                this.processNext(this.latestValue);
            }
        }
    }
    

    但是我想知道是否有办法通过重用和组合现有的操作符来实现它。有什么想法吗?

    1 回复  |  直到 6 年前
        1
  •  2
  •   cartant    6 年前

    只使用内置的工厂和操作员就可以实现它。但是,AFAICT必须管理每个订阅状态。

    幸运的是 defer 工厂功能使每个订阅状态的管理相对简单和安全。而且,除了帮助管理每个订阅状态之外, 推迟 可用于通知何时订阅可观测对象的机制。

    另一种实现方式:

    const {
      concat,
      defer,
      EMPTY,
      merge,
      of
    } = rxjs;
    
    const {
      delay,
      mergeMap,
      tap
    } = rxjs.operators;
    
    const exhaustMapLatest = project => source => defer(() => {
      let latestValue;
      let hasLatestValue = false;
      let isExhausting = false;
      const next = value => defer(() => {
        if (isExhausting) {
          latestValue = value;
          hasLatestValue = true;
          return EMPTY;
        }
        hasLatestValue = false;
        isExhausting = true;
        return project(value).pipe(
          tap({ complete: () => isExhausting = false }),
          s => concat(s, defer(() => hasLatestValue ?
            next(latestValue) :
            EMPTY
          ))
        );
      });
      return source.pipe(mergeMap(next));
    });
    
    const source = merge(
      of(0).pipe(delay(0)),
      of(1000).pipe(delay(1000)),
      of(1100).pipe(delay(1100)),
      of(1200).pipe(delay(1200)),
      of(2000).pipe(delay(2000))
    );
    
    source.pipe(
      exhaustMapLatest(value => merge(
        of(`${value}:0`).pipe(delay(0)),
        of(`${value}:150`).pipe(delay(150)),
        of(`${value}:300`).pipe(delay(300))
      ))
    ).subscribe(value => console.log(value));
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    <script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

    此实现与您的实现之间存在一些行为差异:

    • 此实现使用 hasLatestValue 标志而不是相等性检查,因此,如果最新值等于初始值,则仍会投影该值。
    • 在这个实现中,如果结果可观测值的订户取消订阅,那么对预计可观测值的任何订阅也将取消订阅。在您的实现中,内部订阅将保持订阅状态(AFAICT),直到预计的可观测数据完成或出现错误为止。

    我不是主张应该这样实施。答案就是展示一个替代的实现。