代码之家  ›  专栏  ›  技术社区  ›  Joshua Esolk

为什么在Akka Dispatcher上启动Futures时,Futures中的Futures会按顺序运行

  •  7
  • Joshua Esolk  · 技术社区  · 6 年前

    当我们试图从一个参与者的receive方法中启动一些未来时,我们观察到一种奇怪的行为。 如果我们将配置的dispatchers用作ExecutionContext,那么未来将在同一线程上按顺序运行。如果我们使用ExecutionContext。隐含的。在全球范围内,期货市场如预期一样平行运行。

    我们将代码归结为以下示例(下面是一个更完整的示例):

    implicit val ec = context.getDispatcher
    
    Future{ doWork() } // <-- all running parallel
    Future{ doWork() }
    Future{ doWork() }
    Future{ doWork() }
    
    Future {
       Future{ doWork() } 
       Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
       Future{ doWork() }
       Future{ doWork() }
    }
    

    可编译的示例如下:

    import akka.actor.ActorSystem
    import scala.concurrent.{ExecutionContext, Future}
    
    object WhyNotParallelExperiment extends App {
    
      val actorSystem = ActorSystem(s"Experimental")   
    
      // Futures not started in future: running in parallel
      startFutures(runInFuture = false)(actorSystem.dispatcher)
      Thread.sleep(5000)
    
      // Futures started in future: running in sequentially. Why????
      startFutures(runInFuture = true)(actorSystem.dispatcher)
      Thread.sleep(5000)
    
      actorSystem.terminate()
    
      private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
        if (runInFuture) {
          Future{
            println(s"Start Futures on thread ${Thread.currentThread().getName()}")
            (1 to 9).foreach(startFuture)
            println(s"Started Futures on thread ${Thread.currentThread().getName()}")
          }
        } else {
          (11 to 19).foreach(startFuture)
        }
      }
    
      private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
        println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
        Thread.sleep(500)
        println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
      }
    
    
    }
    

    我们尝试了线程池执行器和fork-join执行器,得到了相同的结果。

    我们是否以错误的方式使用期货? 然后应该如何生成并行任务?

    3 回复  |  直到 5 年前
        1
  •  3
  •   Jeffrey Chung    6 年前

    从Akka的内部 BatchingExecutor (重点矿山):

    遗嘱执行人的混合特征 组多个嵌套 Runnable.run() 调用传递给原始执行器的单个Runnable 。这是一个有用的优化,因为它绕过了原始上下文的任务队列,并将相关(嵌套)代码保留在单个线程上,这可能会提高CPU亲和力。然而,如果传递给执行者的任务阻塞或代价高昂,这种优化可以防止工作窃取,并使性能更差。。。。如果代码不使用 scala.concurrent.blocking 因为在其他任务中创建的任务将阻止外部任务完成。

    如果您使用的是混合在一起的调度程序 批处理执行器 --即 MessageDispatcher --您可以使用 斯卡拉。同时发生的阻塞 构造以启用嵌套未来的并行性:

    Future {
      Future {
        blocking {
          doBlockingWork()
        }
      }
    }
    

    在您的示例中,您可以添加 blocking startFuture 方法:

    private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
      blocking {
        println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
        Thread.sleep(500)
        println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
      }
    }
    

    运行的样本输出 startFutures(true)(actorSystem.dispatcher) 根据上述变更:

    Start Futures on thread Experimental-akka.actor.default-dispatcher-2
    Started Futures on thread Experimental-akka.actor.default-dispatcher-2
    Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
    Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
    Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
    Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
    Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
    Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
    Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
    Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
    Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
    Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
    Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
    Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
    Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
    Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
    Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
    Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
    Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
    Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4
    
        2
  •  0
  •   Denis Makarenko    6 年前

    它与dispatcher的“吞吐量”设置有关。我在应用程序中添加了一个“公平调度程序”。conf演示:

    fair-dispatcher {
      # Dispatcher is the name of the event-based dispatcher
      type = Dispatcher
      # What kind of ExecutionService to use
      executor = "fork-join-executor"
      # Configuration for the fork join pool
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 2
        # Parallelism (threads) ... ceil(available processors * factor)
        parallelism-factor = 2.0
        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 10
      }
      # Throughput defines the maximum number of messages to be
      # processed per actor before the thread jumps to the next actor.
      # Set to 1 for as fair as possible.
      throughput = 1
    }
    

    下面是您的示例,对使用fair dispatcher for Futures和打印吞吐量设置的当前值进行了一些修改:

    package com.test
    
    import akka.actor.ActorSystem
    
    import scala.concurrent.{ExecutionContext, Future}
    
    object WhyNotParallelExperiment extends App {
    
      val actorSystem = ActorSystem(s"Experimental")
    
      println("Default dispatcher throughput:")
      println(actorSystem.dispatchers.defaultDispatcherConfig.getInt("throughput"))
    
      println("Fair dispatcher throughput:")
      println(actorSystem.dispatchers.lookup("fair-dispatcher").configurator.config.getInt("throughput"))
    
      // Futures not started in future: running in parallel
      startFutures(runInFuture = false)(actorSystem.dispatcher)
      Thread.sleep(5000)
    
      // Futures started in future: running in sequentially. Why????
      startFutures(runInFuture = true)(actorSystem.dispatcher)
      Thread.sleep(5000)
    
      actorSystem.terminate()
    
      private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
        if (runInFuture) {
          Future{
            implicit val fairExecutionContext = actorSystem.dispatchers.lookup("fair-dispatcher")
            println(s"Start Futures on thread ${Thread.currentThread().getName()}")
            (1 to 9).foreach(i => startFuture(i)(fairExecutionContext))
            println(s"Started Futures on thread ${Thread.currentThread().getName()}")
          }
        } else {
          (11 to 19).foreach(startFuture)
        }
      }
    
      private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
        println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
        Thread.sleep(500)
        println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
      }
    }
    

    输出:

    Default dispatcher throughput:
    5
    Fair dispatcher throughput:
    1
    Future 12 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
    Future 11 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
    Future 13 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
    Future 14 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
    Future 16 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
    Future 15 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
    Future 17 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
    Future 18 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
    Future 19 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
    Future 13 finished on thread Experimental-akka.actor.default-dispatcher-2
    Future 11 finished on thread Experimental-akka.actor.default-dispatcher-4
    Future 12 finished on thread Experimental-akka.actor.default-dispatcher-3
    Future 14 finished on thread Experimental-akka.actor.default-dispatcher-5
    Future 16 finished on thread Experimental-akka.actor.default-dispatcher-7
    Future 15 finished on thread Experimental-akka.actor.default-dispatcher-6
    Future 17 finished on thread Experimental-akka.actor.default-dispatcher-8
    Future 18 finished on thread Experimental-akka.actor.default-dispatcher-9
    Future 19 finished on thread Experimental-akka.actor.default-dispatcher-10
    Start Futures on thread Experimental-akka.actor.default-dispatcher-10
    Future 1 should run for 500 millis on thread Experimental-fair-dispatcher-12
    Future 2 should run for 500 millis on thread Experimental-fair-dispatcher-13
    Future 4 should run for 500 millis on thread Experimental-fair-dispatcher-15
    Future 3 should run for 500 millis on thread Experimental-fair-dispatcher-14
    Future 5 should run for 500 millis on thread Experimental-fair-dispatcher-17
    Future 6 should run for 500 millis on thread Experimental-fair-dispatcher-16
    Future 7 should run for 500 millis on thread Experimental-fair-dispatcher-18
    Future 8 should run for 500 millis on thread Experimental-fair-dispatcher-19
    Started Futures on thread Experimental-akka.actor.default-dispatcher-10
    Future 4 finished on thread Experimental-fair-dispatcher-15
    Future 2 finished on thread Experimental-fair-dispatcher-13
    Future 1 finished on thread Experimental-fair-dispatcher-12
    Future 9 should run for 500 millis on thread Experimental-fair-dispatcher-15
    Future 5 finished on thread Experimental-fair-dispatcher-17
    Future 7 finished on thread Experimental-fair-dispatcher-18
    Future 8 finished on thread Experimental-fair-dispatcher-19
    Future 6 finished on thread Experimental-fair-dispatcher-16
    Future 3 finished on thread Experimental-fair-dispatcher-14
    Future 9 finished on thread Experimental-fair-dispatcher-15
    

    正如您所看到的,fair dispatcher在未来的大部分时间使用不同的线程。

    默认调度程序针对参与者进行了优化,因此吞吐量设置为5,以最小化上下文切换,从而提高消息处理吞吐量,同时保持一定程度的公平性。

    fair dispatcher中唯一的变化是吞吐量:1,即如果可能,每个异步执行请求都有自己的线程(最大并行度)。

    我建议为用于不同目的的期货创建单独的调度器。E、 例如,一个调度程序(即线程池)用于调用某些web服务,另一个用于阻止DB访问等。这将通过调整自定义调度程序设置,让您对其进行更精确的控制。

    看看 https://doc.akka.io/docs/akka/current/dispatchers.html ,这对于理解细节非常有用。

    还可以查看Akka参考设置(尤其是默认dispatcher),这里有一些有用的注释: https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/reference.conf

        3
  •  0
  •   Joshua Esolk    6 年前

    经过一些研究,我发现 Dispatcher 类实现 akka.dispatch.BatchingExecutor 。出于性能原因,此类检查应在同一线程上批处理哪些任务。 Future.map 在内部创建 scala.concurrent.OnCompleteRunnable BatchingExecutor

    这似乎是合理的 map() / flatMap() 其中一个任务生成一个后续任务,但不用于用于分叉工作的显式新未来。 内部, Future.apply 实施人 Future.successful().map 并由此进行配料。我现在的解决方法是以不同的方式创建未来:

    object MyFuture {
      def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
        val promise = Promise[T]()
        class FuturesStarter extends Runnable {
          override def run(): Unit = {
            promise.complete(Try(body))
          }
        }
        executor.execute(new FuturesStarter)
        promise.future
      }
    }
    

    这个 FutureStarter -可运行项不是批处理的,因此是并行运行的。

    有人能确认这个解决方案是可行的吗? 有没有更好的方法来解决这个问题? 当前是否实施 Future / 批处理执行器 通缉犯,还是一只虫子?