代码之家  ›  专栏  ›  技术社区  ›  Sergio Rodríguez Calvo

如何将Http管理到Akka流中并将消息发送给Kafka?

  •  1
  • Sergio Rodríguez Calvo  · 技术社区  · 6 年前

    我从Akka Streams开始,我想构建一个服务器作为Stream来接收 Http.IncomingConnection 并将收到的信息作为普通接收器发送给卡夫卡。

    我声明我的来源如下:

    val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
        Http().bind(interface = "localhost", port = "8080")
    

    然后,我想从HttpRequest的主体中提取消息(字符串),最后将字符串发送给Kafka。流程如下所示:

    val bindingFuture: Future[Http.ServerBinding] = serverSource
         .map(???) //Here, I need to extract the message
         .map(message => new ProducerRecord[String, String](topic, message.result(2 seconds)))
         .runWith(akka.kafka.scaladsl.Producer.plainSink(producerSettings))
    

    但是,我不知道如何提取信息。我想这样做:

    val requestHandler: HttpRequest => HttpResponse = {
        case HttpRequest(POST, Uri.Path("/publish"), _, _, _) => {
          HttpResponse(202, entity = "Message sent to Kafka!")
        }
        case r: HttpRequest =>
          r.discardEntityBytes() // important to drain incoming HTTP Entity stream
          HttpResponse(404, entity = "Unknown resource!")
      }
    

    connection handleWithSyncHandler requestHandler 我无法获取流进程所遵循的消息。同时,我也希望能收到你的任何请求 /publish URI,或者在流内部返回404。

    有可能这样做吗?

    1 回复  |  直到 6 年前
        1
  •  1
  •   Ramón J Romero y Vigil    6 年前

    改用指令

    Routing DSL 使用起来比处理 HttpRequest

    val route : Route = 
      post {
        path("publish") {
          extractRequestEntity { entity =>
            onComplete(entity.toStrict(10.seconds).map(_.data.utf8String){ message =>
              Producer.plainSink(producerSettings)(
                new ProducerRecord[String, String](topic, message.result(2 seconds))
              )
              complete(StatusCodes.OK)
            } 
          }
        }
      }
    

    现在可以将其传入以处理传入的请求:

    Http().bindAndHandle(
      route,
      "localhost",
      8080
    )