简析go语言并发模式(3)
基于管道模式的扩展模式:
1.扇出模式:在某个处理环节,多个功能相同的goroutine从同一个channel读取数据并处理,直到该channel关闭。使用扇出模式可以在一组goroutine中均衡分配工作量,从而更均衡的利用CPU。
2.扇入模式:在某个处理环节,处理程序面对不止一个输入channel,把所有输入channel汇聚到一个统一的输入channel,然后处理程序从这个channel中读取数据并处理,直到该channel因所有输入channel关闭而关闭。
import ( "fmt" "sync" ) func newNumGenerator(start, stop int) <-chan int { c := make(chan int) go func() { for i := start; i <= stop; i++ { c <- i } close(c) }() return c } func filterOdd(in int) (int, bool) { if in%2 != 0 { return in, true } return 0, false } func square(in int) (int, bool) { return in * in, true } func spawnGroup(num int, f func(int) (int, bool), in <-chan int) <-chan int { groupOut := make(chan int) var outSlice []chan int // 创建多个goroutine从同一个channel in中读取数据,并将读取到的数据存放在outSlice中:扇出模式 for i := 0; i < num; i++ { out := make(chan int) go func(i int) { for v := range in { r, ok := f(v) if ok { out <- r } } close(out) }(i) outSlice = append(outSlice, out) } // 将outSlice中的多个channel汇聚到一个channel groupOut中:扇入模式 go func() { var wg sync.WaitGroup for _, out := range outSlice { wg.Add(1) go func(out chan int) { for v := range out { groupOut <- v } wg.Done() }(out) } wg.Wait() close(groupOut) }() return groupOut } func main() { in := newNumGenerator(0, 20) out := spawnGroup(3, square, spawnGroup(3, filterOdd, in)) for i := range out { fmt.Println(i) } } go run go-concurrency-pattern.go 1 9 81 25 49 121 225 169 361 289
Go语言基础及实战 文章被收录于专栏
Go语言学习笔记、语法知识、技术要点和个人理解及实战