sync.WaitGroup
来记录正在运行的goroutine的数量。每个goroutine在不再从通道获取数据后退出。一旦
WaitGroup
像这样:
import (
"sync"
"time"
)
type Data interface{} // just an example
type Consumer interface {
Consume(Data) Data
CleanUp()
Count() int
Timeout() time.Duration
}
func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
wg := sync.WaitGroup{}
for i := 0; i < consumer.Count(); i++ {
wg.Add(1)
go func() {
consumeLoop:
for {
select {
case v, ok := <-inCh: // 'ok' says if the channel is still open
if !ok {
break consumeLoop
}
outCh <- consumer.Consume(v)
case <-time.After(consumer.Timeout()):
break consumeLoop
}
}
wg.Done()
}()
}
wg.Wait()
consumer.CleanUp()
close(outCh)
}
在管道的每个阶段,您都可以使用与上面类似的过程来启动消费者。