代码之家  ›  专栏  ›  技术社区  ›  hackp0int

对httpclient结果进行长时间轮询并流到csv文件中

  •  5
  • hackp0int  · 技术社区  · 6 年前

    问题1: 如何实现相同的行为?而不是 Observable.interval 它将由回调调用。

    例如: 我有 5000ms 但是我的服务器速度非常慢,而且在 5000毫秒 . 但是下一个调用是在 5000毫秒 . 我不想这样。我希望在从服务器返回结果之后,它将调用下一个调用。

    问题2: 如何将结果立即流式传输到csv文件,而不逐个创建多个文件。对于当前的实现,我使用 FileSaver 其中工作 IE11 . 我想继续使用它。 是否有一种方法可以将数据流化为文件而不是将其收集到数组中,因为我有大型数据集。比如1米行等等… 例子:

    const progress = Observable.interval(1000)
      .switchMap(() => this.messageService.getResults(query))
      .map(messageResult => messageResult)
      .subscribe((data: MessagesResult) => {
        inProcess = true;
        if (!data.isMoreResults && data.auditMessageList.length === 0) {
          this.fileSaver.save(`result.csv`, csvData);
          inProcess = false;
          this.logger.info('Download file finished...');
          progress.unsubscribe();
        }
        const start = this.filterModel.offset * this.filterModel.limit;
        const rows = [...csvData];
        rows.splice(start, 0, ...data.auditMessageList);
        csvData = rows;
        if (inProcess) {
          this.logger.info('Exporting in progress...');
        }
        query.offset++;
      }, error => this.logger.error(error));
    

    }

    3 回复  |  直到 5 年前
        1
  •  2
  •   m1ch4ls    6 年前

    正如你发现的那样 Observable.interval 不会“等待”河流的其余部分。

    我一般使用 repeatWhen 具有 delay

    const progress = Observable.defer(() => this.messageService.getResults(query))
      .repeatWhen(notifications => notifications.delay(1000)) 
      ...
    

    以下是工作示例: https://jsfiddle.net/a0rz6nLv/19/

    我不太明白你们其他人的代码。

    不要使用 progress.unsubscribe(); 在里面 subscribe 方法。而是考虑使用 takeWhile takeUntil -两者都将为您完成观测。

    .takeWhile(data => data.isMoreResults  data.auditMessageList.length > 0)
    

    例如,还可以使用 reduce toArray

    .reduce((accumulator, data) => data.auditMessageList.concat(accumulator), [])
    

    副作用最好由 do 操作人员

    .do({
      next: () => {
        inProgress = true;
        this.logger.info('Exporting in progress...');
      },
      complete: () => {
        inProgress = false;
        this.logger.info('Download file finished...');
      }
    })
    

    关于第二个问题——我不知道——你应该能够从服务器传输csv。如果您不能修改服务器,也许其他人会知道如何在客户机上进行修改…

        2
  •  2
  •   madjaoue    6 年前

    问题1

    下面是一个实现函数的示例,该函数在收到响应时调用自身。

    后端:

    1. 模拟5秒和10秒内响应缓慢的后端
    2. 在每次响应时,服务器都会给出 request_number 和A state
    3. 对于3个第一反应, 状态 active 之后, 状态 closed

    代码:

    /* Mocked backend. I'm slow, like really slow */
    class SlowBackend {
      MAX_ITERATIONS = 3; // suppose you're reading a table and you have pagination, with 3 pages
      currentIteration = 0;
    
      constructor() {}
    
      getStuff() {
        console.log(`**Request N. ${this.currentIteration}**\n[Back] : received a request from the front`);
        const responseDelay = Math.random() * 5000 + 5000; // response between 5s and 10s
        let state = "open";
        if(++this.currentIteration > this.MAX_ITERATIONS)
          state = "closed";
    
        return Observable
          .timer(responseDelay)
          .map( () => {
          console.log(`[Back] : Responding after ${responseDelay} ms`)
            return {
              request_number : this.currentIteration,
              state : state
            };
    
          })
      }
    }
    

    前面:

    这基本上是您的组件。

    class Frontend {
    
      isPollingActivated = true;
      responses = [];
    
    
      constructor(private backendService) {
        this.backendService = new SlowBackend(); // connection to backend
        this.requestOnRegularBasis();
      }
    
      requestOnRegularBasis() {
        if (!this.isPollingActivated)
          return;
    
        this.backendService.getStuff()
          .subscribe(response => {
            console.log(`[Front] : received response from server. State : ${response.state}`);
    
            // Choose one of the following blocks, comment the other according to what you need
    
            // Block 1 : Sync processing example
            console.log(`[Front] : doing some sync processing`);
            this.doSomeSyncProcessing(response);
            this.requestOnRegularBasis();
    
            // Block 2 : Async processing example
            // console.log(`[Front] : doing some async processing`);
            // this.doSomeAsyncProcessing(response)
            //    .subscribe(this.requestOnRegularBasis);
    
          })
      }
    
      private doSomeSyncProcessing(response){
        if(response.state == 'closed'){
          this.isPollingActivated = false; // stop polling
          this.saveDataToCsv();
        }
        else
          this.responses.push(Object.values(response).join(';')) // csv line separated by ';'
      }
    
      private saveDataToCsv(){
        const headers = ['current_request;state']
        this.responses = headers.concat(this.responses)
        console.log('saving to csv : ', this.responses.join('\n'));
    
        // Uncomment this to use FileSaver API
        /*
        const blob = new Blob(headers.concat(this.responses), {type: "text/csv;charset=utf-8"});
        saveAs(blob, "my_responses.csv");*
        */
      }
    
      private doSomeAsyncProcessing(response){
        return Observable.timer(1000).map(() => this.doSomeSyncProcessing(response));
      }
    
    }
    

    输出:

    **Request N. 0**
    [Back] : received a request from the front
    [Back] : Responding after 5482 ms
    [Front] : received response from server. State : open
    [Front] : doing some sync processing
    **Request N. 1**
    [Back] : received a request from the front
    [Back] : Responding after 7489 ms
    [Front] : received response from server. State : open
    [Front] : doing some sync processing
    **Request N. 2**
    [Back] : received a request from the front
    [Back] : Responding after 9627 ms
    [Front] : received response from server. State : open
    [Front] : doing some sync processing
    **Request N. 3**
    [Back] : received a request from the front
    [Back] : Responding after 5806 ms
    [Front] : received response from server. State : closed
    [Front] : doing some sync processing
    saving to csv :
    current_request;state
    1;open
    2;open
    3;open
    

    问题2

    你不能。

    至少不使用 FileSaver . 因为它不支持逐块写入。当你宣布 Blob 你必须 准备好所有数据。 有些库支持块,但它们要么用于服务器端(例如node.js),要么非常特定于浏览器。

    检查一下: Save client generated data as file in JavaScript in chunks

    注:

    如果您试图使用JS在客户机中存储1百万行的csv,那么架构可能有问题。 因为这不是浏览器的常见用例。客户机应该有薄弱的机器,因此接收处理, 轻,易于分析信息。为此,您可以在服务器端构建csv,它将 拥有写入流文件的所有权利,以及适当的处理/内存容量。

    演示:问题1

    http://jsbin.com/rojutudayu/2/edit?html,js,console

    演示:如何下载blob?

        <script src="https://cdn.rawgit.com/eligrey/FileSaver.js/e9d941381475b5df8b7d7691013401e171014e89/FileSaver.min.js"> </script>
    
    <script> 
    var blob = new Blob(["Hello, world!"], {type: "text/plain;charset=utf-8"});
    saveAs(blob, "hello world.txt");
    </script>
        3
  •  2
  •   hgiasac    6 年前

    问题1:

    使用 forkJoin . 它将等待所有可观测数据完成。 当你与 delay(5000) ,最短时间为5s,如果5s前没有返回api响应,则仍要等待结果返回。( demo )

    const stream1$ = of(1).pipe(
      delay(5000)
    );
    
    const intervalTime = Math.random() * 5000 + 5000
    
    // replace with your API stream
    const stream2$ = of(intervalTime).pipe(
      delay(intervalTime)
    );
    
    forkJoin(stream1$, stream2$)
      .subscribe(([_, s2]) => {
        console.log(s2);
      })
    

    问题2:

    如果文件很大,您应该让Web浏览器处理它。最好将文件保存在服务器中,然后返回一个链接来下载它。对于小文件,性能不是问题。您可以将文件数据存储在RAM中,然后保存文件一次。

    编辑:filesaver开发人员建议使用 StreamSaver 如果文件很大。你应该看看

    streamsaver.js采用了不同的方法。现在您可以直接在文件系统中创建一个可写流,而不是将数据保存在客户端存储或内存中(我不是说Chromes沙盒文件系统)。

    streamsaver.js是在客户端保存流的解决方案。它非常适合需要保存大量在客户端创建的数据的webapps,在客户端RAM非常有限,比如在移动设备上。