I'm trying to optimize a CSV loading process that is basically a regex search in a large CSV file (4+ GB, 31033993 records in my experiment).
I managed to build a multiprocessing logic to read the CSV, but the run gets much slower as soon as the regex match is involved (timings at the end of this post), so I have been profiling it with pprof.
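For reference, the profile written to perf.data by the code below is a standard Go CPU profile, so I inspect it with the usual tooling (the binary and CSV file names here are just placeholders):

go build -o csvload .
./csvload -file data.csv
go tool pprof csvload perf.data
(pprof) top10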
Here is the code I have so far:
package main

import (
	"bufio"
	"flag"
	"fmt"
	"log"
	"os"
	"regexp"
	"runtime"
	"runtime/pprof"
	"strings"
	"sync"
)

func processFile(path string) [][]string {
	file, err := os.Open(path)
	if err != nil {
		// bail out here: a nil *os.File would crash the scanner goroutine below
		log.Fatalln("Error:", err)
	}
	defer file.Close()

	pattern := regexp.MustCompile(`^.*foo.*$`)
	numCPU := runtime.NumCPU()
	jobs := make(chan string, numCPU+1)
	fmt.Printf("Strategy: Parallel, %d Workers ...\n", numCPU)

	results := make(chan []string)
	wg := new(sync.WaitGroup)

	// one matching worker per CPU
	for w := 1; w <= numCPU; w++ {
		wg.Add(1)
		go parseRecord(jobs, results, wg, pattern)
	}

	// producer: feed lines to the workers, then signal the end of input
	go func() {
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			jobs <- scanner.Text()
		}
		close(jobs)
	}()

	// close results once every worker has finished
	go func() {
		wg.Wait()
		close(results)
	}()

	lines := [][]string{}
	for line := range results {
		lines = append(lines, line)
	}
	return lines
}

func parseRecord(jobs <-chan string, results chan<- []string, wg *sync.WaitGroup, pattern *regexp.Regexp) {
	defer wg.Done()
	for j := range jobs {
		if pattern.MatchString(j) {
			// scanner.Text() already strips the newline; split the
			// matching record into its comma-separated fields
			results <- strings.FieldsFunc(j, split)
		}
	}
}

// split reports whether r is a field separator, for strings.FieldsFunc.
func split(r rune) bool {
	return r == ','
}

func main() {
	f, err := os.Create("perf.data")
	if err != nil {
		log.Fatal(err)
	}
	if err := pprof.StartCPUProfile(f); err != nil {
		log.Fatal(err)
	}
	defer pprof.StopCPUProfile()

	pathFlag := flag.String("file", "", `The CSV file to operate on.`)
	flag.Parse()
	lines := processFile(*pathFlag)
	fmt.Println("loaded", len(lines), "records")
}
When I process the file without any regex constraint, I get a reasonable computation time (I simply load the parsed strings into a 2D array, without any pattern.MatchString() restriction).
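A minimal sketch of that unfiltered variant, assuming it is just parseRecord with the MatchString check removed (the name parseRecordNoFilter is made up for illustration):

// Baseline worker: forwards every record's fields, no regex involved.
func parseRecordNoFilter(jobs <-chan string, results chan<- []string, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		results <- strings.FieldsFunc(j, split)
	}
}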
Without the MatchString filter:

Strategy: Parallel, 8 Workers ...
loaded 31033993 records
2018/10/09 11:46:38 readLines took 30.611246035s

With the MatchString filter:

Strategy: Parallel, 8 Workers ...
loaded 143090 records
2018/10/09 12:04:32 readLines took 1m24.029830907s