代码之家  ›  专栏  ›  技术社区  ›  Nicolas Gehlert

Observable将多个函数调用合并为单个Observable

  •  1
  • Nicolas Gehlert  · 技术社区  · 6 年前

    我有一个基于参数执行http请求的函数。我想添加一些“去抖动”功能。因此,如果函数在一个设置的时间窗口中被多次调用,我希望将参数组合成一个请求,而不是发出多个请求。

    现在,让我们跳过单个请求中的组合,因为这可以通过聚合的debounce或Oberservable.buffer. 我很难将单一的观察值组合起来。

    我试着用一个主语,因为这似乎是本案的恰当宾语( https://stackblitz.com/edit/angular-hcn41v?file=src%2Fapp%2Fapp.component.ts ).

    constructor(private http: HttpClient) {
      this.makeRequest('1').subscribe(x => console.log(x))
      this.makeRequest('2').subscribe(console.log)
      setTimeout(() => {
        this.makeRequest('3').subscribe(console.log)
      }, 1000)
    }
    
    private makeRequest(id: string) {
      this.observable = this.observable.pipe(
        merge(Observable.of(id).pipe(delay(1)))
      )
      return this.aggregateDebounce(this.observable)
    }
    
    private getUrl(value) {
      console.log('getUrl Call', value);
      return 'https://jsonplaceholder.typicode.com/posts/1';
    }
    
    private aggregateDebounce(ob$) {
      const shared$ = ob$.publishReplay(1).refCount()
      return shared$.buffer(shared$.debounceTime(75))
    }
    

    我希望每个函数调用都有一个“getUrl Call”日志和一个结果日志。但是,我只得到结果,如果我添加1个以上的调用这个.makeRequest()结果也很奇怪。所有以前的值也总是返回。我想我不太明白这个主题在这种情况下是如何工作的。

    另一种方法(从这里开始 RXJS: Aggregated debounce )是为了创造某种整体的平衡( https://stackblitz.com/edit/angular-mx232d?file=src/app/app.component.ts

    构造函数(私有http:HttpClient){
    这个.makeRequest('2')。订阅(控制台.log)
    这个.makeRequest('3')。订阅(控制台.log)
    
    私有makeRequest(id:string){
    这是可观察的= 这个可观察的管道(
    返回这个。聚合反弹(这是可观察的)
    
    私有getUrl(值){
    控制台.log('getUrl Call',值);
    返回'https://jsonplaceholder.typicode.com/posts/1';
    }
    
    私人聚集反弹(ob$){
    const shared$=ob$.publishReplay(1).refCount()
    返回shared$.buffer(shared$.debounceTime(75))
    }
    

    理论上(至少对我来说)这两种变体听起来都是合理的,但似乎我遗漏了一些东西。任何朝正确方向眨眼都是高度赞赏的。

    编辑:

    按照要求,我添加了最终的现实世界目标。

    设想一个从API请求信息的服务。在50-75毫秒内,你用一个特定的id调用服务。我想把这些id组合成一个单独的请求,而不是3。如果100毫秒后再打一个电话到服务,一个新的请求将被执行

    2 回复  |  直到 6 年前
        1
  •  1
  •   a better oliver    6 年前
    this.makeRequest(1).subscribe();
    
    private makeRequest(number: number) {
      this.values.next(number);
      return this.idObservable.pipe(
    

    在订阅之前发出该值->该值将丢失。

    private values: Subject = new Subject();
    private idObservable = this.values.pipe(
    
    private makeRequest(number: number) {
      this.values.next(number);
      return this.idObservable.pipe(    
    

    每个调用都会根据主题创建一个新的可观察对象。每当你发出一个值, 全部的

    一个可能的解决方案如下(我在这里使用新的rxjs语法):

    subject: Subject<String> = null;
    observable = null;
    window = 100;
    
    constructor() {
      this.subject = null;
      this.window = 100;
    
      this.makeRequest('1').subscribe(console.log)
      this.makeRequest('2').subscribe(console.log)
      setTimeout(() => {
        this.makeRequest('3').subscribe(console.log)
      }, 1000)
    }
    
    private makeRequest(id: string) {
      if (!this.subject) {
        this.subject = new ReplaySubject()
        this.observable = this.subject.pipe(
          takeUntil(timer(this.window).pipe(take(1))),
          reduce((url, id, index) => this.combine(url, id), baseUrl),
          flatMap(url => this.request(url)),
          tap(() => this.subject = null),
          share()
        )
      }      
    
      this.subject.next(id);
      return this.observable;
    }  
    

    在哪里? combine request 发出实际的请求。

        2
  •  1
  •   DNJohnson    5 年前

    Rxjs非常擅长处理此类案件。你需要两个不同的科目:

    1. 第二个将用于订阅结果

    当发出请求时,值将被推送到第一个主题上,但第二个主题将被返回,从而抽象出组合请求的内部逻辑。

    private values: Subject = new Subject();
    private results: Subject = new Subject();
    
    private makeRequest(number: number) {
      this.values.next(number);
      return this.results;
    }
    

    合并请求的管道可以是 buffer debounceTime 如问题或其他逻辑所示,按要求。收到回复后,只需将其推到结果主题上:

    constructor(private http: HttpClient) {
      this.values
        .pipe(
          buffer(this.values.pipe(debounceTime(1000))),
          switchMap(values => this.getUrl(values)),
          map(response => this.results.next(response)))
        .subscribe();
    }
    

    我用了 switchMap 在将响应推送到结果之前模拟异步请求。

    完整示例如下: https://angular-8yyvku.stackblitz.io