代码之家  ›  专栏  ›  技术社区  ›  Gergely Fehérvári

RxJs如何将两个重叠的可观察项合并为一个

  •  2
  • Gergely Fehérvári  · 技术社区  · 7 年前

    我有两个观察结果:

    -1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-|
    -13--14--15--16--17--18--19-----20---------21--------------22------23--24-->
    

    第一个包含一些递增的数字,但在一段时间后停止(这些是来自数据库的游标结果) 第二个正在不断地发射越来越多的信号。包含来自第一个的一些数字,但不要停止发射。(这些是新插入数据库的数据)

    我希望这两个可观察的物体看起来像这样一个连续的可观察物体:

    -1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-----22------23--24-->
    

    该可观察值只包含每个数字一次,保持发射顺序。

    如何使用尽可能少的内存来解决这个问题?

    2 回复  |  直到 7 年前
        1
  •  3
  •   ZahiC    7 年前

    我认为这里最好的方法是缓冲b$,直到a$流达到b$,然后发出b$的所有缓冲项并切换到b$。类似这样:

    const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15';
    const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24';
    
    const fromMarble = str => Rx.Observable.from(str.split('-')).concatMap(x => Rx.Observable.of(x).delay(1)).filter(v => v.length).map(x => parseInt(x));
    
    const a$ = fromMarble(a).share();
    const b$ = fromMarble(b).share();
    
    const switchingSignal$ = Rx.Observable.combineLatest(a$, b$.take(1), (a, b) => a >= b).filter(x => x).take(1).share();
    
    const distinct$ = Rx.Observable.merge(
    	a$.takeUntil(switchingSignal$).map(x => x + '(from a)'), 
    	b$.buffer(switchingSignal$).take(1).mergeMap(buffered => Rx.Observable.from(buffered)).map(x => x + '(from b$ buffer)'),
    	b$.skipUntil(switchingSignal$).map(x => x + '(from b$)')
    );
    
    distinct$.subscribe(console.log);
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.0/Rx.js"></script>
        2
  •  2
  •   Oles Savluk    7 年前

    可以通过从连接的第一个流中获取所有元素来实现这一点( .concat )第二个流除外( .skipWhile 包含)最新元素之前的元素( .last

    const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15'
    const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24'
    const fromMarble = str => Rx.Observable.defer(() => {
      console.log('side effect from subscribing to: ' + str);
      return Rx.Observable.from(str.split('-').filter(v => v.length));
    });
    
    const a$ = fromMarble(a);
    const b$ = fromMarble(b);
    
    const distinct$ = Rx.Observable.concat(
      a$,
      a$.last().switchMap(latest =>
        // .skipWhile + .skip(1) => skipWhile but inclusive
        b$.skipWhile(v => v !== latest).skip(1)
      ),
    );
    
    distinct$.subscribe(e => console.log(e));
    <script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

    此外,如果您在订阅时有副作用(例如,当您订阅时,将创建新光标),您可以使用以下方法为所有订阅者共享该副作用: const a$ = fromMarble(a).shareReaplay() .

    你可以阅读更多关于分享副作用的内容: