代码之家  ›  专栏  ›  技术社区  ›  Marko Topolnik

如何安全地选择一些可能同时关闭的通道?

  •  0
  • Marko Topolnik  · 技术社区  · 6 年前

    answering a question 我试图实现一个设置,其中主线程加入 CommonPool 并行执行一些独立的任务 java.util.streams 操作)。

    我创造了尽可能多的演员 公用事业 线程,加上主线程的通道。演员们使用集合频道:

    val resultChannel = Channel<Double>(UNLIMITED)
    val poolComputeChannels = (1..commonPool().parallelism).map {
        actor<Task>(CommonPool) {
            for (task in channel) {
                task.execute().also { resultChannel.send(it) }
            }
        }
    }
    val mainComputeChannel = Channel<Task>()
    val allComputeChannels = poolComputeChannels + mainComputeChannel
    

    这允许我通过使用 select 为每个任务查找空闲参与者的表达式:

    select {
        allComputeChannels.forEach { chan ->
            chan.onSend(task) {}
        }
    }
    

    所以我发送所有任务并关闭频道:

    launch(CommonPool) {
        jobs.forEach { task ->
            select {
                allComputeChannels.forEach { chan ->
                    chan.onSend(task) {}
                }
            }
        }
        allComputeChannels.forEach { it.close() }
    }
    

    现在我必须为主线程编写代码。在这里,我决定为 mainComputeChannel ,执行提交给主线程的任务,以及 resultChannel ,将单个结果累加到最终总和中:

    return runBlocking {
        var completedCount = 0
        var sum = 0.0
        while (completedCount < NUM_TASKS) {
            select<Unit> {
                mainComputeChannel.onReceive { task ->
                    task.execute().also { resultChannel.send(it) }
                }
                resultChannel.onReceive { result ->
                    sum += result
                    completedCount++
                }
            }
        }
        resultChannel.close()
        sum
    }
    

    这就导致了 主计算机通道 可能被关闭 公用事业 线,但是 结果频道 还需要服务如果通道关闭, onReceive 将引发异常并 onReceiveOrNull 将立即选择 null . 两种选择都不可接受我没有办法避免注册 主计算机通道 如果是关着的话如果我用 if (!mainComputeChannel.isClosedForReceive) ,注册调用不会是原子的。

    这就引出了我的问题:在某些频道可能被另一条线关闭,而另一些频道还活着的情况下,选择哪个频道是一个好习惯?

    1 回复  |  直到 5 年前
        1
  •  1
  •   Roman Elizarov    6 年前

    这个 kotlinx.coroutines 库当前缺少一个基元以方便使用。悬而未决的建议是 receiveOrClose 功能和 onReceiveOrClosed 条款 select 这将使编写这样的代码成为可能。

    但是,您仍然需要手动跟踪以下事实: mainComputeChannel 已经关闭,在关闭时停止选择。所以,使用 收到或关闭 子句您将编写如下内容:

    // outside of loop
    var mainComputeChannelClosed = false
    // inside loop
    select<Unit> {
        if (!mainComputeChannelClosed) {
            mainComputeChannel.onReceiveOrClosed { 
                if (it.isClosed) mainComputeChannelClosed = true 
                else { /* do something with it */ }
            }
        }
        // more clauses
    }    
    

    https://github.com/Kotlin/kotlinx.coroutines/issues/330 详情。

    目前还没有进一步简化这种模式的建议。