代码之家  ›  专栏  ›  技术社区  ›  Niko

斯卡拉期货:非确定性产出

  •  1
  • Niko  · 技术社区  · 6 年前

    我是斯卡拉的新手,我正在通过创建一些重试方案来练习期货自由。这样做,我得到了以下代码:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    
    object Retries extends App {
    
      var retries = 0
    
      def resetRetries(): Unit = retries = 0
    
      def calc() = if (retries > 3) 10 else {
        retries += 1
        println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
        throw new IllegalArgumentException("This failed")
      }
    
      def fCalc(): Future[Int] = Future(calc())
    
      resetRetries()
    
      val ff = fCalc() // 0 - should fail
        .fallbackTo(fCalc()) // 1 - should fail
        .fallbackTo(fCalc()) // 2 - should fail
        .fallbackTo(fCalc()) // 3 - should fail
        .fallbackTo(fCalc()) // 4 - should be a success
    
      Await.ready(ff, 10.second)
    
      println(ff.isCompleted)
      println(ff.value)
    }
    

    每次运行此代码时,都会得到不同的结果。我得到的结果样本如下

    输出1

    I am thread 12 This is going to fail. Retry count 1
    I am thread 14 This is going to fail. Retry count 3
    I am thread 13 This is going to fail. Retry count 2
    I am thread 11 This is going to fail. Retry count 1
    I am thread 12 This is going to fail. Retry count 4
    true
    Some(Failure(java.lang.IllegalArgumentException: This failed))
    

    输出2

    I am thread 12 This is going to fail. Retry count 2
    I am thread 11 This is going to fail. Retry count 1
    I am thread 13 This is going to fail. Retry count 3
    I am thread 14 This is going to fail. Retry count 4
    true
    Some(Success(10))
    

    输出3

    I am thread 12 This is going to fail. Retry count 1
    I am thread 11 This is going to fail. Retry count 1
    I am thread 12 This is going to fail. Retry count 2
    I am thread 12 This is going to fail. Retry count 3
    I am thread 12 This is going to fail. Retry count 4
    true
    Some(Failure(java.lang.IllegalArgumentException: This failed))
    

    并非总是这样,结果会在成功和失败之间交替。在成功运行之前,可能会有多个失败的运行。

    据我所知,应该只有4个日志“I am thread x this is going fail.重试次数x“,应如下所示:

    I am thread a This is going to fail. Retry count 1
    I am thread b This is going to fail. Retry count 2
    I am thread c This is going to fail. Retry count 3
    I am thread d This is going to fail. Retry count 4
    

    不一定按这个顺序——因为我不知道scala线程模型是如何工作的——但是你明白我的意思了。然而,我得到了这个我无法处理的非确定性输出。 我的问题是:这种非确定性输出是从哪里来的?

    我要指出的是,以下重试机制会产生一致的结果:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    
    object Retries extends App {
    
      var retries = 0
    
      def resetRetries(): Unit = retries = 0
    
      def calc() = if (retries > 3) 10 else {
        retries += 1
        println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
        throw new IllegalArgumentException("This failed")
      }
    
      def retry[T](op: => T)(retries: Int): Future[T] = Future(op) recoverWith { case _ if retries > 0 => retry(op)(retries - 1) }
    
      resetRetries()
      val retriableFuture: Future[Future[Int]] = retry(calc())(5)
      Await.ready(retriableFuture, 10 second)
    
      println(retriableFuture.isCompleted)
      println(retriableFuture.value)
    }
    

    产量

    I am thread 11 This is going to fail. Retry count 1
    I am thread 12 This is going to fail. Retry count 2
    I am thread 11 This is going to fail. Retry count 3
    I am thread 12 This is going to fail. Retry count 4
    true
    Some(Success(10))
    

    如果我减少重试次数( retry(calc())(3) )结果是一个失败的未来

    I am thread 11 This is going to fail. Retry count 1
    I am thread 12 This is going to fail. Retry count 2
    I am thread 11 This is going to fail. Retry count 3
    I am thread 12 This is going to fail. Retry count 4
    true
    Some(Failure(java.lang.IllegalArgumentException: This failed))
    
    3 回复  |  直到 6 年前
        1
  •  4
  •   SergGr    6 年前

    虽然从技术上讲,@tim是正确的,但我认为他并没有真正回答这个问题。

    我相信你困惑的真正根源是你对结构的误解:

    f.fallbackTo(Future(calc()))
    

    做。它与

    f.recoverWith({ case _ => Future(calc())})
    

    有两个重要区别:

    1. fallbackTo 案例 Future(calc()) 立即创建,因此(几乎)立即开始执行 calc() . 因此,原始未来和回退未来同时运行。在这种情况下 recoverWith 只有在原始未来失败后才创建回退未来。此差异影响日志记录顺序。这也意味着 var retries 是并发的,因此当所有线程由于某些更新而实际失败时,您可能会看到这种情况。 retries 迷路了。

    2. 另一个棘手的问题是 倒退到 documented AS(突出显示是我的)

    创建一个新的未来,若成功完成,则保存该未来的结果;若未成功完成,则保存该未来的结果。 如果两个期货都失败 , the 结果的未来 持有的可丢弃对象 第一个未来。

    这种差异并不会真正影响您的示例,因为您在所有失败的尝试中抛出的异常都是相同的,但如果它们不同,可能会影响结果。例如,如果将代码修改为:

      def calc(attempt: Int) = if (retries > 3) 10 else {
        retries += 1
        println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
        throw new IllegalArgumentException(s"This failed $attempt")
      }
    
      def fCalc(attempt: Int): Future[Int] = Future(calc(attempt))
    
      val ff = fCalc(1) // 0 - should fail
          .fallbackTo(fCalc(2)) // 1 - should fail
          .fallbackTo(fCalc(3)) // 2 - should fail
          .fallbackTo(fCalc(4)) // 3 - should fail
          .fallbackTo(fCalc(5)) // 4 - should be a success
    

    那么你应该得到这两个结果中的任何一个

    Some(Failure(java.lang.IllegalArgumentException: This failed 1))
    Some(Success(10))
    

    从来没有其他“失败”的价值。

    注意这里我明确地传递了 attempt 不符合比赛条件 重试 .


    回答更多评论 (1月28日)

    我明确通过的原因 尝试 在我前面的示例中,这是确保 IllegalArgumentException 由逻辑第一者创建 calc 将得到 1 作为它在所有(甚至不太现实)线程调度下的价值。

    如果您只想让所有日志具有不同的值,那么有一种更简单的方法:使用局部变量!

      def calc() = {
        val retries = atomicRetries.getAndIncrement()
        if (retries > 3) 10 
        else {
          println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
          throw new IllegalArgumentException(s"This failed $retries")
        }
      }
    

    这样你就避免了古典主义 TOCTOU 问题。

        2
  •  2
  •   Tim    6 年前

    这不是一个scala问题,而是一个更普遍的多线程问题 retries . 您有多个线程在没有任何同步的情况下读取和写入该值,因此无法预测每个线程何时运行或将看到什么值。

    看起来具体的问题是您正在测试 重试 然后再更新。所有四个线程都有可能在更新值之前对该值进行测试。在这种情况下,他们都会看到 0 然后抛出一个错误。

    解决办法是 重试 变成一个 AtomicInteger 使用 getAndIncrement . 这将自动检索该值并递增,因此每个线程都将看到适当的值。


    更新以下注释 :另一个答案解释了为什么多个线程同时启动,所以我不会在这里重复它。当多个线程并行运行时,日志记录的顺序总是不确定的。

        3
  •  0
  •   Niko    6 年前

    这就是最终对我有用的:

    (以下代码用于 calc() 方法充分解决了有关测井重复和未来非确定性结果的问题)

    var time = 0
      var resetTries = time = 0
    
      def calc() = this.synchronized {
        if (time > 3) 10 else {
          time += 1
          println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $time") // For debugging purposes
          throw new IllegalStateException(("not yet"))
        }
      }
    

    AtomicInteger 必要的-使事情在我看来更加复杂。一 synchronised 包装是需要的。

    我必须强调一个事实,这只是为了演示,在生产代码中使用这样的设计可能不是最好的主意(阻止调用 calc 方法)。一个人应该使用 recoverWith 而是实现。

    感谢@serggr、@tim和@michalpolitowksi的帮助