代码之家  ›  专栏  ›  技术社区  ›  Yevhenii Popadiuk

如何从类内部使递归akka流“toggleable”

  •  0
  • Yevhenii Popadiuk  · 技术社区  · 6 年前

    我有一个类,其定义如下:

    abstract class StreamWrapper() {
    
      def doStuff: Future[Unit] = ???
    
      def recursiveStream(): Unit =
        events.mapAsync(parallelism)(doStuff)
          .runWith(Sink.fold(???))
          .onComplete( 
            actorSystem.scheduler.scheduleOnce(delay)(recursiveStream())
                                                   // ^-- recursion --^
          )
    
    }
    

    我不想从 doStuff 函数(在我的例子中,我有一个断路器,所以我想通过它的回调打开/关闭递归流)。

    解决这个问题最简单的方法是引入原子布尔,检查它 recursiveStream 多斯塔夫 :

    abstract class StreamWrapper() {
    
      private val enabled = new AtomicBoolean(true)
    
      def doStuff: Future[Unit] = ??? // toggle enabled here
    
      def recursiveStream(): Unit = 
        if (enabled)
          events.mapAsync(parallelism)(doStuff)
            .runWith(Sink.fold(???))
            .onComplete( 
              actorSystem.scheduler.scheduleOnce(delay)(recursiveStream())
            )
        else log.info("Stream is currently offline...")
    
    }
    

    1 回复  |  直到 6 年前
        1
  •  1
  •   Jeffrey Chung    6 年前