
Scala Future: running jobs sequentially

  G.Saleh · 6 years ago

    I am trying to launch three jobs one after another, but when I try the following code:

    val jobs = Seq("stream.Job1", "stream.Job2", "stream.Job3")
    Future.sequence {
      jobs.map { jobClass =>
        Future {
          println(s"Starting the spark job from class $jobClass...")
          % gcloud("sparkC", "jobs", "submit", "spark", s"--cluster=$clusterName", s"--class=$jobClass", "--region=global", s"--jars=$JarFile")
          println(s"Starting the spark job from class $jobClass...DONE")
        }
      }
    }
    

    the three jobs run in parallel instead of one after another. I think the solution is flatMap, but I could not get it to work.
    Please help.
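    Since the question guesses that flatMap is the key, here is a minimal sketch of explicit flatMap chaining for three fixed jobs. It assumes a hypothetical runJob helper in place of the gcloud call, with Thread.sleep standing in for the real work; the log exists only to make the start/finish order visible.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FlatMapChain {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Records start/finish events so the ordering can be inspected.
  private val log = new ConcurrentLinkedQueue[String]()

  // Hypothetical stand-in for submitting one Spark job with gcloud.
  // The body starts running as soon as this Future is created.
  def runJob(jobClass: String): Future[Unit] = Future {
    log.add(s"start $jobClass")
    Thread.sleep(100) // simulate the job's work
    log.add(s"done $jobClass")
    ()
  }

  // Each job is only created inside the previous job's flatMap callback,
  // so Job2 cannot start before Job1 finishes, and Job3 waits for Job2.
  def runAll(): Future[Unit] =
    runJob("stream.Job1")
      .flatMap(_ => runJob("stream.Job2"))
      .flatMap(_ => runJob("stream.Job3"))

  // Runs the chain and returns the recorded events in order.
  def events(): List[String] = {
    log.clear()
    Await.result(runAll(), 10.seconds)
    log.toArray(new Array[String](0)).toList
  }
}
```

    Hard-coding the chain like this only works for a fixed number of jobs; for a list of arbitrary length the same idea has to be folded or recursed over the list.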

  Answer by Evgeny · 6 years ago

    Try this:

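    import scala.concurrent.Future
    import scala.util.control.NonFatal

    // Assumes an implicit ExecutionContext plus clusterName, JarFile and the
    // `% gcloud(...)` shell-out from the question are in scope.
    val jobs = Seq("stream.Job1", "stream.Job2", "stream.Job3")
    jobs.foldLeft(Future.successful(())) {
      case (result, jobClass) =>
        // Start this job only after the previous Future has completed.
        result.flatMap { _ =>
          Future {
            println(s"Starting the spark job from class $jobClass...")
            % gcloud("sparkC", "jobs", "submit", "spark", s"--cluster=$clusterName", s"--class=$jobClass", "--region=global", s"--jars=$JarFile")
            println(s"Starting the spark job from class $jobClass...DONE")
          }
        }.recoverWith {
          // If this job fails, fall back to the previous (successful) Future
          // so that the remaining jobs still run.
          case NonFatal(_) => result
        }
    }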
    

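    This iterates over your jobs and starts each one as soon as the previous one has finished. I added recoverWith so that a failure in any one job does not stop the processing of the remaining Futures: each job runs regardless of whether the one before it succeeded.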
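    To see what the recoverWith step does, here is a small self-contained sketch of the same foldLeft pattern. runJob is a hypothetical stand-in for the gcloud submission, and stream.Job2 is made to fail on purpose; Job3 should still run afterwards.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object FoldWithRecovery {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Records which jobs actually started, in order.
  private val log = new ConcurrentLinkedQueue[String]()

  // Hypothetical job runner: "stream.Job2" fails on purpose.
  def runJob(jobClass: String): Future[Unit] = Future {
    log.add(jobClass)
    if (jobClass == "stream.Job2") throw new RuntimeException(s"$jobClass failed")
  }

  def runAll(jobs: Seq[String]): Future[Unit] =
    jobs.foldLeft(Future.successful(())) { (previous, jobClass) =>
      previous
        .flatMap(_ => runJob(jobClass))               // start after `previous` completes
        .recoverWith { case NonFatal(_) => previous } // drop this job's failure
    }

  // Runs all three jobs and returns the ones that were started.
  def ranJobs(): List[String] = {
    log.clear()
    Await.result(runAll(Seq("stream.Job1", "stream.Job2", "stream.Job3")), 10.seconds)
    log.toArray(new Array[String](0)).toList
  }
}
```

    Note that this recovery discards the error entirely, so the overall Future succeeds even if every job failed; in a real pipeline you would probably at least log the exception before falling back.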

  Answer by Andrey Tyukin · 6 years ago

    If the jobs do not depend on each other and you want a list of results at the end, you can use:

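    import scala.concurrent._

    // Runs the factories one after another: the next Future is only created
    // once the previous one has completed successfully.
    def runIndependentSequentially[X]
      (futs: List[() => Future[X]])
      (implicit ec: ExecutionContext): Future[List[X]] = futs match {
      case Nil => Future.successful(Nil)
      case h :: t => for {
        x <- h()
        xs <- runIndependentSequentially(t)
      } yield x :: xs
    }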
    

    Now you can use it on a list of job factories (functions that create the Futures) as follows:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    val jobs = List("stream.Job1", "stream.Job2", "stream.Job3")
    // Wrap each job in a function so that no Future starts before it is needed.
    val futFactories = jobs.map { jobClass =>
      () => Future {
        println(s"Starting the spark job from class $jobClass...")
        Thread.sleep(5000) // stands in for the real Spark job
        // Result tagged with the job class and the current second within the hour
        s"result[$jobClass,${(System.currentTimeMillis / 1000) % 3600}]"
      }
    }

    println(Await.result(runIndependentSequentially(futFactories), 30.seconds))
    

    This produces the following output:

    Starting the spark job from class stream.Job1...
    Starting the spark job from class stream.Job2...
    Starting the spark job from class stream.Job3...
    List(result[stream.Job1,3011], result[stream.Job2,3016], result[stream.Job3,3021])
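    The recursive helper can also be written with foldLeft, threading an accumulated Future[Vector[X]] through the list. This is a sketch of an alternative formulation, not the answerer's code; runSequentially and demo are hypothetical names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FoldSequential {
  // Each factory is invoked only inside acc.flatMap, i.e. after all
  // previous jobs have completed, so the jobs still run one at a time.
  // A failed job fails the whole result and skips the remaining jobs.
  def runSequentially[X](futs: List[() => Future[X]])
                        (implicit ec: ExecutionContext): Future[List[X]] =
    futs.foldLeft(Future.successful(Vector.empty[X])) { (acc, makeFut) =>
      acc.flatMap(xs => makeFut().map(xs :+ _))
    }.map(_.toList)

  // Small demo: three fake jobs that each return a number.
  def demo(): List[Int] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val factories = List(1, 2, 3).map(i => () => Future { i * 10 })
    Await.result(runSequentially(factories), 10.seconds)
  }
}
```

    Appending to a Vector keeps each step cheap while preserving the original order; the recursive version gets the same order by prepending x to the already-computed tail.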
    

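    Update: the list of futures has been replaced by List[() => Future[X]], so that the futures do not start running before they are even passed to the runIndependentSequentially method. Many thanks to @Evgeny for pointing this out!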
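    The reason for the factory list is that a Scala Future starts executing as soon as it is constructed. A small sketch that makes this visible with a counter (job, eagerStartsBeforeUse and lazyStartsBeforeUse are hypothetical names used only for illustration):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EagerVsLazy {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Counts how many job bodies have actually started running.
  val started = new AtomicInteger(0)

  // Hypothetical job: the body runs as soon as the Future is constructed.
  def job(i: Int): Future[Int] = Future { started.incrementAndGet(); i }

  // List[Future[Int]]: every job starts while the list is being built,
  // before any sequencing code is involved. Here we only wait for them.
  def eagerStartsBeforeUse(): Int = {
    started.set(0)
    val futs: List[Future[Int]] = List(1, 2, 3).map(job)
    futs.foreach(f => Await.ready(f, 10.seconds))
    started.get()
  }

  // List[() => Future[Int]]: building the list starts nothing; a job only
  // starts when a caller such as runIndependentSequentially invokes it.
  def lazyStartsBeforeUse(): Int = {
    started.set(0)
    val factories: List[() => Future[Int]] = List(1, 2, 3).map(i => () => job(i))
    Thread.sleep(200) // give the thread pool time to run anything pending
    started.get()
  }
}
```

    With the eager list all three jobs have run even though nothing sequenced them, which is exactly the parallel behaviour from the question; with the factory list no job has started yet.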