Golang 并发模型:Pipelines
参考
https://pandaychen.github.io/2020/04/09/A-GOLANG-PIPELINE/
https://go.dev/blog/pipelines
代码
package main

// This program computes the MD5 digest of every regular file under a
// directory tree using a three-stage pipeline: walkFiles emits paths,
// readFiles loads file contents, and md5Files hashes them. Each stage runs in
// its own goroutine and stops early when the shared done channel is closed.

import (
	"crypto/md5"
	"errors"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sort"
)

// errWalkCancelled is returned from the walk callback when done is closed
// while a path is waiting to be delivered downstream.
var errWalkCancelled = errors.New("walk cancelled")

// result pairs a file path with the MD5 digest of its contents.
type result struct {
	path string
	sum  [md5.Size]byte
}

// fileData pairs a file path with the full contents read from it.
type fileData struct {
	path string
	data []byte
}

// walkFiles walks the tree rooted at root and sends the path of every regular
// file on the returned string channel. The value returned by filepath.Walk
// (nil on success) is sent on the returned error channel. Both channels are
// closed when the walk ends. Closing done aborts the walk with
// errWalkCancelled.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	out := make(chan string)
	// Buffered so the goroutine can report the walk result without waiting
	// for a receiver.
	errc := make(chan error, 1)

	go func() {
		defer close(out)
		defer close(errc)

		errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				// Returning the error makes filepath.Walk stop and return it.
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case out <- path:
				return nil
			case <-done:
				return errWalkCancelled
			}
		})
	}()

	return out, errc
}

// readFiles reads every file named on paths and sends its contents on the
// returned fileData channel. The first read failure is sent on the returned
// error channel and ends the stage. Both channels are closed when the stage
// ends, which happens when paths is closed, done is closed, or a read fails.
func readFiles(done <-chan struct{}, paths <-chan string) (<-chan fileData, <-chan error) {
	out := make(chan fileData, 100)
	// Buffered so a read failure can be reported without waiting for a
	// receiver.
	errc := make(chan error, 1)

	go func() {
		defer close(out)
		defer close(errc)

		for {
			select {
			case <-done:
				return
			case path, ok := <-paths:
				if !ok {
					return
				}
				data, err := os.ReadFile(path)
				if err != nil {
					errc <- err
					return
				}
				select {
				case out <- fileData{path, data}:
				case <-done:
					return
				}
			}
		}
	}()

	return out, errc
}

// md5Files computes the MD5 digest of every fileData received on fileDatas
// and sends a result for each on the returned result channel. The returned
// error channel never carries a value; it is closed together with the result
// channel when the stage ends, which happens when fileDatas is closed or done
// is closed.
func md5Files(done <-chan struct{}, fileDatas <-chan fileData) (<-chan result, <-chan error) {
	out := make(chan result, 100)
	errc := make(chan error, 1)

	go func() {
		defer close(out)
		defer close(errc)

		for {
			select {
			case <-done:
				return
			case fd, ok := <-fileDatas:
				if !ok {
					return
				}
				sum := md5.Sum(fd.data)
				select {
				case out <- result{fd.path, sum}:
				case <-done:
					return
				}
			}
		}
	}()

	return out, errc
}

// MD5All hashes every regular file under root and returns a map from file
// path to MD5 digest. If the walk, read, or hash stage reports an error, it
// returns a nil map and that error wrapped with the stage name. The done
// channel is closed on return, which signals all stages to stop.
func MD5All(root string) (map[string][md5.Size]byte, error) {
	done := make(chan struct{})
	defer close(done)

	paths, walkErrc := walkFiles(done, root)
	files, readErrc := readFiles(done, paths)
	results, hashErrc := md5Files(done, files)

	m := make(map[string][md5.Size]byte)

	// A receive from a nil channel blocks forever, so setting a drained
	// channel to nil removes its case from the select. The loop ends once
	// every channel has been drained.
	for results != nil || walkErrc != nil || readErrc != nil || hashErrc != nil {
		select {
		case r, ok := <-results:
			if !ok {
				results = nil
				continue
			}
			m[r.path] = r.sum
		case err, ok := <-walkErrc:
			if ok && err != nil {
				return nil, fmt.Errorf("walk error: %w", err)
			}
			walkErrc = nil
		case err, ok := <-readErrc:
			if ok && err != nil {
				return nil, fmt.Errorf("read error: %w", err)
			}
			readErrc = nil
		case err, ok := <-hashErrc:
			if ok && err != nil {
				return nil, fmt.Errorf("hash error: %w", err)
			}
			hashErrc = nil
		}
	}

	return m, nil
}

// main hashes every regular file under the current directory and prints one
// "path: digest" line per file, sorted by path so the output is stable
// between runs.
func main() {
	root := "."
	res, err := MD5All(root)
	if err != nil {
		log.Fatalf("失败: %v", err)
	}

	// Map iteration order is unspecified, so collect and sort the keys first.
	paths := make([]string, 0, len(res))
	for path := range res {
		paths = append(paths, path)
	}
	sort.Strings(paths)

	for _, path := range paths {
		fmt.Printf("%s: %x\n", path, res[path])
	}
}

// Trailing marker preserved from the original document: #GO#