代码之家  ›  专栏  ›  技术社区  ›  Magnus

在rxjs中,观察者是否被注入到可观察的执行中?

  •  4
  • Magnus  · 技术社区  · 6 年前

    我已经看完了 ReactiveX 记录了好几次,但仍然不能完全理解 观察者 订阅 可观察的 .

    让我们看一个简单的例子:

    import { Observable } from 'rxjs'; 
    
    const observable = new Observable(subscriber => {
      subscriber.next(1);
      subscriber.complete();
    });
    
    const observer = {
      next: (x) => console.log('got value ' + x),
      error: (err) => console.error('something wrong occurred: ' + err),
      complete: () => console.log('done')
    };
    
    observable.subscribe(observer);
    

    StackBlitz code .


    我的问题:

    在哪里 subscriber 传递给 可观察的 来自何方?

    RxJS documentation :

    不是巧合 observable.subscribe subscribe 在里面 new Observable(function subscribe(subscriber) {...}) 有相同的名字。 在图书馆,它们是不同的,但是为了实际的目的你可以 从概念上看它们是平等的。

    所以,显然这个物体 订阅 中的回调 可观察的 构造器( 用户 事实上 observer 对象。至少,如果你按照上面的引用来解释图书馆的实际工作方式,你就不会这么做了。

    如果不是 观察者 传入的对象,那么到底是什么 subscriber.next(1) subscribe.complete() 打电话?这和 next 中的属性 观察者 ?


    澄清编辑:

    我知道如何利用 RXJS 事实上我们可以 概念上 想象一下 观察者 注入(正如引用的那样)。不过,我来这里是想了解 事实上 作品。

    2 回复  |  直到 6 年前
        1
  •  3
  •   Magnus    6 年前

    这个 Observable 创建流程如下:

    可观察的 由作者定义(此处手动使用 new ,为了解释的目的):

    const myObservable = new Observable(function subscribe(subscriber) {
      subscriber.next(1);
      subscriber.next(2);
      subscriber.complete();
      return function tearDownLogic() {
        console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
      }
    });
    

    这个 subscribe 回调传递给 可观察的 上面的内容由 Observable constructor :

    constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
      if (subscribe) {
        this._subscribe = subscribe;
      }
    }
    

    所以,我们有完整的 订阅 功能,由我们或任何其他预先制定的 可观察的 ,保存下来供以后执行。

    观察者 可以传递给 订阅 以多种形式之一进行回调。或者,直接作为一到三个函数( 下一个 , 错误 , 完成 ,或作为具有相同三种方法中的一种或多种方法的对象。在本说明中,我们将实现最后一个更详细的选项:

    const observer = {
      next(v) {
        console.log(v);
      }
      error(err) {
        console.log(err);
      }
      complete() {
        console.log('Observable has now completed and can no longer emit values to observer');
      }
    }
    

    现在,有趣的部分开始了。我们通过 observer 进入 Observable.subscribe(..) 方法:

    myObserver.subscribe(observer);
    

    这个 subscribe method 看起来像这样:

      subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
                error?: (error: any) => void,
                complete?: () => void): Subscription {
    
    
        const { operator } = this;
        const sink = toSubscriber(observerOrNext, error, complete);
    
    
        if (operator) {
          sink.add(operator.call(sink, this.source));
        } else {
          sink.add(
            this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
            this._subscribe(sink) :
            this._trySubscribe(sink)
          );
        }
    
    
        if (config.useDeprecatedSynchronousErrorHandling) {
          if (sink.syncErrorThrowable) {
            sink.syncErrorThrowable = false;
            if (sink.syncErrorThrown) {
              throw sink.syncErrorValue;
            }
          }
        }
    
    
        return sink;
      }
    

    简单描述一下 订阅 方法:

    1. 收到 观察者 在之前讨论过的形式中
    2. toSubscriber 将观察者转换为 Subscriber 对象,不管它以何种形式传递 用户 实例保存在 sink 变量)
    3. 注: operator 变量是 undefined ,除非您订阅了运营商。因此,忽略 if 周围的陈述 操作人员
    4. 用户 扩展(原型链接到) Subscription 对象,其原型上有两个重要方法: unsubscribe() , add()
    5. add(..) 用于添加“ 拆卸逻辑 “(职能)至 可观察的 ,当 可观察的 完成 或是 退订 . 它将接受传递给它的任何函数,并用 订阅 对象,并将函数放入 订阅 _unsubscribe 变量。这个 订阅 保存在 用户 我们在上面创建了一个变量 _subscriptions . 如前所述,我们这样做是为了 用户 退订 完成 ,所有的 加法() 埃德 拆卸逻辑 执行
    6. 作为旁注, Observable.subscribe() 返回 用户 实例。所以,你可以打电话 mySubscriber.add( // some tear down logic) 在任何时候添加函数,当 可观察的 完成 或是 退订
    7. 一个重要的部分现在包括: this._trySubscribe(sink) 跑(内) 加法() ,作为参数)。 _trySubscribe(..) 是实际运行 订阅 之前由保存的回调 可观察的 构造器。重要的是,它通过了 下沉 (我们的新 用户 实例)作为对 可观察的 回调。换句话说,当 subscriber.next(1) 里面 可观察的 执行,我们实际上在执行 next(1) 下沉 ( 用户 )实例 next() 在上 用户 的原型)。

    所以,现在我要走到最后。里面还有更多细节 toSubscribe 在附近 退订 流程等,但这些不在本问答范围内。

    简而言之,要回答标题中的问题, 观察者 确实被传给了 可观察的 ,仅仅是在转换为统一的 用户 对象。

    希望这对将来的其他人有帮助。

        2
  •  2
  •   Alberto Chiesa    6 年前

    不,观察者没有被注入到一个可观察对象中。

    令人困惑的是, new Observable(...) 语法与其说是一个有用的模式,不如说是一个低级的工厂。

    它或多或少是更直接的实现所使用的机器,比如 of(value1, value2, ..., valueN) , from(enumeration) fromEvent(...) .

    这些方法是您应该关注的实际用例。

    在封面下,所有这些方法都桥接某种同步或异步值或交互 进入之内 可观察到的溪流的奇妙世界。为此,他们,以某种方式,采取行动 喜欢 一个恰当的观察者:他们生成项目并将其放入流中。为此,它们使用一个名为 next . 就像 Observer 实现,因为实际上是以完全相同的方式调用的。

    特别是,您可以在这里查看subscribe方法的实现:

    https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts

    如果您想了解订阅期间实际发生的情况,我建议您实际查看代码。 但是,在imo中,您应该在熟悉了各种可观察的创建函数之后再尝试。

    希望有帮助。