代码之家  ›  专栏  ›  技术社区  ›  raspi

如何递归地列出go中有通道的文件?

  •  2
  • raspi  · 技术社区  · 6 年前

    我试图使用通道递归地列出目录树。

    目前,我得到了一些文件列表,然后它被困在一个目录。目录被发送到工作进程,但它不处理它。

    如何发送目录 里面 工人( if file.IsDir() )以便正确处理它,并通知文件列表器递归完成后没有要处理的新文件?

    以下是我目前的尝试:

    package main
    
    import (
        "fmt"
        "os"
        "path/filepath"
        "errors"
        "log"
    )
    
    // Job for worker
    type workerJob struct {
        Root string
    }
    
    // Result of a worker
    type workerResult struct {
        Filename string
    }
    
    func worker(jobs chan workerJob, results chan<- workerResult, done chan bool) {
        for j := range jobs {
            log.Printf(`Directory: %#v`, j.Root)
    
            dir, err := os.Open(j.Root)
    
            if err != nil {
                if os.IsPermission(err) {
                    // Skip if there's no permission
                    continue
                }
                continue
            }
    
            fInfo, err := dir.Readdir(-1)
            dir.Close()
            if err != nil {
                if os.IsPermission(err) {
                    // Skip if there's no permission
                    continue
                }
                continue
            }
    
            for _, file := range fInfo {
                fpath := filepath.Join(dir.Name(), file.Name())
    
                if file.Mode().IsRegular() {
                    // is file
                    fs := uint64(file.Size())
                    if fs == 0 {
                        // Skip zero sized
                        continue
                    }
    
                    r := workerResult{
                        Filename: fpath,
                    }
    
                    log.Printf(`sent result: %#v`, r.Filename)
                    results <- r
                } else if file.IsDir() {
                    // Send directory to be processed by the worker
                    nj := workerJob{
                        Root: fpath,
                    }
                    log.Printf(`sent new dir job: %#v`, nj.Root)
                    jobs <- nj
                }
            }
    
            done <- true
        }
    }
    
    func main() {
        dir := `/tmp`
    
        workerCount := 1
    
        jobs := make(chan workerJob, workerCount)
        results := make(chan workerResult)
        readDone := make(chan bool)
    
        // start N workers
        for i := 0; i < workerCount; i++ {
            go worker(jobs, results, readDone)
        }
    
        jobs <- workerJob{
            Root: dir,
        }
    
        readloop:
        for {
            select {
            case res := <-results:
                log.Printf(`result=%#v`, res.Filename)
            case _ = <-readDone:
                log.Printf(`got stop`)
                break readloop
            }
        }
    
    }
    

    这将导致:

    2018/07/12 14:37:29 Directory: "/tmp"
    2018/07/12 14:37:29 sent result: "/tmp/.bashrc"
    2018/07/12 14:37:29 result="/tmp/.bashrc"
    2018/07/12 14:37:29 sent result: "/tmp/.bash_profile"
    2018/07/12 14:37:29 result="/tmp/.bash_profile"
    2018/07/12 14:37:29 sent result: "/tmp/.bash_logout"
    2018/07/12 14:37:29 result="/tmp/.bash_logout"
    2018/07/12 14:37:29 sent result: "/tmp/.xinitrc"
    2018/07/12 14:37:29 result="/tmp/.xinitrc"
    2018/07/12 14:37:29 sent new dir job: "/tmp/.config"
    fatal error: all goroutines are asleep - deadlock!
    
    goroutine 1 [select]:
    main.main()
        +0x281
    
    goroutine 5 [chan send]:
    main.worker(0xc42005a060, 0xc420078060, 0xc4200780c0)
        +0x4e7
    created by main.main
        +0x109
    
    Process finished with exit code 2
    

    如何解决僵局?

    2 回复  |  直到 6 年前
        1
  •  2
  •   leaf bebop    6 年前

    你已经注意到了 jobs <- nj 永远挂着。这是因为该操作将阻塞,直到工作进程在 range 循环,只要它在那里阻塞,它就不能到达 范围 循环。

    为了解决这个问题,你需要一个新的goroutine。

    go func() {
            jobs <- nj
    }()
    

    还有一个问题:你的 readDone 频道。

    目前,每次 worker 完成一项工作,这就有可能( select 随机选择准备好的频道) 选择 在里面 func main() 拿起它,然后关闭系统,这会导致所有剩余的作业和结果丢失。

    要解决这部分问题,您应该使用 sync.WaitGroup 是的。每次你增加一份新工作,你都会打电话给 wg.Add(1) 每次你的员工完成工作,你都会打电话给 wg.Done() 是的。在 函数main() ,您将生成一个使用 wg.Wait() 等待所有作业完成,然后使用 已读 是的。

    // One initial job
    wg.Add(1)
    go func() {
        jobs <- workerJob{
            Root: dir,
        }
    }()
    
    // When all jobs finished, shutdown the system.
    go func() {
        wg.Wait()
        readDone <- true
    }()
    

    完整代码: https://play.golang.org/p/KzVxtflu1eU

        2
  •  1
  •   Nick Hunter    6 年前

    关于改进代码的初步意见

    蒂姆的评论似乎没有触及要点。你在节目结束时关闭频道应该没关系 main() ,你的 select 语句有 default 案例。如果频道上有消息,将运行频道读取案例。

    这可以被认为是一个问题,当没有消息时,您将通过 违约 会导致CPU使用率激增的情况(“忙等待”),所以是的,可能只是删除默认情况。

    您还可以添加一个“停止”频道的情况,该频道会中断 for 循环,使用标签(这是必需的,否则 break 只要断开select语句,我们就可以再次循环):

    readloop:
    for {
        select {
        case res := <-results:
            log.Printf(`result=%#v`, res.Filename)
        case _ = <-stopChan:
            break readloop
    }
    

    最后,您可能还应该重命名变量 f 在里面 worker() dir ,因为它是一个目录而不是一个文件。只是让代码更容易阅读。对于一个精通自然语言的程序员来说,代码应该读起来几乎和自然语言一样。这样,这个声明,

    fpath := filepath.Join(f.Name(), file.Name())
    

    变成

    fpath := filepath.Join(dir.Name(), file.Name())
    

    …这对你的眼睛/大脑来说更容易扫描。

    为什么你的代码被破坏了

    你有一个频道死锁。你没有注意到因为 违约 case的意思是,从技术上讲,一个goroutine总是可以取得“进步”。否则运行时将引发恐慌,说:

    fatal error: all goroutines are asleep - deadlock!
    

    这是因为 工人() 具有以下结构:

    receive from channel
    ...
        ...
        foreach dir in root:
            send to channel
        ...
    ...
    

    但在正常信道上,发送和接收都是阻塞操作。发送/接收的goroutine在其伙伴出现之前不会取得进展。

    可以使用缓冲通道来避免这种情况,但不可能事先知道目录中会找到多少目录,因此缓冲区可能太小。我建议生成一个goroutine,这样它可以在不影响整体 工人() 循环:

    go func() {
        for _, file := range fInfo {
            ...
        }
    }()