自从
Flow
s are sequential
我们可以使用
MutableSharedFlow
按顺序收集和处理数据:
class Info {
// make sure replay(in case some jobs were emitted before sharedFlow is being collected and could be lost)
// and extraBufferCapacity are large enough to handle all the jobs.
// In case some jobs are lost try to increase either of the values.
private val sharedFlow = MutableSharedFlow<String>(replay = 10, extraBufferCapacity = 10)
private val scope = CoroutineScope(Dispatchers.IO)
init {
sharedFlow.onEach { message ->
println("$message")
TimeUnit.MILLISECONDS.sleep(100) // some blocking or suspend operation
}.launchIn(scope)
}
fun log(message: String) {
sharedFlow.tryEmit(message)
}
}
fun test() {
val info = Info()
repeat(10) { item ->
info.log("Log $item")
}
}
它总是按正确的顺序打印日志:
Log 0
Log 1
Log 2
...
Log 9
它适用于所有情况,但需要确保有足够的元素设置为
replay
和
extraBufferCapacity
的参数
可变共享流
以处理所有项目。
另一种方法是
使用
Dispatchers.IO.limitedParallelism(1)
作为
CoroutineScope
.如果协同程序不包含对
suspend
功能,并从相同位置启动
线
,例如。
主线程
。因此,此解决方案仅适用于阻塞(而不是
悬
)内部操作
launch
协同程序生成器:
private val scope = CoroutineScope(Dispatchers.IO.limitedParallelism(1))
fun log(message: String) = scope.launch { // launching a coroutine from the same Thread, e.g. Main Thread
println("$message")
TimeUnit.MILLISECONDS.sleep(100) // only blocking operation, not `suspend` operation
}
事实证明,单线程调度器是一个FIFO执行器。因此,限制
CoroutineScope
执行到一个线程就解决了这个问题。