我有一个类,其定义如下:
abstract class StreamWrapper() {
def doStuff: Future[Unit] = ???
def recursiveStream(): Unit =
events.mapAsync(parallelism)(doStuff)
.runWith(Sink.fold(???))
.onComplete(
actorSystem.scheduler.scheduleOnce(delay)(recursiveStream())
// ^-- recursion --^
)
}
我不想从
doStuff
函数(在我的例子中,我有一个断路器,所以我想通过它的回调打开/关闭递归流)。
解决这个问题最简单的方法是引入原子布尔,检查它
recursiveStream
多斯塔夫
:
abstract class StreamWrapper() {
private val enabled = new AtomicBoolean(true)
def doStuff: Future[Unit] = ??? // toggle enabled here
def recursiveStream(): Unit =
if (enabled)
events.mapAsync(parallelism)(doStuff)
.runWith(Sink.fold(???))
.onComplete(
actorSystem.scheduler.scheduleOnce(delay)(recursiveStream())
)
else log.info("Stream is currently offline...")
}