每次我调用第二个API时,我都会在Postman中看到一个错误,说“有一个内部服务器错误”
我不明白这个问题是否与我的卡夫卡制作人或消费者有关,他们昨天都工作得很好。这些消息不再到达消费者手中,我不能再进行第二次API调用,因为代码每秒都会被压碎(没有在Scala中提供任何日志)
这是我的制作人代码:
class Producer(topic: String, brokers: String) {
val producer = new KafkaProducer[String, String](configuration)
private def configuration: Properties = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
props
}
def sendMessages(message: String): Unit = {
val record = new ProducerRecord[String, String](topic, "1", message)
producer.send(record)
producer.close()
}
}
这就是我使用它的地方:
object Message extends DefaultJsonProtocol with SprayJsonSupport {
val newConversation = new Producer(brokers = KAFKA_BROKER, topic = "topic_2")
def sendMessage(sender_id: String, receiver_id: String, content: String): String = {
val JsonMessage = Map("sender_id" -> sender_id, "receiver_id" -> receiver_id, "content" -> content)
val i = JsonMessage.toJson.prettyPrint
newConversation.sendMessages(i)
"Message Sent"
}
}
以下是API:
f
inal case class Message(sender_id: String, receiver_id: String, content: String)
object producerRoute extends DefaultJsonProtocol with SprayJsonSupport {
implicit val MessageFormat = jsonFormat3(Message)
val sendMessageRoute:Route = (post & path("send")){
entity(as[Message]){
msg => {
complete(sendMessage(msg.sender_id,msg.receiver_id,msg.content))
}
}
}
}
另一方面,这是我的消费者代码:
class Consumer(brokers: String, topic: String, groupId: String) {
val consumer = new KafkaConsumer[String, String](configuration)
consumer.subscribe(util.Arrays.asList(topic))
private def configuration: Properties = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
props
}
def receiveMessages():Array[String] = {
val a:ArrayBuffer[String] = new ArrayBuffer[String]
while (true) {
val records = consumer.poll(Duration.ofSeconds(0))
records.forEach(record => a.addOne(record.value()))
}
println(a.toArray)
a.toArray
}
}
object Consumer extends App {
val consumer = new Consumer(brokers = KAFKA_BROKER, topic = "topic_2", groupId = "test")
consumer.receiveMessages()
}
我甚至不再从消费者的打印中得到结果。我不明白问题出在哪里,因为它之前运行得很好,而且自上次运行以来,我没有做任何改变。