代码之家  ›  专栏  ›  技术社区  ›  TardigradeX

如何使用ReactiveX观察对象/主题处理进度更新?

  •  6
  • TardigradeX  · 技术社区  · 7 年前

    我正在编写一个Angular应用程序,它使用ReactiveX API来处理异步操作。我以前在Android项目中使用过API,我非常喜欢它简化并发任务处理的方式。但有一件事我不确定如何以正确的方式解决。

    如何从正在进行的任务中更新观察者?在这种情况下,加载/创建复杂/大型对象需要时间,我可以返回中间进度,但不能返回对象本身。可观察对象只能返回一个数据类型。因此,我知道两种可能性。

    1. 创建两个可观察项,一个是数据可观察项,另一个是进度可观察项。观察者必须订阅进度更新的可观察进度,以及在最终加载/创建数据时通知的可观察数据。也可以选择将它们压缩在一起,以用于一个订阅。

    我使用了这两种技术,它们都有效,但我想知道是否有一个统一的标准,一个干净的方法,如何解决这个任务。当然,它也可以是一个全新的。我对每个解决方案都持开放态度。

    1 回复  |  直到 7 年前
        1
  •  4
  •   TardigradeX    7 年前

    经过仔细考虑,我使用了 主要观察值与 操作。 本例中为http请求,但文件迭代示例类似。

    可以通过函数参数添加第二个观察者/订阅者。此订阅者只关心

    第二个版本的功函数,没有进度观测器,

    export class FileUploadService {
    
     doWork(formData: FormData, url: string): Subject<Response> {
        return this.privateDoWork(formData, url, null);
     }
    
     doWorkWithProgress(formData: FormData, url: string, progressObserver: Observer<number>): Subject<Response> {
        return this.privateDoWork(formData, url, progressObserver);
     }
    
     private privateDoWork(formData: FormData, url: string, progressObserver: Observer<number> | null): Subject<Response> {
    
         return Observable.create(resultObserver => {
         let xhr: XMLHttpRequest = new XMLHttpRequest();
         xhr.open("POST", url);
    
         xhr.onload = (evt) => {
             if (progressObserver) {
                progressObserver.next(1);
                progressObserver.complete();
                }
             resultObserver.next((<any>evt.target).response);
             resultObserver.complete()
         };
         xhr.upload.onprogress = (evt) => {
             if (progressObserver) {
                progressObserver.next(evt.loaded / evt.total);
             }
    
         };
         xhr.onabort = (evt) => resultObserver.error("Upload aborted by user");
         xhr.onerror = (evt) => resultObserver.error("Error");
    
         xhr.send(formData);
         });
     }
    

    这里是包含进度订阅者的函数调用。使用此解决方案,上载函数的调用方必须 创建/处理/拆卸进度订阅服务器。

     this.fileUploadService.doWorkWithProgress(this.chosenSerie.formData, url, new Subscriber((progress) => console.log(progress * 100)).subscribe(
        (result) => console.log(result),
        (error) => console.log(error),
        () => console.log("request Completed")
        );
    

    总的来说,我更喜欢这个解决方案,而不是一个具有单个订阅的“Pair”对象。不需要空处理,并且

    该示例是用Typescript编写的,但其他ReactiveX实现也可以使用类似的解决方案。