
Multiple Observable subscribers

  • cocoseis  · Tech Community  · 6 years ago

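    I am trying to replace an event pattern with Observables. The general idea is to be able to monitor localStorage. As shown below, I created an Injectable as a wrapper for this: StorageService. It works as expected: items can be written and read, and if an item is updated, its reader is updated as well. However, as soon as multiple readers subscribe to the same key, only one of them gets notified. Why is that? I would expect every call to service.read('key') to end up with the same observable. In the test shown at the end, this does not seem to be the case.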

    Storage service:

    import { Injectable } from '@angular/core';
    import { Observable } from 'rxjs/Observable';
    import 'rxjs/add/observable/concat';
    import { Observer } from 'rxjs/Observer';
    
    interface StorageItemsDictionary { [id: string]: any; }
    interface ObserverObservableDictionary { [id: string]: { observer: Observer<any>, observable: Observable<any> }; };
    
    @Injectable()
    export class StorageService {
    
      cache: StorageItemsDictionary;
      ood: ObserverObservableDictionary;
    
      constructor() {
        this.cache = {};
        this.ood = {};
      }
    
      /**
       * Get item from current `key` and all further changes
       * @param key
       */
      public read<T>(key: string): Observable<T | null> {
        // assure there is an ObserverObservable pair for the key:
        this.assureOOPair(key);
    
        // create observable from the cache or localStorage if there is an item
        if (this.cache[key] === undefined) {
          // read the stored string once and parse it
          const value = localStorage.getItem(key);
          const parsed = <T>JSON.parse(value);
          this.cache[key] = parsed;
        }
    
        // one time observable for current item:
        const singleTimeObservable = new Observable(singleTimeObserver => {
          singleTimeObserver.next(this.cache[key] || null);
          singleTimeObserver.complete();
        });
    
        // merge it with the ood
        return <Observable<T | null>>Observable.concat(singleTimeObservable, this.ood[key].observable);
      }
    
      /**
       * Delete item from `key`
       * @param key
       */
      public delete(key: string): void {
        this.assureOOPair(key); // guard: the key may never have been read or written
        delete this.cache[key];
        this.ood[key].observer.next(null);
        localStorage.removeItem(key);
      }
    
      /**
       * Store `item` at `key` and notify all readers
       * @param key
       * @param item
       */
      public write<T>(key: string, item: T): void {
        this.assureOOPair(key);
        this.cache[key] = item;
        this.ood[key].observer.next(item); // notify others
        const stringified = JSON.stringify(item);
        localStorage.setItem(key, stringified);
      }
    
      /**
       * Create a ObserverObservable pair at `key` if it does not exist
       * @param key
       */
      private assureOOPair(key: string) {
        if (this.ood[key] === undefined) {
          this.ood[key] = {
            observable: null,
            observer: null
          };
          this.ood[key].observable = new Observable(observer => {
            this.ood[key].observer = observer;
          });
          this.ood[key].observable.subscribe(e => e); // to be able to apply next
        }
      }
    
    }
    

    Test:

      it('multiple reads', async(inject([StorageService], (service: StorageService) => {
        localStorage.removeItem('key');
        let counter = 0;
        let counter2 = 0;
        service.write('key', 'value1');
    
        service.read('key').forEach(v => {
          console.log('multiple reads: 1', v);
          switch (counter) {
            case 0:
              expect(v).toEqual('value1');
              break;
            case 1:
              expect(v).toEqual('value2fail'); // this case never gets called, so the test passes even though it should fail here
              break;
          }
          counter++;
        });
    
        service.read('key').forEach(v => {
          console.log('multiple reads: 2', v);
          switch (counter2) {
            case 0:
              expect(v).toEqual('value1');
              break;
            case 1:
              expect(v).toEqual('value2');
              break;
          }
          counter2++;
        });
    
        service.write('key', 'value2');
      })));
    

    Test log:

    multiple reads: 1 value1
    multiple reads: 2 value1
    multiple reads: 2 value2
    

1 reply  |  latest 6 years ago
  • Abdelrhman Hussien  · 6 years ago

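    I think you are looking for a Subject. A Subject is similar to an Observable, but it shares the same data with all of its subscribers, whereas an Observable executes once for every subscription. I hope this helps.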
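
    To make the difference described in this reply concrete, here is a minimal, self-contained sketch. It deliberately does not use RxJS: MiniObservable, MiniSubject, runSingleSlot and runSubject are hypothetical stand-ins written only for this illustration, and they are far simpler than the real classes. The first function mirrors the pattern from the question, where the subscribe callback stores a single observer, so each new subscription overwrites the previous one. The second keeps a list of listeners, which is the behaviour a Subject is meant to provide.

```typescript
// Illustrative stand-ins only; these are NOT the RxJS classes.
type Listener<T> = (value: T) => void;

// Mimics a plain observable: the producer runs again for every subscribe call.
class MiniObservable<T> {
  private producer: (listener: Listener<T>) => void;

  constructor(producer: (listener: Listener<T>) => void) {
    this.producer = producer;
  }

  subscribe(listener: Listener<T>): void {
    this.producer(listener);
  }
}

// Mimics a subject: remembers every listener and forwards each value to all of them.
class MiniSubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

// Pattern from the question: the producer stores "the" observer in a single slot.
function runSingleSlot(): string[] {
  const log: string[] = [];
  const slot: { listener: Listener<string> | null } = { listener: null };
  const source = new MiniObservable<string>(listener => {
    slot.listener = listener; // overwritten by every new subscription
  });

  source.subscribe(v => log.push(`reader 1: ${v}`));
  source.subscribe(v => log.push(`reader 2: ${v}`));

  if (slot.listener !== null) {
    slot.listener('value2'); // only the most recent subscriber is still referenced
  }
  return log;
}

// Subject-style pattern: every subscriber is kept and notified.
function runSubject(): string[] {
  const log: string[] = [];
  const subject = new MiniSubject<string>();

  subject.subscribe(v => log.push(`reader 1: ${v}`));
  subject.subscribe(v => log.push(`reader 2: ${v}`));

  subject.next('value2');
  return log;
}

console.log(runSingleSlot()); // only reader 2 receives 'value2'
console.log(runSubject());    // reader 1 and reader 2 both receive 'value2'
```

    This matches the test log in the question, where only the second reader sees value2. In the actual service, one plausible approach (an assumption, not something confirmed in this thread) would be to keep one RxJS Subject per key, call its next method from write and delete, and return it from read. A BehaviorSubject might also replace the separate one-time observable, since it emits its current value to each new subscriber.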