我从Akka Streams开始,我想构建一个服务器作为Stream来接收
Http.IncomingConnection
并将收到的信息作为普通接收器发送给卡夫卡。
我声明我的来源如下:
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "localhost", port = "8080")
然后,我想从HttpRequest的主体中提取消息(字符串),最后将字符串发送给Kafka。流程如下所示:
val bindingFuture: Future[Http.ServerBinding] = serverSource
.map(???) //Here, I need to extract the message
.map(message => new ProducerRecord[String, String](topic, message.result(2 seconds)))
.runWith(akka.kafka.scaladsl.Producer.plainSink(producerSettings))
但是,我不知道如何提取信息。我想这样做:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(POST, Uri.Path("/publish"), _, _, _) => {
HttpResponse(202, entity = "Message sent to Kafka!")
}
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
connection handleWithSyncHandler requestHandler
我无法获取流进程所遵循的消息。同时,我也希望能收到你的任何请求
/publish
URI,或者在流内部返回404。
有可能这样做吗?