代码之家  ›  专栏  ›  技术社区  ›  Iheb Mar

卡夫卡制作人/消费者粉碎每一秒的API调用

  •  0
  • Iheb Mar  · 技术社区  · 2 年前

    每次我调用第二个API时,我都会在Postman中看到一个错误,说“有一个内部服务器错误” 我不明白这个问题是否与我的卡夫卡制作人或消费者有关,他们昨天都工作得很好。这些消息不再到达消费者手中,我不能再进行第二次API调用,因为代码每秒都会被压碎(没有在Scala中提供任何日志) 这是我的制作人代码:

    
        class Producer(topic: String, brokers: String) {
        
          val producer = new KafkaProducer[String, String](configuration)
        
          private def configuration: Properties = {
            val props = new Properties()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.ACKS_CONFIG, "all")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
            props
          }
        
          def sendMessages(message: String): Unit = {
            val record = new ProducerRecord[String, String](topic, "1", message)
            producer.send(record)
            producer.close()
          }
        
        }
    
    

    这就是我使用它的地方:

    
        object Message extends DefaultJsonProtocol with SprayJsonSupport {
        
          val newConversation = new Producer(brokers = KAFKA_BROKER, topic = "topic_2")
        
          def sendMessage(sender_id: String, receiver_id: String, content: String): String = {
            val JsonMessage = Map("sender_id" -> sender_id, "receiver_id" -> receiver_id, "content" -> content)
            val i = JsonMessage.toJson.prettyPrint
            newConversation.sendMessages(i)
            "Message Sent"
          }
        }
    
    

    以下是API:

    f
    
        inal case class Message(sender_id: String, receiver_id: String, content: String)
        
        object producerRoute extends DefaultJsonProtocol with SprayJsonSupport {
          implicit val MessageFormat = jsonFormat3(Message)
        
          val sendMessageRoute:Route = (post & path("send")){
            entity(as[Message]){
              msg => {
                complete(sendMessage(msg.sender_id,msg.receiver_id,msg.content))
              }
              }
          }
        }
    
    

    另一方面,这是我的消费者代码:

    
        class Consumer(brokers: String, topic: String, groupId: String) {
        
          val consumer = new KafkaConsumer[String, String](configuration)
          consumer.subscribe(util.Arrays.asList(topic))
        
          private def configuration: Properties = {
            val props = new Properties()
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
            props
          }
        
          def receiveMessages():Array[String]  = {
          val a:ArrayBuffer[String] = new ArrayBuffer[String]
            while (true) {
              val records = consumer.poll(Duration.ofSeconds(0))
              records.forEach(record => a.addOne(record.value()))
            }
            println(a.toArray)
            a.toArray
          }
        }
        
        object Consumer extends App {
        
          val consumer = new Consumer(brokers = KAFKA_BROKER, topic = "topic_2", groupId = "test")
          consumer.receiveMessages()
        }
    
    

    我甚至不再从消费者的打印中得到结果。我不明白问题出在哪里,因为它之前运行得很好,而且自上次运行以来,我没有做任何改变。

    0 回复  |  直到 2 年前