简析go语言并发

传统编程语言中,并发设计多以操作系统线程作为承载模块的基本执行单元,由操作系统执行调度。操作系统线程的创建、销毁及线程间上下文切换的代价较大,且线程间通信原语复杂。go语言中,实现了goroutine这一由go运行时负责调度的用户层轻量级线程为并发设计提供原生支持。goroutine相比传统操作系统线程具有如下优势:

①资源占用小,每个goroutine初始栈大小仅为2kb

②由Go运行时而不是操作系统调度,goroutine上下文切换代价小

③语言原生支持:由go关键字接函数或方法创建,函数或方法返回即表示goroutine退出,开发体验更佳

④语言内置channel作为goroutine间通信原语,为并发设计提供强大支撑。创建一个channel, make(chan TYPE size),TYPE指的是channel中传输的数据类型,第二个参数是可选的,指的是channel的缓冲区大小。向channel传入数据,CHAN <- DATA,CHAN指的是目的channel即收集数据的一方,DATA则是要传的数据。从channel读取数据,DATA := <-CHAN,和向channel传入数据相反,在数据输送箭头的右侧的是channel。

通过一个例子理解go语言并发:车站过安检的时候,顺序为先查验身份证是否是你本人,行李物品过X光检查,最后对你的身体检查后通关。假设每个步骤耗时分别为查验身份证1,行李X光检查5,身体检查3且车站工作人员和相关设备都为1,当前有100个人准备进站。

先来看顺序完成以上步骤的go语言实现:

import "time"

const (
	idCheck   = 1
	xRayCheck = 5
	bodyCheck = 3
)

func idCheckCost() int {
	time.Sleep(time.Millisecond * time.Duration(idCheck))
	return idCheck
}

func xRayCheckCost() int {
	time.Sleep(time.Microsecond * time.Duration(xRayCheck))
	return xRayCheck
}

func bodyCheckCost() int {
	time.Sleep(time.Microsecond * time.Duration(bodyCheck))
	return bodyCheck
}

func enterStationCost() int {
	total := 0

	total += idCheckCost()
	total += xRayCheckCost()
	total += bodyCheckCost()

	return total
}

func main() {
	totalTime := 0
	passengers := 100

	for i := 0; i < passengers; i++ {
		totalTime += enterStationCost()
	}

	println("total cost time:", totalTime)
}

$ go run enter-station-v1.go
total cost time: 900

以上方法进入车站较慢,随着进站人数增加,等待进站的队伍势必越来越长。车站为了减少旅客排队进站的等待时间,新增了2条安检通道并行处理旅客进站。再看此时旅客进站的go语言实现:

import "time"

const (
	idCheck   = 1
	xRayCheck = 5
	bodyCheck = 3
)

func idCheckCost() int {
	time.Sleep(time.Millisecond * time.Duration(idCheck))
	return idCheck
}

func xRayCheckCost() int {
	time.Sleep(time.Microsecond * time.Duration(xRayCheck))
	return xRayCheck
}

func bodyCheckCost() int {
	time.Sleep(time.Microsecond * time.Duration(bodyCheck))
	return bodyCheck
}

func enterStationCost() int {
	total := 0

	total += idCheckCost()
	total += xRayCheckCost()
	total += bodyCheckCost()

	return total
}

func start(f func() int, queue <-chan struct{}) <-chan int {
	c := make(chan int)

	go func() {
		total := 0
		for {
			_, ok := <-queue
			if !ok {
				c <- total
				return
			}
			total += f()
		}
	}()
	return c
}

func max(args ...int) int {
	n := 0
	for _, v := range args {
		if v > n {
			n = v
		}
	}
	return n
}

func main() {
	totalTime := 0
	passengers := 100

	c := make(chan struct{})
	c1 := start(enterStationCost, c)
	c2 := start(enterStationCost, c)
	c3 := start(enterStationCost, c)

	for i := 0; i < passengers; i++ {
		c <- struct{}{}
	}
	close(c)

	totalTime = max(<-c1, <-c2, <-c3)
	println("total cost time:", totalTime)
}

$ go run enter-station-v2.go
total cost time: 300

为了模拟该并行方案,创建了3个goroutine,分别代表三个安检通道,可以看到效率是原来的3倍(90->30)。但是原来的程序并并没改变,每个安检通道(goroutine)都干着原来的工作:enterStationCost。

该车站所在地为旅游城市,每到旅游旺季,在新增至3条安检通道的情况下排队等待进站的旅客依然很多,鉴于车站场地有限,不可能再新增安检通道,只能思考对现有的方案进行优化调整。

之前的程序弊端明显:当工作人员处于某个环节(如查看X光机)时其他环节便处于”等待“状态(因为没有相应的工作人员来处理——开头已经说明安检通道和工作人员都为1,只能等待工作人员完成查看X光机或身体检查或身份证检查)。显然,一种更高效的方式就是让所有环节(身份证检查,X光行李检查、人身检查)同时进行,就像流水线一样(流水线上各个工位同时工作,各个工位都有相应的工作人员完成本工位的相关工作),这就是并发。下面看并发方案的go语言实现

import "time"

const (
	idCheck   = 1
	xRayCheck = 5
	bodyCheck = 3
)

func idCheckCost() int {
	time.Sleep(time.Millisecond * time.Duration(idCheck))
	return idCheck
}

func xRayCheckCost() int {
	time.Sleep(time.Microsecond * time.Duration(xRayCheck))
	return xRayCheck
}

func bodyCheckCost() int {
	time.Sleep(time.Microsecond * time.Duration(bodyCheck))
	return bodyCheck
}

func enterStation(id string, queue <-chan struct{}) {
	go func(id string) {
		print("goroutine-", id, " enterStation check channel is ready...", "\n")
		// X光检查
		queue3, quit3, result3 := start(id, xRayCheckCost, nil)
		// 身体检查
		queue2, quit2, result2 := start(id, bodyCheckCost, queue3)
		// 身份检查
		queue1, quit1, result1 := start(id, idCheckCost, queue2)

		for {
			select {
			case v, ok := <-queue:
				if !ok {
					close(quit1)
					close(quit2)
					close(quit3)
					totalCost := max(<-result1, <-result2, <-result3)
					print("goroutine-", id, " enterStationChannel cost time: ", totalCost, "\n")
					return
				}
				queue1 <- v
			}
		}
	}(id)
}

func start(id string, f func() int, next chan<- struct{}) (chan<- struct{}, chan<- struct{}, chan int) {
	queue := make(chan struct{}, 10)
	quit := make(chan struct{})
	result := make(chan int)
	_ = id

	go func() {
		total := 0
		for {
			select {
			case <-quit:
				result <- total
				return
			case v := <-queue:
				total += f()
				if next != nil {
					next <- v
				}
			}
		}
	}()
	return queue, quit, result
}

func max(args ...int) int {
	n := 0
	for _, v := range args {
		if v > n {
			n = v
		}
	}
	return n
}

func main() {
	passengers := 100
	queue := make(chan struct{}, 10)
	enterStation("channel1", queue)
	enterStation("channel2", queue)
	enterStation("channel3", queue)

	time.Sleep(time.Second * 10)
	for i := 0; i < passengers; i++ {
		queue <- struct{}{}
	}
	time.Sleep(10 * time.Second)
	close(queue)

	time.Sleep(10 * time.Second) // 防止main goroutine退出
}

$ go run enter-station-v3.go
goroutine-channel1 enterStation check channel is ready...
goroutine-channel3 enterStation check channel is ready...
goroutine-channel2 enterStation check channel is ready...
goroutine-channel1 enterStationChannel cost time: 165
goroutine-channel3 enterStationChannel cost time: 165
goroutine-channel2 enterStationChannel cost time: 170

上述程序,模拟开启了三条通道(enterStation),每条通道创建三个goroutine分别负责处理身份检查、X光检查和身体检查,三个goroutine之间通过channel相连。从运行结果可以看到,100个人进站时长进一步下降到170,并发方案使得安检效率进一步提升。如果计算资源资源不足,并发方案的效率最差回退到文首顺序执行同等的水平。

Go语言基础及实战 文章被收录于专栏

Go语言学习笔记、语法知识、技术要点和个人理解及实战

全部评论

相关推荐

01-07 07:54
已编辑
门头沟学院 前端工程师
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客企业服务