经过仔细考虑,我使用了
主要观察值与
操作。
本例中为http请求,但文件迭代示例类似。
可以通过函数参数添加第二个观察者/订阅者。此订阅者只关心
第二个版本的功函数,没有进度观测器,
export class FileUploadService {
doWork(formData: FormData, url: string): Subject<Response> {
return this.privateDoWork(formData, url, null);
}
doWorkWithProgress(formData: FormData, url: string, progressObserver: Observer<number>): Subject<Response> {
return this.privateDoWork(formData, url, progressObserver);
}
private privateDoWork(formData: FormData, url: string, progressObserver: Observer<number> | null): Subject<Response> {
return Observable.create(resultObserver => {
let xhr: XMLHttpRequest = new XMLHttpRequest();
xhr.open("POST", url);
xhr.onload = (evt) => {
if (progressObserver) {
progressObserver.next(1);
progressObserver.complete();
}
resultObserver.next((<any>evt.target).response);
resultObserver.complete()
};
xhr.upload.onprogress = (evt) => {
if (progressObserver) {
progressObserver.next(evt.loaded / evt.total);
}
};
xhr.onabort = (evt) => resultObserver.error("Upload aborted by user");
xhr.onerror = (evt) => resultObserver.error("Error");
xhr.send(formData);
});
}
这里是包含进度订阅者的函数调用。使用此解决方案,上载函数的调用方必须
创建/处理/拆卸进度订阅服务器。
this.fileUploadService.doWorkWithProgress(this.chosenSerie.formData, url, new Subscriber((progress) => console.log(progress * 100)).subscribe(
(result) => console.log(result),
(error) => console.log(error),
() => console.log("request Completed")
);
总的来说,我更喜欢这个解决方案,而不是一个具有单个订阅的“Pair”对象。不需要空处理,并且
该示例是用Typescript编写的,但其他ReactiveX实现也可以使用类似的解决方案。