代码之家  ›  专栏  ›  技术社区  ›  Avi Mosseri

如何从正在从该通道接收数据的goroutine向通道添加对象?

  •  2
  • Avi Mosseri  · 技术社区  · 7 年前

    基本上,我正在尝试使用goroutines编写一个并发站点地图爬虫。一个站点地图可以包含指向多个站点地图的链接,这些站点地图可以包含指向其他站点地图等的链接。

    现在,这是我的设计:

    worker:
         - receives url from channel
         - processesUrl(url)
    processUrl:
         for each link in lookup(url):
             - if link is sitemap:
                    channel <- url
               else:
                   print(url)
    main:
        - create 10 workers
        - chanel <- root url
    

    问题是,在processUrl()完成之前,工作者不会接受来自频道的新url;如果工作者正在向频道添加url,则在工作者接受来自频道的新url之前,processUrl不会完成。我可以使用什么并发设计将url添加到没有通道、没有繁忙等待或没有等待的任务队列中 channel <- url ?

    以下是实际代码(如果有帮助):

    func (c *SitemapCrawler) worker() {
        for {
            select {
            case url := <-urlChan:
                fmt.Println(url)
                c.crawlSitemap(url)
            }
        }
    }
    func crawlUrl(url string) {
        defer crawlWg.Done()
        crawler := NewCrawler(url)
        for i := 0; i < MaxCrawlRate*20; i++ {
            go crawler.worker()
        }
        crawler.getSitemaps()
        pretty.Println(crawler.sitemaps)
        crawler.crawlSitemaps()
    }
    func (c SitemapCrawler) crawlSitemap(url string) {
        c.limiter.Take()
        resp, err := MakeRequest(url)
        if err != nil || resp.StatusCode != 200 {
            crawlWg.Done()
            return
        }
        var resp_txt []byte
        if strings.Contains(resp.Header.Get("Content-Type"), "html") {
            crawlWg.Done()
            return
        } else if strings.Contains(url, ".gz") || resp.Header.Get("Content-Encoding") == "gzip" {
            reader, err := gzip.NewReader(resp.Body)
            if err != nil {
                crawlWg.Done()
                panic(err)
            } else {
                resp_txt, err = ioutil.ReadAll(reader)
                if err != nil {
                    crawlWg.Done()
                    panic(err)
                }
            }
            reader.Close()
        } else {
            resp_txt, err = ioutil.ReadAll(resp.Body)
            if err != nil {
                //panic(err)
                crawlWg.Done()
                return
            }
        }
        io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()
    
        d, err := libxml2.ParseString(string(resp_txt))
        if err != nil {
            crawlWg.Done()
            return
        }
        results, err := d.Find("//*[contains(local-name(), 'loc')]")
        if err != nil {
            crawlWg.Done()
            return
        }
        locs := results.NodeList()
        printLock.Lock()
        for i := 0; i < len(locs); i++ {
            newUrl := locs[i].TextContent()
            if strings.Contains(newUrl, ".xml") {
                crawlWg.Add(1)
                //go c.crawlSitemap(newUrl)
                urlChan <- newUrl
            } else {
                fmt.Println(newUrl)
            }
        }
        printLock.Unlock()
    
        crawlWg.Done()
    }
    
    1 回复  |  直到 7 年前
        1
  •  1
  •   Daan Willems    7 年前

    当通道未缓冲时,对通道的写入操作将阻塞。

    要创建缓冲通道,请执行以下操作:

    urlChan := make(chan string, len(allUrls))
    

    但是,当此通道已满时,写入操作将再次阻塞。

    或者,您可以使用开关。当写入“失败”时,它将立即进入默认状态

    select {
    case urlChan <- url:
        fmt.Println("received message")
    default:
        fmt.Println("no activity")
    }
    

    要在写入通道时超时,请执行以下操作

    select {
    case urlChan <- url:
        fmt.Println("received message")
    case <-time.After(5 * time.Second):
        fmt.Println("timed out")
    }
    

    或者最后将写入事件放在单独的go通道中

    func write() {
        urlChan <- url
    }
    
    go write()