Golang 并发模型:Pipelines

参考

https://pandaychen.github.io/2020/04/09/A-GOLANG-PIPELINE/

https://go.dev/blog/pipelines

代码

package main

import (
	"crypto/md5"
	"fmt"
	"log"
	"os"
	"path/filepath"
)

type result struct {
	path string
	sum  [md5.Size]byte
}

type fileData struct {
	path string
	data []byte
}

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	out := make(chan string, 0)
	errc := make(chan error, 1)

	go func() {
		defer close(out)
		defer close(errc)
		err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case out <- path:
				return nil
			case <-done:
				return fmt.Errorf("walk cancelled")
			}
		})
		errc <- err
	}()

	return out, errc
}

func readFiles(done <-chan struct{}, paths <-chan string) (<-chan fileData, <-chan error) {
	out := make(chan fileData, 100)
	errc := make(chan error, 1)

	go func() {
		defer close(out)
		defer close(errc)
		for {
			select {
			case <-done:
				return
			case path, ok := <-paths:
				if !ok {
					return
				}
				data, err := os.ReadFile(path)
				if err != nil {
					errc <- err
					return
				}
				select {
				case out <- fileData{path, data}:
				case <-done:
					return
				}
			}
		}
	}()

	return out, errc
}

func md5Files(done <-chan struct{}, fileDatas <-chan fileData) (<-chan result, <-chan error) {
	out := make(chan result, 100)
	errc := make(chan error, 1)

	go func() {
		defer close(out)
		defer close(errc)
		for {
			select {
			case <-done:
				return
			case fd, ok := <-fileDatas:
				if !ok {
					return
				}
				sum := md5.Sum(fd.data)
				select {
				case out <- result{fd.path, sum}:
				case <-done:
					return
				}
			}
		}
	}()

	return out, errc
}

func MD5All(root string) (map[string][md5.Size]byte, error) {
	done := make(chan struct{})
	defer close(done)

	paths, walkErrc := walkFiles(done, root)
	files, readErrc := readFiles(done, paths)
	results, hashErrc := md5Files(done, files)

	m := make(map[string][md5.Size]byte)

	for {
		select {
		case r, ok := <-results:
			if !ok {
				results = nil
			} else {
				m[r.path] = r.sum
			}
		case err, ok := <-walkErrc:
			if ok && err != nil {
				return nil, fmt.Errorf("walk error: %w", err)
			}
			walkErrc = nil
		case err, ok := <-readErrc:
			if ok && err != nil {
				return nil, fmt.Errorf("read error: %w", err)
			}
			readErrc = nil
		case err, ok := <-hashErrc:
			if ok && err != nil {
				return nil, fmt.Errorf("hash error: %w", err)
			}
			hashErrc = nil
		}

		if results == nil && walkErrc == nil && readErrc == nil && hashErrc == nil {
			break
		}
	}

	return m, nil
}

func main() {
	root := "."
	res, err := MD5All(root)
	if err != nil {
		log.Fatalf("失败: %v", err)
	}
	for path, sum := range res {
		fmt.Printf("%s: %x\n", path, sum)
	}
}


#GO#
全部评论

相关推荐

评论
点赞
4
分享

创作者周榜

更多
牛客网
牛客企业服务