代码之家  ›  专栏  ›  技术社区  ›  Dimitri

关于RxJava和PublishSubject的初学者问题

  •  0
  • Dimitri  · 技术社区  · 5 年前

    我有个问题 PublishSubject 在里面 RxJava 我创建了一个虚拟PublishSubject,它会发射一些物体。这是我的密码:

    override fun generate(exportRequest: ExportRequest): Observable<Report> {
            val faker = Faker()
            val dummyPublisher = PublishSubject.create<Report>()
            for(x in 1..1_000){
                val dataToExport = DataToExport(UUID.randomUUID(), faker.company().buzzword(), faker.company().name())
                val report = Report(dataToExport)
                sddPublisher.onNext(report)
                Thread.sleep(1)
            }
            dummyPublisher.onComplete()
            return dummyPublisher
        }
    

    ... // somewhere in the code
    reportStrategy.generate(exportRequest).subscribe { report: Report? ->
         println(report)
     }
    

    也许我错过了什么。任何帮助都将不胜感激

    0 回复  |  直到 5 年前
        1
  •  0
  •   Mark S    4 年前

    正如@akarnokd在评论中所指出的 PublishSubject 您创建的对象会立即发出由其自身传递给它的任何值 onNext

    你似乎想要的是一个 Observable 一旦有东西订阅了它,它就开始执行一些同步代码。 Observable.create 是创建这样一个实例的一种方法,但正确使用它可能会很麻烦。

    Observable.fromPublisher 1.需要一段时间 Publisher 作为论据。A. 出版商 本身是一个传递 Subscriber 每当 Observer 可观察 创建人 fromPublisher 并允许您将事件直接发送到 .

    fun generateReportStream(genFakeReport: () -> Report): Observable<Report> {
        return Observable.fromPublisher { subscriber ->
            for (x in 1..1_000) {
                val fakeReport = genFakeReport()
                subscriber.onNext(fakeReport)
                Thread.sleep(1)
            }
            subscriber.onComplete()
        }
    }
    
    fun main() {
        /** supply whatever logic you want to generate a fake [Report] */
        fun genFakeReport(): Report = TODO()
        val subscription = generateReportStream(::genFakeReport).subscribe(::println)
    }
    

    一旦订阅了,这将正确地发出值 可观察 generateReportStream 。此外,可以对同一实例进行更多订阅,并且每个订阅将使用相同的逻辑发出新的序列值。