代码之家  ›  专栏  ›  技术社区  ›  Cadel Watson

在一定时间内从goroutine接收值

  •  0
  • Cadel Watson  · 技术社区  · 7 年前

    我有一个goroutine,它可以生成无限多的值(每个值都比最后一个值更合适),但找到每个值的时间会越来越长。我正试图找到一种方法来添加一个时间限制,比如10秒,之后我的函数将以迄今为止收到的最佳值执行某些操作。

    这是我当前的“解决方案”,使用通道和计时器:

    // the goroutine which runs infinitely
    // (or at least a very long time for high values of depth)
    func runSearch(depth int, ch chan int) {
        for i := 1; i <= depth; i++ {
            fmt.Printf("Searching to depth %v\n", i)
            ch <- search(i)
        }
    }
    
    // consumes progressively better values until the channel is closed
    func awaitBestResult(ch chan int) {
        var result int
        for result := range ch {
            best = result
        }
    
        // do something with best result here
    }
    
    // run both consumer and producer
    func main() {
        timer := time.NewTimer(time.Millisecond * 2000)
    
        ch := make(chan int)
    
        go runSearch(1000, ch)
        go awaitBestResult(ch)
    
        <-timer.C
        close(ch)
    }
    

    这主要是可行的-最好的结果是在计时器结束和通道关闭后处理的。然而,我随后感到恐慌( panic: send on closed channel )从 runSearch goroutine,因为主功能已关闭通道。

    计时器完成后,如何停止运行的第一个goroutine?非常感谢您的帮助。

    2 回复  |  直到 7 年前
        1
  •  0
  •   Jonathan Hall    7 年前

    您需要确保goroutine知道它何时完成处理,以便它不会试图写入到封闭通道,从而导致恐慌。

    这听起来像是一个完美的案例 context 包裹:

    func runSearch(ctx context.Context, depth int, ch chan int) {
        for i := 1; i <= depth; i++ {
            select {
            case <- ctx.Done()
                // Context cancelled, return
                return
            default:
            }
            fmt.Printf("Searching to depth %v\n", i)
            ch <- search(i)
        }
    }
    

    然后在 main() :

    // run both consumer and producer
    func main() {
        ctx := context.WithTimeout(context.Background, 2 * time.Second)
    
        ch := make(chan int)
    
        go runSearch(ctx, 1000, ch)
        go awaitBestResult(ch)
    
        close(ch)
    }
    
        2
  •  0
  •   Ravi R    7 年前

    你因为发送goroutine而惊慌失措 runSearch 显然超过了计时器,它试图在您的 main 戈罗廷。您需要设计一种方法,一旦计时器失效,在关闭主通道之前,向发送go例程发出信号,不要发送任何值。另一方面,如果您的搜索很快结束,您还需要与 主要的 继续前进。您可以使用一个通道并进行同步,以便不存在竞争条件。最后,您需要知道消费者何时处理完所有数据,然后才能退出main。

    这里有一些可能有用的东西。

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    var mu sync.Mutex //To protect the stopped variable which will decide if a value is to be sent on the signalling channel
    var stopped bool
    
    func search(i int) int {
        time.Sleep(1 * time.Millisecond)
        return (i + 1)
    }
    
    // (or at least a very long time for high values of depth)
    func runSearch(depth int, ch chan int, stopSearch chan bool) {
    
        for i := 1; i <= depth; i++ {
            fmt.Printf("Searching to depth %v\n", i)
            n := search(i)
            select {
            case <-stopSearch:
                fmt.Println("Timer over! Searched till ", i)
                return
            default:
            }
    
            ch <- n
            fmt.Printf("Sent depth %v result for processing\n", i)
        }
    
        mu.Lock() //To avoid race condition with timer also being
        //completed at the same time as execution of this code
        if stopped == false {
            stopped = true
            stopSearch <- true
            fmt.Println("Search completed")
        }
        mu.Unlock()
    
    }
    
    // consumes progressively better values until the channel is closed
    func awaitBestResult(ch chan int, doneProcessing chan bool) {
        var best int
    
        for result := range ch {
            best = result
        }
        fmt.Println("Best result ", best)
        // do something with best result here
    
        //and communicate to main when you are done processing the result
        doneProcessing <- true
    
    }
    
    func main() {
        doneProcessing := make(chan bool)
        stopSearch := make(chan bool)
    
        // timer := time.NewTimer(time.Millisecond * 2000)
        timer := time.NewTimer(time.Millisecond * 12)
    
        ch := make(chan int)
    
        go runSearch(1000, ch, stopSearch)
        go awaitBestResult(ch, doneProcessing)
        select {
        case <-timer.C:
            //If at the same time runsearch is also completed and trying to send a value !
            //So we hold a lock before sending value on the channel
            mu.Lock()
            if stopped == false {
                stopped = true
                stopSearch <- true
                fmt.Println("Timer expired")
            }
            mu.Unlock()
    
        case <-stopSearch:
            fmt.Println("runsearch goroutine completed")
        }
        close(ch)
    
        //Wait for your consumer to complete processing
        <-doneProcessing
        //Safe to exit now
    }
    

    在…上 playground . 更改的值 timer 观察两种情况。