代码之家  ›  专栏  ›  技术社区  ›  guilhebl

Scala使用多个web服务和合并输出的惯用且最快的方法

  •  1
  • guilhebl  · 技术社区  · 6 年前

    我有一个Scala应用程序,它使用多个数据源并合并合并输出,该应用程序并行调用多个HTTP调用并等待它们的结果,当所有问题都得到解决时,它将它们的输出合并到一个result对象中,并将响应作为Scala Future发送回。代码:

    val providers = getProviders()
    val futures = for (p <- providers) yield searchSource(p, req)
    val result = waitAll(futures)
    val emptyList = Option(ProductList(Vector.empty, ListSummary(0, 0, 0)))
    
    result.map { x =>
      x.foldLeft(emptyList)((r, c) => {
        if (c.isSuccess) ProductList.merge(r, c.get) else r
      })
    }
    

    它首先获取提供程序字符串的列表,然后在searchSource中并行地触发对每个提供程序的调用。

    Future[Option[ProductList]]

      protected def waitAll[T](futures: Seq[Future[T]])(implicit ec: CustomExecutionContext): Future[Seq[Try[T]]] =
        Future.sequence(lift(futures))
    
      protected def lift[T](futures: Seq[Future[T]])(implicit ec: CustomExecutionContext): Seq[Future[Try[T]]] =
        futures.map(_.map { Success(_) }.recover { case t => Failure(t) })
    

    有没有解决这类问题的“官方”的一元/惯用方法,或者这种类型的用例是否有性能优化的例子?

    1 回复  |  直到 6 年前
        1
  •  0
  •   Lasf    6 年前

    cats-effect IO 而不是 Future

    def searchSource(provider: String, request: Request[_]): IO[Option[ProductList]] = ???
    
    val result: IO[List[Option[ProductList]]] = 
      providers.traverse[IO, Option[ProductList]](p => searchSource(p, req))
    

    请注意 traverse 本质上与 map 然后 sequence . 然后你可以在你的 result 程序(例如。 地图 把它变成你想要的最终结果。在课程结束时,您可以执行以下操作:

    result.attempt.unsafeRunSync() //Either[Throwable, List[Option[ProductList]]]
    

    handleErrorWith 方法。您可以在 searchSource 水平,因为他们都是 IO