代码之家  ›  专栏  ›  技术社区  ›  Saturn K

在Angular/RxJs中,如何捆绑http请求,以便一个捆绑包包含多个请求(http://test/1、http://test/2等)

  •  2
  • Saturn K  · 技术社区  · 6 年前

    我在看 bufferTime rxjs操作符,本质上希望使用它来捆绑http请求( this.http.get<string>('http://test/?id={num}')) . {num}是1-4。我想要的是每2秒钟把这些请求打包。因此,如果在2秒内发出2个请求,它们看起来像:

    this.http.get<string>('http://testUrl?id=1') this.http.get<string>('http://testUrl?id=2') 它们将作为1个请求发送到服务器(我指的是绑定)。服务器将接收 'http://testUrl?id=1,2'

    谢谢你的阅读!

    2 回复  |  直到 6 年前
        1
  •  0
  •   CozyAzure    6 年前

    forkJoin 允许您并行启动请求,从而“捆绑”您的请求。

    首先用你的延迟创建一个可观测的数组:

    let obs = Array
        .from({length: 100}, (v, k) => k + 1)
        .map(x =>
            this.http.get<string>('http://test/{x}')
                .pipe(concatMap(item => of(item.pipe(delay(2000)))))
        );
    

    那就用 叉接 解雇他们:

    forkJoin(obs)
        .subscribe(x=>{
            console.log(x)//an array of response from HTTP
        })
    

    上面的代码只在 2*(100-1) =198秒

        2
  •  0
  •   Saturn K    6 年前

    所以我修改了代码。现在,这两个请求主体通过一个数组合并为一个主体。每3秒发送一组请求主体以及url:

    import { HttpClient } from '@angular/common/http';
    import { Injectable } from '@angular/core';
    import { Subject, Subscriber, Observable } from 'rxjs';
    import { debounceTime } from 'rxjs/operators/debounceTime';
    import { buffer } from 'rxjs/operators/buffer';
    
    interface ReportRunStatus {
        status: any;
    }
    
    interface RequestWithObserver {
        request: number;
        observer: Subscriber<ReportRunStatus>;
    }
    
    const DEBOUNCE_TIME = 3000;
    
    @Injectable()
    export class WelcomeService {
        private requests = new Subject<RequestWithObserver>();
        private bufferTrigger = new Subject<null>();
        private requestBuffer = this.requests.pipe(buffer(this.bufferTrigger.pipe(debounceTime(DEBOUNCE_TIME))));
    
        constructor(
            private http: HttpClient
        ) {
            this.subscribeBufferRequests();
        }
    
        public getConfig(id: number) {
            return new Observable((observer) => {
                this.bufferTrigger.next(null);
    
                this.requests.next({
                    request: id,
                    observer
                });
            });
        }
    
        public subscribeBufferRequests() {
            this.requestBuffer.subscribe((requests) => {
                const requestsData = requests.map((r) => r.request);
                this.http.post('getMovie', requestsData).subscribe((response) => {
                    requests.forEach((requestItem, i) => {
                        requestItem.observer.next(response[i]);
                    });
                });
            });
        }
    }