代码之家  ›  专栏  ›  技术社区  ›  Sergio

如何按顺序运行Kotlin协同程序?

  •  0
  • Sergio  · 技术社区  · 2 年前

    我有一个例子 CoroutineScope log() 函数,如下所示:

    private val scope = CoroutineScope(Dispatchers.IO)
    
    fun log(message: String) = scope.launch { // launching a coroutine
        println("$message")
        TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
    }
    

    我使用这个测试代码来启动协同程序:

    repeat(5) { item ->
        log("Log $item")
    }
    

    这个 log() 函数可以在任何地方调用 Thread ,但不是从郊游中得到的。

    经过几次测试,我可以看到以下不是顺序的结果:

    Log 0
    Log 2
    Log 4
    Log 1
    Log 3
    

    打印日志的顺序可能不同。如果我理解正确,协同程序的执行不能保证是顺序的。这意味着 item 2 可以在的协同程序之前启动 item 0

    我希望协同程序按顺序为每个项目启动,“一些阻塞操作”将按顺序执行,以始终实现下一个日志:

    Log 0
    Log 1
    Log 2
    Log 3
    Log 4
    

    有没有一种方法可以使启动协同程序按顺序进行?或者也许还有其他方法可以实现我想要的?

    提前感谢您的帮助!

    0 回复  |  直到 2 年前
        1
  •  4
  •   Tenfour04    2 年前

    一种可能的策略是使用Channel按顺序加入已启动的作业。你需要懒散地启动作业,这样它们才会启动 join 被召唤到他们身上。 trySend 当信道具有无限容量时,总是成功的。你需要使用 trySend 所以它可以从外部调用。

    private val lazyJobChannel = Channel<Job>(capacity = Channel.UNLIMITED).apply {
        scope.launch {
            consumeEach { it.join() }
        }
    }
    
    fun log(message: String) {
        lazyJobChannel.trySend(
            scope.launch(start = CoroutineStart.LAZY) {
                println("$message")
                TimeUnit.MILLISECONDS.sleep(100) // some blocking operation
            }
        )
    }
    
        2
  •  0
  •   Sergio    2 年前

    自从 Flow s are sequential 我们可以使用 MutableSharedFlow 按顺序收集和处理数据:

    class Info {
        // make sure replay(in case some jobs were emitted before sharedFlow is being collected and could be lost)
        // and extraBufferCapacity are large enough to handle all the jobs. 
        // In case some jobs are lost try to increase either of the values.
        private val sharedFlow = MutableSharedFlow<String>(replay = 10, extraBufferCapacity = 10)
        private val scope = CoroutineScope(Dispatchers.IO)
    
        init {
            sharedFlow.onEach { message ->
                println("$message")
                TimeUnit.MILLISECONDS.sleep(100) // some blocking or suspend operation
            }.launchIn(scope)
        }
    
        fun log(message: String) {
            sharedFlow.tryEmit(message) 
        }
    }
    
    fun test() {
    
        val info = Info()
    
        repeat(10) { item ->
            info.log("Log $item")
        }
    }
    

    它总是按正确的顺序打印日志:

    Log 0
    Log 1
    Log 2
    ...
    Log 9
    

    它适用于所有情况,但需要确保有足够的元素设置为 replay extraBufferCapacity 的参数 可变共享流 以处理所有项目。


    另一种方法是

    使用 Dispatchers.IO.limitedParallelism(1) 作为 CoroutineScope .如果协同程序不包含对 suspend 功能,并从相同位置启动 线 ,例如。 主线程 。因此,此解决方案仅适用于阻塞(而不是 )内部操作 launch 协同程序生成器:

    private val scope = CoroutineScope(Dispatchers.IO.limitedParallelism(1))
    
    fun log(message: String) = scope.launch { // launching a coroutine from the same Thread, e.g. Main Thread
        println("$message")
        TimeUnit.MILLISECONDS.sleep(100) // only blocking operation, not `suspend` operation
    }
    

    事实证明,单线程调度器是一个FIFO执行器。因此,限制 CoroutineScope 执行到一个线程就解决了这个问题。

    推荐文章