简析go语言并发模式(2)
4)管道模式:多个数据处理环节,每个数据处理环节都由一组功能相同的goroutine完成,每个数据处理环节的goroutine都要从数据输入channel获取上一个数据处理环节生产的数据,然后对这些数据进行处理并将处理后的结果通过数据输出channel发往下一个环节。
import "fmt" func generatorNums(start, stop int) <-chan int { c := make(chan int) go func() { for i := start; i <= stop; i++ { c <- i } close(c) }() return c } func filterEven(n int) (int, bool) { if n%2 != 0 { return 0, false } return n, true } func spawn(f func(n int) (int, bool), in <-chan int) <-chan int { out := make(chan int) go func() { for v := range in { r, ok := f(v) if ok { out <- r } } close(out) }() return out } func main() { in := generatorNums(0, 10) out := spawn(filterEven, in) for v := range out { fmt.Println(v) } } go run go-concurrency-by-channel-pattern.go 0 2 4 6 8 10
上述例子为过滤出一组整数中的偶数,共有三个处理环节:
1.第一个环节生产数据序列,这个序列由generatorNums创建的goroutine负责生成并发送到输出channel中,在序列全部发送完毕后该goroutine关闭channel并退出。
2.第二个环节是过滤偶数,由spawn函数创建的goroutine从第一个环节的输出channel中读取数据,并交由filterEven函数处理,如果是偶数,则发送到该goroutine的输出channel中,在全部数据处理并发送完毕后,该goroutine关闭channel并退出。
3.第三个环节是将序列数据输出到控制台,main goroutine从第二个环节的输出channel中读取数据,并将数据打印输出到控制台上,在全部数据处理完后main goroutine退出。
管道模式具有良好的可扩展性,如果在上例基础上增加对偶数进行平方运算(square),可以向下面这样扩展管道:
import "fmt" func generatorNums(start, stop int) <-chan int { c := make(chan int) go func() { for i := start; i <= stop; i++ { c <- i } close(c) }() return c } func filterEven(n int) (int, bool) { if n%2 != 0 { return 0, false } return n, true } func square(n int) (int, bool) { return n * n, true } func spawn(f func(n int) (int, bool), in <-chan int) <-chan int { out := make(chan int) go func() { for v := range in { r, ok := f(v) if ok { out <- r } } close(out) }() return out } func main() { in := generatorNums(0, 10) out := spawn(square, spawn(filterEven, in)) for v := range out { fmt.Println(v) } } go run go-concurrency-by-channel-pattern.go 0 2 4 6 8 10
Go语言基础及实战 文章被收录于专栏
Go语言学习笔记、语法知识、技术要点和个人理解及实战