代码之家  ›  专栏  ›  技术社区  ›  Juh_

如何使monix固定速率调度器在失败时继续

  •  0
  • Juh_  · 技术社区  · 6 年前

    我刚刚开始使用monix,部分是为了在长时间运行的应用程序中安排重复的工作。我将管理异常,但我希望monix继续调用给定的函数,即使我让他们通过一些。

    现在,通过一个简单的测试,一旦安排了重复调用,一旦出现异常,它就不会继续调用它:

    // will print "Hi" repeatedly
    scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
      println("Hi")
    }
    
    // will print "Hi" only once
    scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
      println("Hi")
      throw new RuntimeException("oups, forgot to catch that one")
    }
    

    我意识到,在失败时简单地重复这个任务是一个糟糕的设计。相反,我应该设置一个适当的异常管理系统,延迟重启。

    1 回复  |  直到 6 年前
        1
  •  1
  •   lprakashv    6 年前

    你总是可以利用scala.util.Try为此或简单的尝试捕捉块。在任何失败的情况下,您只需登录并继续。您甚至可以有如下失败重试策略。

    import scala.util._
    
    def taskExceptionProne() = ???
    
    var failures = 0
    val maxRetries = 10
    
    scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
        Try(taskExceptionProne) match {
            Success(result) =>
                //do something with the result
                failures = 0
                println("Task executed.")
            Failure(throwable) =>
                if (failures>=maxRetries) throw throwable else {
                    failures = failures + 1
                    //log failure
                    println(throwable.getMessage)
                }
        }
    }
    
        2
  •  2
  •   atl    5 年前

    Observable ,这使得写作更容易。它还带有许多内置功能,因此您不需要手动实现自己的功能。

    val myTask: Task[Unit] = Task.delay(println("Execute Task")
    
    // Note that the period will tick after you've completed the task
    // So, if you have a long running task, just make sure you are aware of that
    Observable
      .intervalAtFixedRate(1.seconds, 1.seconds)
      .mapEval(_ => myTask.onErrorRestart(maxRetries = 5))
      .completedL
      .startAndForget // Optional, will run in a separate fiber, so it doesn't block main thread
      .runToFuture
    
    推荐文章