当
answering a question
我试图实现一个设置,其中主线程加入
CommonPool
并行执行一些独立的任务
java.util.streams
操作)。
我创造了尽可能多的演员
公用事业
线程,加上主线程的通道。演员们使用集合频道:
val resultChannel = Channel<Double>(UNLIMITED)
val poolComputeChannels = (1..commonPool().parallelism).map {
actor<Task>(CommonPool) {
for (task in channel) {
task.execute().also { resultChannel.send(it) }
}
}
}
val mainComputeChannel = Channel<Task>()
val allComputeChannels = poolComputeChannels + mainComputeChannel
这允许我通过使用
select
为每个任务查找空闲参与者的表达式:
select {
allComputeChannels.forEach { chan ->
chan.onSend(task) {}
}
}
所以我发送所有任务并关闭频道:
launch(CommonPool) {
jobs.forEach { task ->
select {
allComputeChannels.forEach { chan ->
chan.onSend(task) {}
}
}
}
allComputeChannels.forEach { it.close() }
}
现在我必须为主线程编写代码。在这里,我决定为
mainComputeChannel
,执行提交给主线程的任务,以及
resultChannel
,将单个结果累加到最终总和中:
return runBlocking {
var completedCount = 0
var sum = 0.0
while (completedCount < NUM_TASKS) {
select<Unit> {
mainComputeChannel.onReceive { task ->
task.execute().also { resultChannel.send(it) }
}
resultChannel.onReceive { result ->
sum += result
completedCount++
}
}
}
resultChannel.close()
sum
}
这就导致了
主计算机通道
可能被关闭
公用事业
线,但是
结果频道
还需要服务如果通道关闭,
onReceive
将引发异常并
onReceiveOrNull
将立即选择
null
. 两种选择都不可接受我没有办法避免注册
主计算机通道
如果是关着的话如果我用
if (!mainComputeChannel.isClosedForReceive)
,注册调用不会是原子的。
这就引出了我的问题:在某些频道可能被另一条线关闭,而另一些频道还活着的情况下,选择哪个频道是一个好习惯?