代码之家  ›  专栏  ›  技术社区  ›  Jim C

RxJava:推荐的同步保存和检索NoSql DB数据的方法是什么(即响应式方法)

  •  0
  • Jim C  · 技术社区  · 3 年前

    上下文:这是我第一次使用RxJava。

    当使用RxJava进行编码以(1)从NoSql数据库中选择文档和(2)插入到NoSql(例如MongoDb)时,考虑到反应式堆栈,建议使用哪种方法?

    例如,我通常应该更喜欢使用Flowable进行阅读,使用Single进行保存吗?

    这段代码可以很好地将从Kafka Topic收到的消息保存到MongoDb,但我想知道io.reactivex是否有效。单身真的是实现这一目标的最佳方式。

    import com.mongodb.client.result.InsertOneResult
    import com.mongodb.reactivestreams.client.MongoClient
    import com.mongodb.reactivestreams.client.MongoCollection
    import io.micronaut.configuration.kafka.annotation.KafkaKey
    import io.micronaut.configuration.kafka.annotation.KafkaListener
    import io.micronaut.configuration.kafka.annotation.OffsetReset
    import io.micronaut.configuration.kafka.annotation.Topic
    import io.micronaut.messaging.annotation.Body
    import io.reactivex.Observable
    import io.reactivex.Single
    import javax.inject.Inject
    import io.reactivex.functions.Function
    
    @KafkaListener(offsetReset = OffsetReset.EARLIEST)
    class DebitConsumer {
    
        @Inject
        lateinit var mongoClient: MongoClient
    
        @Topic("debit")
        fun receive(@KafkaKey key: String, name: String) {
    
            save(key.toInt(), name)
    
        }
    
        private fun save( id: Int?,name: String?) {
            val debitMessage =  DebitMessage(id, name)
            Single
                    .fromPublisher(getCollection().insertOne(debitMessage))
                    .map<DebitMessage>(Function<InsertOneResult, DebitMessage> { debitMessage })
                    .subscribe()
        }
    
        private fun getCollection(): MongoCollection<DebitMessage?> {
            return mongoClient
                    .getDatabase("mydb")
                    .getCollection("mycollection", DebitMessage::class.java)
        }
    }
    

    我来自Spring Data,这是一个在响应式世界中有点生硬的CRUD,由于这个问题没有相关原因,我不会使用Spring,我正在寻找在响应式/无阻塞/背压世界中写入/读取数据的最佳实践。

    0 回复  |  直到 3 年前
        1
  •  1
  •   Carson Holzheimer    3 年前

    你的代码看起来不错。A. Single 节省是有意义的,因为你只会得到一个结果。 Flowable 阅读是有意义的,但实际上选择取决于你和你的应用程序。您想通过以下方式监听数据库更改吗 Change Streams ? 那么,你将不得不使用 可流动 因此,您可以对流中的多个更新做出反应。使用 可流动 ,即使您目前没有听到多个更新,但您认为将来可能会听到。

    如果你确定你只想处理一个事件,请使用 单身 。这将为您的应用程序代码处理多个事件的可能性节省一些精力。