代码之家  ›  专栏  ›  技术社区  ›  Jeremy W. Sherman

当另一个源插入管道时,如何实现调度器继承?

  •  1
  • Jeremy W. Sherman  · 技术社区  · 6 年前

    我看不出有什么方法可以做到:

    observable
    .flatMap {
      val scheduler = Schedulers().current!!
      someOtherObservable
        .observeOn(scheduler)
    }
    

    是否有其他方法可以继承调度程序?

    更多上下文

    我有这样一条管道:

    compositeDisposable += Environment
      .lookupDeviceInfo()
      .subscribeOn(scheduler)
      .flatMap { deviceInfo ->
        Device(deviceId = deviceInfo.id)
          .sendCommand()
      .subscribe(
        { result -> /*process result*/ },
        { e -> /*log error*/ })
    

    对于消费者来说,这看起来像是他们将所有的工作都推到了指定的 scheduler :事件来自 lookupDeviceInfo() 从调度程序中得到一个worker的向量,然后他们希望继续使用这个worker。

    sendCommand() 将来自另一个事件源的事件中的tee作为实现细节:

    sendMessageSingle(deviceId, payload)
    .flatMap { sentMessageId ->
      responseObservable
      .filter { it.messageId == sentMessageId }
      .firstOrError()
    }
    

    事件流从 responseObservable ,但这些事件都不会矢量化到指定的 调度程序

    1 回复  |  直到 6 年前
        1
  •  1
  •   akarnokd    6 年前

    从评论来看:

    返回到同一个调度程序线程需要提供一个单线程调度程序(即。, Schedulers.from(Executor) , Schedulers.single() observeOn .

    我不关心登陆到同一个线程,只是同一个调度程序(即使更换工人也可以,只要新工人和旧工人是由同一个调度员销售的。)

    那么这个建议仍然适用,您可以放弃我提到的“单线程”属性。