我正在使用
io.vertx.reactivex.kafka.client.producer.KafkaProducer
客户客户端具有
rxWrite
返回的函数
Single<RecordMetadata>
。但是,我需要在写入操作期间记录错误(如果有)。它显然没有被执行。
我已经编写了以下工作示例。
测试():
用于测试链接和日志记录的函数
fun test(): Single<Int> {
val data = Single.just(ArrayList<String>().apply {
add("Hello")
add("World")
})
data.flattenAsObservable<String> { list -> list }
.flatMap { advertiser ->
//does not work with writeKafka
writeError(advertiser).toObservable().doOnError({ println("Error $data") })
}
.subscribe({ record -> println(record) }, { e -> println("Error2 $e") })
return data.map { it.size }
}
书写卡夫卡:
将给定字符串写入Kafka并返回Single
fun writeKafka(param: String): Single<RecordMetadata> {
//null topic to produce IllegalArgumentException()
val record = KafkaProducerRecord.create(null, UUID.randomUUID().toString(), param)
return kafkaProducer.rxWrite(record)
}
写入错误:
始终返回具有相同类型错误的单个
fun writeError(param: String): Single<RecordMetadata> {
return Single.error<RecordMetadata>(IllegalArgumentException())
}
所以当我打电话的时候
writeKafka
它只打印
Error2
但如果我使用
writeError
它同时打印两个
Error
和
错误2
。看起来像是
writeKafka公司
仍在等待结果,但为什么
错误2
是否打印?
我是RxJava2的新手,有人能指出其中的错误吗?