简析go语言并发模式
go语言实现了CSP并发模型:输入/输出为基本编程原语,数据处理逻辑(CSP模型中的P)仅需调用输入原语获取数据,顺序处理数据并将结果通过输出原语输出。Go引入了goroutine(P)之间的通信原语channel,goroutine可以从channel获取输入数据,再将处理后的数据结果通过channel输出。通过channel和goroutine组合连接在一起,使并发程序设计更为清晰,无须再为传统共享内存并发模型中的问题花费心智。引入select用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。
Go语言也支持传统的基于共享内存的并发模型,并提供了基本的低级同步原语:主要是sync包中的互斥锁、条件变量、读写锁、原子操作等。但Go始终推荐以CSP模型风格构建并发程序,CSP模型风格go并发程序的并发模式大致分以下几种:
1.创建模式:使用go关键字+函数/方法创建一个goroutine,并返回一个channel类型变量的函数。这是Go中最常见的goroutine创建模式。
type T struct{}
func spawn(f func()) chan T {
c := make(chan T)
go func() {
f()
c <- T{}
}()
return c
}
func main() {
c := spawn(func())
<-c
}
spawn函数创建新的goroutine与调用spawn函数的goroutine之间通过一个channel建立联系,即两个goroutine通过channel进行通信。channel可以像变量一样被初始化、传递和赋值。
2.退出模式:goroutine的使用代价很低,官方推荐使用goroutine。在大多数情况下,我们无需考虑对goroutine的退出进行控制,goroutine的执行函数返回,就意味着goroutine的退出。但一些常驻后台服务程序可能会对goroutine有着优雅退出的要求。goroutine的退出模式大致分以下几种:
1)分离模式:分离模式是使用最广泛的goroutine退出模式。对于分离的goroutine,创建它的goroutine不需要关心它的退出,这类goroutine在创建后即与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。这类goroutine有如下两个常见用途。
a.一次性任务:新创建的goroutine用来执行一个简单任务,执行后退出。
import (
"fmt"
"time"
)
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, ok := args[0].(int)
if !ok {
return
}
time.Sleep(time.Second * time.Duration(interval))
}
func spawn(f func(args ...interface{}), args ...interface{}) {
go func() {
f(args...)
}()
}
func main() {
spawn(worker, 5)
}
b.常驻任务:常驻后台执行一些特定任务,如监视(monitor)、观察(watch)等。其实现通常采用for {...}或者for {select{...}}代码段形式,并多以定时器(timer)或事件(event)驱动执行。
import (
"fmt"
"time"
)
func worker(args ...interface{}) {
if len(args) == 0 {
fmt.Println("invalid args")
return
}
interval, ok := args[0].(int)
if !ok {
println("invalid interval args")
return
}
time.Sleep(time.Second * time.Duration(interval))
}
func spawn(f func(args ...interface{}), args ...interface{}) {
for {
go func() {
f(args...)
}()
}
}
func main() {
spawn(worker, 5)
}
2)join模式:goroutine的创建者等待新创建的goroutine结束。
a.等待一个goroutine退出
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, _ := args[0].(int)
time.Sleep(time.Second * time.Duration(interval))
}
func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
go func() {
f(args...)
c <- struct{}{}
time.Sleep(time.Second * 3) // 此处是为了说明向channel发送数据后,main goroutine是立刻感知还是等spawn函数返回后才感知
fmt.Println("worker go routine end")
}()
return c
}
func main() { // main goroutine
c := spawn(worker, 5) // 调用spawn,spawn函数创建了一个新的goroutine
fmt.Println("main goroutine start")
<-c
fmt.Println("main goroutine end")
time.Sleep(time.Second * 5) // 等待新的goroutine执行完成
}
$ go run wait-one-goroutine-demo.go
main goroutine start
main goroutine end
worker go routine end // 从结果可以看出,向channel发送信号是在新的goroutine退出前,新的goroutine发送信号后main goroutine立刻感知到接触阻塞执行了main函数体剩余的代码,而不是等待新的goroutine执行完毕main goroutine才感知到。
spawn函数使用典型的goroutine创建模式创建了一个goroutine,main goroutine作为创建者通过spawn函数返回的channel与新goroutine建立联系,这个channel的用途就是在两个goroutine之间建立退出事件的”信号“通信机制。main goroutine在创建完新的goroutine后便在该channel上阻塞等待,直到新的goroutine退出前向该channel发送一个信号。
b.获取goroutine的退出状态:新的goroutine创建者不仅要等待goroutine的退出,还要精准获取其结束状态。
import (
"errors"
"fmt"
"time"
)
var OK = errors.New("ok")
func worker(args ...interface{}) error {
if len(args) == 0 {
return errors.New("invalid args")
}
interval, ok := args[0].(int)
if !ok {
return errors.New("invalid interval args")
}
time.Sleep(time.Second * time.Duration(interval))
return OK
}
func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {
c := make(chan error)
go func() {
c <- f(args...)
}()
return c
}
func main() {
done := spawn(worker, 5)
fmt.Println("worker1 start")
err := <-done
fmt.Println("worker1 done:", err)
done = spawn(worker, "a")
fmt.Println("worker2 start")
err = <-done
fmt.Println("worker2 done:", err)
}
$ go run wait-and-get-exit-code-one-goroutine-demo.go
worker1 start
worker1 done: ok
worker2 start
worker2 done: invalid interval args
上例中将channel承载的类型由struct{}改为error,这样channel承载的信息不仅仅是一个信号,还携带了有价值的信息:新goroutine的结束状态。
c.等待多个goroutine退出:在某些场景中,goroutine创建者可能需要等待多个goroutine退出。可以通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出
import (
"fmt"
"sync"
"time"
)
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, ok := args[0].(int)
if !ok {
return
}
time.Sleep(time.Second * time.Duration(interval))
}
func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
name := fmt.Sprintf("worker-%d", i)
f(args...)
println(name, "done")
wg.Done()
}(i)
}
go func() {
wg.Wait()
c <- struct{}{}
}()
return c
}
func main() {
fmt.Println("spawn a group of workers")
done := spawnGroup(5, worker, 3)
<-done
fmt.Println("group workers done")
}
$ go run wait-multiple-goroutine-exit-demo.go
spawn a group of workers
worker-1 done
worker-3 done
worker-4 done
worker-0 done
worker-2 done
group workers done
上例中通过sync.WaitGroup,spawnGorup每创建一个goroutine都会调用wg.Add(1),新创建的goroutine会在退出前调用wg.Done。在spwanGroup中还创建了一个用于监视的goroutine,该goroutine调用wg.Wait方法等待所有goroutine退出。在所有新创建的goroutine退出后,wg.Wait方法返回,并向c这个channel中写入一个信号,这是main goroutine会从阻塞在channel上的状态中恢复,继续往下执行。
d.超时等待机制:有时不需要无限等待所有新创建的goroutine退出,而是仅等待一段合理的时间。如果在这段时间内goroutine没有退出,则创建者会继续往下执行或者主动退出。
func main() {
timer := time.NewTimer(time.Second * 5)
done := spawnGroup(5, worker, 10)
defer timer.Stop()
fmt.Println("spawn a group of workers")
select {
case <-timer.C:
println("wait group workers exit timeout")
case <-done:
println("group workers done")
}
}
$ go run wait-multiple-goroutine-exit-with-timeout-demo.go
spawn a group of workers
wait group workers exit timeout
上例中通过创建一个定时器(time.Timer)设置超时等待时间,通过select监听timer.C和done两个channel,哪个先返回就执行哪个case分支。
3)notify-and-wait模式:前面的例子中都是goroutine创建者被动等待新goroutine退出,但是更多实现goroutine创建者需要主动通知新goroutine退出,尤其是当main作为goroutine的时候。main goroutine的退出意味着Go程序的终止,而粗暴地直接让main goroutine退出的方式可能导致业务数据损坏、不完整或丢失。我们可以通过notify-and-wait这一模式来满足这一要求。
a.通知并等待一个goroutine退出
import (
"time"
)
func worker(m int) {
time.Sleep(time.Second * time.Duration(m))
}
func spawn(f func(int)) chan string {
quit := make(chan string)
go func() {
job := make(chan int)
select {
case n := <-job:
f(n)
case status := <-quit:
quit <- status
}
}()
return quit
}
func main() {
quit := spawn(worker)
println("work goroutine start")
time.Sleep(time.Second * 5)
println("notify the work to exit")
quit <- "exit"
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()
select {
case status := <-quit:
println("worker done:", status)
case <-timer.C:
println("wait worker exit timeout")
}
}
$ go run notify-and-wait-one-goroutine-exit-demo.go
work goroutine start
notify the work to exit
worker done: exit
上例中使用创建模式创建goroutine的spawn函数返回的channel成了一个双向数据通道:既承载创建者发送给新goroutine的退出信号,也承载新goroutine返回给创建者的退出状态。
b.通知并等待多个goroutine退出:当使用close函数关闭channel时,所有阻塞在该channel上的goroutine都会得到通知,这也是Go语言channel的一个特性。我们可以利用这一特性实现通知并等待多个goroutine退出。
import (
"fmt"
"sync"
"time"
)
func worker(m int) {
fmt.Printf("In worker-%d", m)
time.Sleep(time.Second * (time.Duration(m)))
}
func spawnGroup(n int, f func(int)) chan struct{} {
quit := make(chan struct{})
job := make(chan int)
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
name := fmt.Sprintf("worker-%d", i)
j, ok := <-job // j, ok会阻塞在这里,直到job这个channel被close函数关闭
if !ok {
println(name, "done")
return
}
f(j)
}(i)
}
go func() {
<-quit
close(job)
wg.Wait()
quit <- struct{}{}
}()
return quit
}
func main() {
quit := spawnGroup(5, worker)
println("spawn a group of workers")
time.Sleep(time.Second * 5)
println("notify the group worker to exit...")
quit <- struct{}{}
time.Sleep(time.Second * 5)
timer := time.NewTimer(time.Second * 10)
defer timer.Stop()
select {
case <-quit:
println("group workers done")
case <-timer.C:
println("wait group workers exit timeout")
}
}
$ go run notify-and-wait-group-goroutines-exit-demo.go
spawn a group of workers
notify the group worker to exit...
worker-4 done
worker-0 done
worker-2 done
worker-3 done
worker-1 done
group workers done
上例利用了worker goroutine接收任务的channel来广播退出通知,而实现这一广播就是close(job)。此时各个worker goroutine监听job channel,当创建者关闭job channel时,通过comma ok惯用法获取ok值为fasle,worker goroutine执行退出模式。
Go语言学习笔记、语法知识、技术要点和个人理解及实战
查看3道真题和解析