简析go语言并发模式(3)

基于管道模式的扩展模式:

1.扇出模式:在某个处理环节,多个功能相同的goroutine从同一个channel读取数据并处理,直到该channel关闭。使用扇出模式可以在一组goroutine中均衡分配工作量,从而更均衡的利用CPU。

2.扇入模式:在某个处理环节,处理程序面对不止一个输入channel,把所有输入channel汇聚到一个统一的输入channel,然后处理程序从这个channel中读取数据并处理,直到该channel因所有输入channel关闭而关闭。

import (
	"fmt"
	"sync"
)

func newNumGenerator(start, stop int) <-chan int {
	c := make(chan int)

	go func() {
		for i := start; i <= stop; i++ {
			c <- i
		}
		close(c)
	}()
	return c
}

func filterOdd(in int) (int, bool) {
	if in%2 != 0 {
		return in, true
	}
	return 0, false
}

func square(in int) (int, bool) {
	return in * in, true
}

func spawnGroup(num int, f func(int) (int, bool), in <-chan int) <-chan int {
	groupOut := make(chan int)
	var outSlice []chan int
	
	// 创建多个goroutine从同一个channel in中读取数据,并将读取到的数据存放在outSlice中:扇出模式
	for i := 0; i < num; i++ {
		out := make(chan int)
		go func(i int) {
			for v := range in {
				r, ok := f(v)
				if ok {
					out <- r
				}
			}
			close(out)
		}(i)
		outSlice = append(outSlice, out)
	}
	
	// 将outSlice中的多个channel汇聚到一个channel groupOut中:扇入模式
	go func() {
		var wg sync.WaitGroup
		for _, out := range outSlice {
			wg.Add(1)
			go func(out chan int) {
				for v := range out {
					groupOut <- v
				}
				wg.Done()
			}(out)
		}
		wg.Wait()
		close(groupOut)
	}()
	return groupOut
}

func main() {
	in := newNumGenerator(0, 20)
	out := spawnGroup(3, square, spawnGroup(3, filterOdd, in))

	for i := range out {
		fmt.Println(i)
	}
}

go run go-concurrency-pattern.go
1
9
81
25
49
121
225
169
361
289

Go语言基础及实战 文章被收录于专栏

Go语言学习笔记、语法知识、技术要点和个人理解及实战

全部评论

相关推荐

字节 飞书绩效团队 (n+2) * 15 + 1k * 12 + 1w
点赞 评论 收藏
分享
点赞 评论 收藏
分享
11-01 08:48
门头沟学院 C++
伤心的候选人在吵架:佬你不要的,能不能拿户口本证明过户给我。。球球了
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务