简析go语言并发
传统编程语言中,并发设计多以操作系统线程作为承载模块的基本执行单元,由操作系统执行调度。操作系统线程的创建、销毁及线程间上下文切换的代价较大,且线程间通信原语复杂。go语言中,实现了goroutine这一由go运行时负责调度的用户层轻量级线程为并发设计提供原生支持。goroutine相比传统操作系统线程具有如下优势:
①资源占用小,每个goroutine初始栈大小仅为2kb
②由Go运行时而不是操作系统调度,goroutine上下文切换代价小
③语言原生支持:由go关键字接函数或方法创建,函数或方法返回即表示goroutine退出,开发体验更佳
④语言内置channel作为goroutine间通信原语,为并发设计提供强大支撑。创建一个channel, make(chan TYPE size),TYPE指的是channel中传输的数据类型,第二个参数是可选的,指的是channel的缓冲区大小。向channel传入数据,CHAN <- DATA,CHAN指的是目的channel即收集数据的一方,DATA则是要传的数据。从channel读取数据,DATA := <-CHAN,和向channel传入数据相反,在数据输送箭头的右侧的是channel。
通过一个例子理解go语言并发:车站过安检的时候,顺序为先查验身份证是否是你本人,行李物品过X光检查,最后对你的身体检查后通关。假设每个步骤耗时分别为查验身份证1,行李X光检查5,身体检查3且车站工作人员和相关设备都为1,当前有100个人准备进站。
先来看顺序完成以上步骤的go语言实现:
import "time" const ( idCheck = 1 xRayCheck = 5 bodyCheck = 3 ) func idCheckCost() int { time.Sleep(time.Millisecond * time.Duration(idCheck)) return idCheck } func xRayCheckCost() int { time.Sleep(time.Microsecond * time.Duration(xRayCheck)) return xRayCheck } func bodyCheckCost() int { time.Sleep(time.Microsecond * time.Duration(bodyCheck)) return bodyCheck } func enterStationCost() int { total := 0 total += idCheckCost() total += xRayCheckCost() total += bodyCheckCost() return total } func main() { totalTime := 0 passengers := 100 for i := 0; i < passengers; i++ { totalTime += enterStationCost() } println("total cost time:", totalTime) } $ go run enter-station-v1.go total cost time: 900
以上方法进入车站较慢,随着进站人数增加,等待进站的队伍势必越来越长。车站为了减少旅客排队进站的等待时间,新增了2条安检通道并行处理旅客进站。再看此时旅客进站的go语言实现:
import "time" const ( idCheck = 1 xRayCheck = 5 bodyCheck = 3 ) func idCheckCost() int { time.Sleep(time.Millisecond * time.Duration(idCheck)) return idCheck } func xRayCheckCost() int { time.Sleep(time.Microsecond * time.Duration(xRayCheck)) return xRayCheck } func bodyCheckCost() int { time.Sleep(time.Microsecond * time.Duration(bodyCheck)) return bodyCheck } func enterStationCost() int { total := 0 total += idCheckCost() total += xRayCheckCost() total += bodyCheckCost() return total } func start(f func() int, queue <-chan struct{}) <-chan int { c := make(chan int) go func() { total := 0 for { _, ok := <-queue if !ok { c <- total return } total += f() } }() return c } func max(args ...int) int { n := 0 for _, v := range args { if v > n { n = v } } return n } func main() { totalTime := 0 passengers := 100 c := make(chan struct{}) c1 := start(enterStationCost, c) c2 := start(enterStationCost, c) c3 := start(enterStationCost, c) for i := 0; i < passengers; i++ { c <- struct{}{} } close(c) totalTime = max(<-c1, <-c2, <-c3) println("total cost time:", totalTime) } $ go run enter-station-v2.go total cost time: 300
为了模拟该并行方案,创建了3个goroutine,分别代表三个安检通道,可以看到效率是原来的3倍(90->30)。但是原来的程序并并没改变,每个安检通道(goroutine)都干着原来的工作:enterStationCost。
该车站所在地为旅游城市,每到旅游旺季,在新增至3条安检通道的情况下排队等待进站的旅客依然很多,鉴于车站场地有限,不可能再新增安检通道,只能思考对现有的方案进行优化调整。
之前的程序弊端明显:当工作人员处于某个环节(如查看X光机)时其他环节便处于”等待“状态(因为没有相应的工作人员来处理——开头已经说明安检通道和工作人员都为1,只能等待工作人员完成查看X光机或身体检查或身份证检查)。显然,一种更高效的方式就是让所有环节(身份证检查,X光行李检查、人身检查)同时进行,就像流水线一样(流水线上各个工位同时工作,各个工位都有相应的工作人员完成本工位的相关工作),这就是并发。下面看并发方案的go语言实现
import "time" const ( idCheck = 1 xRayCheck = 5 bodyCheck = 3 ) func idCheckCost() int { time.Sleep(time.Millisecond * time.Duration(idCheck)) return idCheck } func xRayCheckCost() int { time.Sleep(time.Microsecond * time.Duration(xRayCheck)) return xRayCheck } func bodyCheckCost() int { time.Sleep(time.Microsecond * time.Duration(bodyCheck)) return bodyCheck } func enterStation(id string, queue <-chan struct{}) { go func(id string) { print("goroutine-", id, " enterStation check channel is ready...", "\n") // X光检查 queue3, quit3, result3 := start(id, xRayCheckCost, nil) // 身体检查 queue2, quit2, result2 := start(id, bodyCheckCost, queue3) // 身份检查 queue1, quit1, result1 := start(id, idCheckCost, queue2) for { select { case v, ok := <-queue: if !ok { close(quit1) close(quit2) close(quit3) totalCost := max(<-result1, <-result2, <-result3) print("goroutine-", id, " enterStationChannel cost time: ", totalCost, "\n") return } queue1 <- v } } }(id) } func start(id string, f func() int, next chan<- struct{}) (chan<- struct{}, chan<- struct{}, chan int) { queue := make(chan struct{}, 10) quit := make(chan struct{}) result := make(chan int) _ = id go func() { total := 0 for { select { case <-quit: result <- total return case v := <-queue: total += f() if next != nil { next <- v } } } }() return queue, quit, result } func max(args ...int) int { n := 0 for _, v := range args { if v > n { n = v } } return n } func main() { passengers := 100 queue := make(chan struct{}, 10) enterStation("channel1", queue) enterStation("channel2", queue) enterStation("channel3", queue) time.Sleep(time.Second * 10) for i := 0; i < passengers; i++ { queue <- struct{}{} } time.Sleep(10 * time.Second) close(queue) time.Sleep(10 * time.Second) // 防止main goroutine退出 } $ go run enter-station-v3.go goroutine-channel1 enterStation check channel is ready... goroutine-channel3 enterStation check channel is ready... goroutine-channel2 enterStation check channel is ready... goroutine-channel1 enterStationChannel cost time: 165 goroutine-channel3 enterStationChannel cost time: 165 goroutine-channel2 enterStationChannel cost time: 170
上述程序,模拟开启了三条通道(enterStation),每条通道创建三个goroutine分别负责处理身份检查、X光检查和身体检查,三个goroutine之间通过channel相连。从运行结果可以看到,100个人进站时长进一步下降到170,并发方案使得安检效率进一步提升。如果计算资源资源不足,并发方案的效率最差回退到文首顺序执行同等的水平。
Go语言学习笔记、语法知识、技术要点和个人理解及实战