代码之家  ›  专栏  ›  技术社区  ›  Tushar

围棋中的慢生产者、快消费者如何处理频道关闭同步?[关闭]

  •  -1
  • Tushar  · 技术社区  · 6 年前

    我是新手,找不到这个问题的答案。我正在做的是,在producer中读取一个CSV文件,做一些可能需要时间的事情,然后通过一个通道将输出发送给消费者。有一连串的 生产者消费者

    消费者-生产者-2(>1个goroutines)->频道2->消费者(>1个goroutines)

    这里最多可以有15个消费者。

    我需要实现的是:

    1. 一旦生产者完成,所有的消费者最终都应该做一些清理工作,并在完成剩余的部分后退出
    2. 如果使用者在特定的超时时间段内没有获得任何数据,它可以退出(最好有一个信号),而不需要进一步阻塞。
    3. 整个序列中的所有生产者-消费者对都会发生这种情况。

    我采用了以下方法。

    1. 在每个数据通道中保留一个信号通道,并为下一个用户的每个goroutine发布一个“done”。
    2. https://golang.org/pkg/sync/#Once.Do ).
    3. 下面是我在这里能想到的。

      processRemaining = false
      for processRemaining == false{
              select {
              case stuff, ok := <-input_messages:
                      do_stuff(stuff)
                      if ok == false { // if channel has been closed
                          processRemaining = true
                      }
                      if result != nil {
                              //send to channel output_messages
                      }
              case sig := <-input_signals: // if signaled to stopped.
                      fmt.Println("received signal", sig)
                      processRemaining = true
              default:
                      fmt.Println("no activity")
              }
      }
      if processRemaining {
              for stuff := range input_messages {
                      do_stuff(stuff)
                      if result != nil {
                              //send to channel output_messages
                      }
              }
              // send "output_routine" number of "done" to a channel "output_signals".
      }
      

    但即使在这种方法中,我也无法想出任何方法来表现出与关闭的“输入消息”通道相同的方式,如果没有可用的,比如说10秒钟。

    1. 一旦第一个“通道0”关闭,所有后续通道都将关闭。
    2. 在关闭输出通道之前,所有的生产者都会被更新,并且只有在所有生产者都完成写入之后,通道才会被关闭。
    3. 如果使用者在指定的超时时间内没有从通道获取数据,则应将其视为已关闭,并自行解除阻止。
    1 回复  |  直到 6 年前
        1
  •  0
  •   svsd    6 年前

    sync.WaitGroup 来记录正在运行的goroutine的数量。每个goroutine在不再从通道获取数据后退出。一旦 WaitGroup

    像这样:

    import (
            "sync"
            "time"
    )
    
    type Data interface{} // just an example
    
    type Consumer interface {
            Consume(Data) Data
            CleanUp()
            Count() int
            Timeout() time.Duration
    }
    
    func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
            wg := sync.WaitGroup{}
            for i := 0; i < consumer.Count(); i++ {
                    wg.Add(1)
                    go func() {
                    consumeLoop:
                            for {
                                    select {
                                    case v, ok := <-inCh: // 'ok' says if the channel is still open
                                            if !ok {
                                                    break consumeLoop
                                            }
                                            outCh <- consumer.Consume(v)
                                    case <-time.After(consumer.Timeout()):
                                            break consumeLoop
                                    }
                            }
    
                            wg.Done()
                    }()
            }
            wg.Wait()
    
            consumer.CleanUp()
            close(outCh)
    }
    

    在管道的每个阶段,您都可以使用与上面类似的过程来启动消费者。