代码之家  ›  专栏  ›  技术社区  ›  Frank Visaggio

使用NGRX和Angular同步处理依赖于另一个可观察对象的可观察对象

  •  2
  • Frank Visaggio  · 技术社区  · 6 年前
     ngOnInit(): void {
        this.store.dispatch(new patients.Load([]));
        this.patients$ = this.store.select(fromPatients.getAll);
    
        this.patients$.map(p =>{ //  patients$: Observable<Patient[]>;
          this.rows = p.map(pat => { //I use this on the front end
            return {
              username: pat.username,
              id: pat.id,
              hasAlert: this.hasAlerts(pat), //made this an observable for the async pipe in view
              settings: "Settings"
            };
          });
          this.table.recalculatePages();
          console.log(this.rows);
          console.log("This happens first");
        }).subscribe();
    
      }
      hasAlerts(pat: Patient): Observable<boolean> {
        var shouldAlert$ = Observable.of(false);//this value is always taken
    
          this.observations2$ = this.dataService.fetchItems<Observation>(
            "Observation",
            null,
            pat.id// How would i pass this in with mergeMap()?  
          );
    
          this.observations2$.subscribe(curObservation => {
            if (curObservation.category.coding[0].code == "GlucoseEvent"){ 
              shouldAlert$ = Observable.of(true);
              console.log("should alert$", shouldAlert$);
            }
          });
    
        console.log("this happens second");
        return shouldAlert$;
      }
    

    在上面的代码中,我解析了一个名为patients$的可观察对象,它有一个patients数组。然后我将这些患者映射到一个名为this的对象数组。我在客户端上显示的行。

    我的问题涉及到hasAlert属性,该属性在对hasAlerts()的方法调用中处理另一个可观察的自身。对hasAlerts()的此方法调用不会同步进行,因此控制台会。日志(“这首先发生”);在我的hasAlerts方法可以执行If语句中的逻辑以决定是否应将其设置为true或false之前发生,而只是使用它在hasAlerts()的第一行中初始化为的值。控制台确认。日志(“这是第二次发生”);第二次显示。

    hasAlerts()可以返回一个布尔值,而不是一个可观察的值,我正在尝试使用前端的asycn管道来解决我的问题(它没有)。

    我相信解决这一问题的方法包括使用mergemap,但我不确定我将如何通过pat。hasAlerts方法需要的id?或者,这可能不是解决当前异步执行此代码问题的正确方法。

    我目前正在尝试使用此 this question about mergemap 来解决我的问题,但要通过批准。我还没有弄清楚hasAlerts中第二个可见的id。 1

    根据Piccis的想法更新了代码。

    this.patients$.map(p =>{ //  patients$: Observable<Patient[]>;
          this.rows = p.map(pat => { //I use this on the front end
            return {
              username: pat.username,
              id: pat.id,
              hasAlert: false, //set the default value
              settings: "Settings"
            };
          })
        }).do(data => console.log("data1",data))
      //   .switchMap(p => Observable.from(p))
      //   .do(data => console.log("data2",data)) // creates a stream of Observable<Patient>
      //   .mergeMap(patient => this.dataService.fetchItems<Observation>(
      //       "Observation",
      //       null,
      //       "pat..frank"//patient[0].id//"pat..frank"//patient.id// patient should be your guy          
      //     )
      //     .map(curObservation => {
      //       console.log("currOBS",curObservation); 
    
      //       if (curObservation.category.coding[0].code == "GlucoseEvent"){
      //         var shouldAlert$ = true;
      //         console.log("should alert$", shouldAlert$);
      //       }
      //     })
      //   ).do(data => console.log(data))
      //  .toArray()
       .subscribe(
          patients => {
              this.table.recalculatePages();
              console.log(this.rows);
          }
       )
    

    Data1返回患者数组。我需要在中间加注释,因为switchmap行有一个语法错误,它说“void类型的参数不能分配给ArrayLike<{}>'类型的参数”

    3 回复  |  直到 6 年前
        1
  •  1
  •   Richard Matsen    6 年前

    基本问题是组合两个异步调用,可以使用 zip()

    ( 笔记 ,我最初发布了一个解决方案 forkJoin() ,但这不适用于ngrx select(),因为select从未完成-因此forkJoin从未激发)。

    转换 Observable<Patient[]> 从第一次提取返回到 Observable<Patient> ,因为它对zip操作员更方便。

    下一个问题是,第二个异步依赖于第一个异步的结果(pat.id)-使用 concatMap()

    ( 笔记 ,我最初建议 mergeMap() 然而 concatMap() 保证hasAlert$的顺序与patient$相同。这很重要,因为 this.dataService.fetchItems() 可能会返回单个无序回迁)。

    import { zip } from 'rxjs/observable/zip';
    ...
    
    ngOnInit(): void {
      this.store.dispatch(new patients.Load([]));
    
      const patient$ = this.store.select(fromPatients.getAll)
        .mergeMap(value => value); // convert from Observable<patients[]> to Observable<patient>
    
      const hasAlert$ = patient$.concatMap(patient => {
        return this.dataService.fetchItems<Observation>('Observation' null, patient.id)
          .map(curObservation => curObservation.category.coding[0].code === 'GlucoseEvent')
        );
      })
    
      zip(patient$, hasAlert$)  // combine values from both asyncs
        .map(([patient, hasAlert]) => {
          return {
            username: patient.username,
            id: patient.id,
            hasAlert,
            settings: "Settings"
          };
        })
        .toArray()
        .subscribe(rows => {
          this.rows = rows;
          this.table.recalculatePages();
        }); 
    }
    

    测试应答片段的Rx逻辑。

    console.clear();
    const { zip, from, of } = Rx.Observable;
    /* in Angular with Rxjs v5.5, use 
      import { zip } from 'rxjs/observable/zip';
      import { from } from 'rxjs/observable/of';
      import { of } from 'rxjs/observable/from';
    */
    
    // Simulate other observables
    const storeSelectFromPatientsGetAll$ = () =>
      of([{id: 1, username: 'Fred'}, {id: 2, username: 'Joan'}]);
    const dataServiceFetchItems$ = (type, something, id) =>
      of({ category: { coding: [{code: 'GlucoseEvent'}] }})
    
    // Testing the ngOnInit code
    const patient$ = storeSelectFromPatientsGetAll$()
      .mergeMap(value => value);
    
    const hasAlert$ = patient$.concatMap(patient => {
      return dataServiceFetchItems$('Observation', null, patient.id)
        .map(curObservation => curObservation.category.coding[0].code === 'GlucoseEvent');
    });
    
    zip(patient$, hasAlert$)  // wait for resolution of both asyncs
      .map(([patient, hasAlert]) => {
        return {
          username: patient.username,
          id: patient.id,
          hasAlert,
          settings: 'Settings'
        };
      })
      .toArray()
      .subscribe(rows => {
        console.log(rows);
      });
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.js"></script>
        2
  •  1
  •   Picci    6 年前

    您正朝着正确的方向看 mergeMap ,但您需要重新编写一点代码。

    你从 this.patients$ 这是可以观察到的。

    从这里,您需要创建一个可观察的流。这可以通过静态方法完成 from 可观察的。

    现在,您可以管理每一位患者,并让他/她 curObservation 通过服务 fetchItems

    最终,您将重新创建阵列,然后订阅。

    最终的结果可能是这样的

        ngOnInit(): void {
                this.store.dispatch(new patients.Load([]));
                this.patients$ = this.store.select(fromPatients.getAll);
    
                this.patients$.map(p =>{ //  patients$: Observable<Patient[]>;
                  this.rows = p.map(pat => { //I use this on the front end
                    return {
                      username: pat.username,
                      id: pat.id,
                      hasAlert: false, //set the default value
                      settings: "Settings"
                    };
                  });
                })
                .do(data => consoel.log(data))  // check  whether data is an array of Patients as it is supposed to be
                .switchMap(p => Observable.from(p)) // creates a stream of Observable<Patient>
                .do(data => console.log(data))  // check whether data is a single Patient
                .mergeMap(patient => this.dataService.fetchItems<Observation>(
                    "Observation",
                    null,
                    patient.id// patient should be your guy          
                  )
                  .map(curObservation => {
                    if (curObservation.category.coding[0].code == "GlucoseEvent"){ 
                      shouldAlert$ = true;
                      console.log("should alert$", shouldAlert$);
                    }
                  })
                )
               .toArray()
               .subscribe(
                  patients => {
                      this.table.recalculatePages();
                      console.log(this.rows);
                  }
               )
            }
    

    更新-基本机制

    如果我们删除了您案例的所有细节,那么上面代码段中实现的基本机制如下

    import {Observable} from 'rxjs';
    
    const obs1 = Observable.of([1, 2, 3, 4, 5, 6]);
    
    obs1
    .switchMap(n => Observable.from(n))
    .mergeMap(n => Observable.of(n*2))
    .toArray()
    .subscribe(console.log)
    
        3
  •  0
  •   Frank Visaggio    6 年前

    这就是为什么我能够通过一种不同的方法让这种逻辑为我工作。我需要浏览终极angular ngrx视频和一些rxjs教程,以便更好地理解我在其他两个答案中的错误所在,因为提供的示例效果很好。

    下面对我有效的方法

    • 在hasAlerts方法中处理来自数据服务的观察结果时使用过滤器,并将符合条件的任何观察结果添加到该方法中并返回。

    • 将hasAlerts属性设置为false,然后为给定患者调用hasAlerts()方法,修改行上的该属性,然后返回行。

      ngOnInit(): void {
      this.store.dispatch(new patients.Load([]));
      this.patients$ = this.store.select(fromPatients.getAll);
      
      this.patients$.map(p =>{ //  patients$: Observable<Patient[]>;
        this.rows = p.map(pat => { //I use this on the front end
          var rowx= {
            username: pat.username,
            id: pat.id,
            hasAlert: false, //made this an observable for the async pipe in view
            settings: "Settings"
          };
      
          this.hasAlerts(pat).do(x => {
              observations++;
              if (observations>0)
              {
                rowX.hasAlert=true;
              }
            }).subscribe();
          return rowX;
        });
      }).subscribe(
      ()=>{
      
      },
      ()=>{
        this.table.recalculatePages();
      });
      
      }
       hasAlerts(pat: Patient): Observable<Observation> {
      
        var obs$ = this.dataService.fetchItems<Observation>(
          "Observation",
          null,
          pat.id
        ).filer(function(curObservation){
           if (curObservation.category.coding[0].code == "GlucoseEvent"){ 
             return true;
           }
           else{
             return false;
           }
        });
      
      return obs$;
      }