代码之家  ›  专栏  ›  技术社区  ›  Mangat Rai Modi

垂直。x反应式卡夫卡客户端:编写时链接不起作用?

  •  0
  • Mangat Rai Modi  · 技术社区  · 6 年前

    我正在使用 io.vertx.reactivex.kafka.client.producer.KafkaProducer 客户客户端具有 rxWrite 返回的函数 Single<RecordMetadata> 。但是,我需要在写入操作期间记录错误(如果有)。它显然没有被执行。

    我已经编写了以下工作示例。

    测试(): 用于测试链接和日志记录的函数

        fun test(): Single<Int> {
        val data = Single.just(ArrayList<String>().apply {
            add("Hello")
            add("World")
        })
    
        data.flattenAsObservable<String> { list -> list }
            .flatMap { advertiser ->
           //does not work with writeKafka
                writeError(advertiser).toObservable().doOnError({ println("Error $data") })
            }
            .subscribe({ record -> println(record) }, { e -> println("Error2 $e") })
    
        return data.map { it.size }
    }
    

    书写卡夫卡: 将给定字符串写入Kafka并返回Single

    fun writeKafka(param: String): Single<RecordMetadata> {
     //null topic to produce IllegalArgumentException()
        val record = KafkaProducerRecord.create(null, UUID.randomUUID().toString(), param)
        return kafkaProducer.rxWrite(record)
    }
    

    写入错误: 始终返回具有相同类型错误的单个

    fun writeError(param: String): Single<RecordMetadata> {
        return Single.error<RecordMetadata>(IllegalArgumentException())
    }
    

    所以当我打电话的时候 writeKafka 它只打印 Error2 但如果我使用 writeError 它同时打印两个 Error 错误2 。看起来像是 writeKafka公司 仍在等待结果,但为什么 错误2 是否打印?

    我是RxJava2的新手,有人能指出其中的错误吗?

    1 回复  |  直到 6 年前
        1
  •  2
  •   akarnokd    6 年前

    读取并发布错误的堆栈跟踪非常重要,这样可以隔离问题。

    在这种情况下,看起来你得到了 IllegalArgumentException 从…起 create 你什么都没有 Single 因为相关的卡夫卡课程 throws it return kafkaProducer.rxWrite(record) 根本不执行,实际上会使 flatMap doOnError 从不进场,因此只打印“Error2”。