上下文:这是我第一次使用RxJava。
当使用RxJava进行编码以(1)从NoSql数据库中选择文档和(2)插入到NoSql(例如MongoDb)时,考虑到反应式堆栈,建议使用哪种方法?
例如,我通常应该更喜欢使用Flowable进行阅读,使用Single进行保存吗?
这段代码可以很好地将从Kafka Topic收到的消息保存到MongoDb,但我想知道io.reactivex是否有效。单身真的是实现这一目标的最佳方式。
import com.mongodb.client.result.InsertOneResult
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Body
import io.reactivex.Observable
import io.reactivex.Single
import javax.inject.Inject
import io.reactivex.functions.Function
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class DebitConsumer {
@Inject
lateinit var mongoClient: MongoClient
@Topic("debit")
fun receive(@KafkaKey key: String, name: String) {
save(key.toInt(), name)
}
private fun save( id: Int?,name: String?) {
val debitMessage = DebitMessage(id, name)
Single
.fromPublisher(getCollection().insertOne(debitMessage))
.map<DebitMessage>(Function<InsertOneResult, DebitMessage> { debitMessage })
.subscribe()
}
private fun getCollection(): MongoCollection<DebitMessage?> {
return mongoClient
.getDatabase("mydb")
.getCollection("mycollection", DebitMessage::class.java)
}
}
我来自Spring Data,这是一个在响应式世界中有点生硬的CRUD,由于这个问题没有相关原因,我不会使用Spring,我正在寻找在响应式/无阻塞/背压世界中写入/读取数据的最佳实践。