简析go语言并发模式
go语言实现了CSP并发模型:输入/输出为基本编程原语,数据处理逻辑(CSP模型中的P)仅需调用输入原语获取数据,顺序处理数据并将结果通过输出原语输出。Go引入了goroutine(P)之间的通信原语channel,goroutine可以从channel获取输入数据,再将处理后的数据结果通过channel输出。通过channel和goroutine组合连接在一起,使并发程序设计更为清晰,无须再为传统共享内存并发模型中的问题花费心智。引入select用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。
Go语言也支持传统的基于共享内存的并发模型,并提供了基本的低级同步原语:主要是sync包中的互斥锁、条件变量、读写锁、原子操作等。但Go始终推荐以CSP模型风格构建并发程序,CSP模型风格go并发程序的并发模式大致分以下几种:
1.创建模式:使用go关键字+函数/方法创建一个goroutine,并返回一个channel类型变量的函数。这是Go中最常见的goroutine创建模式。
type T struct{} func spawn(f func()) chan T { c := make(chan T) go func() { f() c <- T{} }() return c } func main() { c := spawn(func()) <-c }
spawn函数创建新的goroutine与调用spawn函数的goroutine之间通过一个channel建立联系,即两个goroutine通过channel进行通信。channel可以像变量一样被初始化、传递和赋值。
2.退出模式:goroutine的使用代价很低,官方推荐使用goroutine。在大多数情况下,我们无需考虑对goroutine的退出进行控制,goroutine的执行函数返回,就意味着goroutine的退出。但一些常驻后台服务程序可能会对goroutine有着优雅退出的要求。goroutine的退出模式大致分以下几种:
1)分离模式:分离模式是使用最广泛的goroutine退出模式。对于分离的goroutine,创建它的goroutine不需要关心它的退出,这类goroutine在创建后即与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。这类goroutine有如下两个常见用途。
a.一次性任务:新创建的goroutine用来执行一个简单任务,执行后退出。
import ( "fmt" "time" ) func worker(args ...interface{}) { if len(args) == 0 { return } interval, ok := args[0].(int) if !ok { return } time.Sleep(time.Second * time.Duration(interval)) } func spawn(f func(args ...interface{}), args ...interface{}) { go func() { f(args...) }() } func main() { spawn(worker, 5) }
b.常驻任务:常驻后台执行一些特定任务,如监视(monitor)、观察(watch)等。其实现通常采用for {...}或者for {select{...}}代码段形式,并多以定时器(timer)或事件(event)驱动执行。
import ( "fmt" "time" ) func worker(args ...interface{}) { if len(args) == 0 { fmt.Println("invalid args") return } interval, ok := args[0].(int) if !ok { println("invalid interval args") return } time.Sleep(time.Second * time.Duration(interval)) } func spawn(f func(args ...interface{}), args ...interface{}) { for { go func() { f(args...) }() } } func main() { spawn(worker, 5) }
2)join模式:goroutine的创建者等待新创建的goroutine结束。
a.等待一个goroutine退出
func worker(args ...interface{}) { if len(args) == 0 { return } interval, _ := args[0].(int) time.Sleep(time.Second * time.Duration(interval)) } func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} { c := make(chan struct{}) go func() { f(args...) c <- struct{}{} time.Sleep(time.Second * 3) // 此处是为了说明向channel发送数据后,main goroutine是立刻感知还是等spawn函数返回后才感知 fmt.Println("worker go routine end") }() return c } func main() { // main goroutine c := spawn(worker, 5) // 调用spawn,spawn函数创建了一个新的goroutine fmt.Println("main goroutine start") <-c fmt.Println("main goroutine end") time.Sleep(time.Second * 5) // 等待新的goroutine执行完成 } $ go run wait-one-goroutine-demo.go main goroutine start main goroutine end worker go routine end // 从结果可以看出,向channel发送信号是在新的goroutine退出前,新的goroutine发送信号后main goroutine立刻感知到接触阻塞执行了main函数体剩余的代码,而不是等待新的goroutine执行完毕main goroutine才感知到。
spawn函数使用典型的goroutine创建模式创建了一个goroutine,main goroutine作为创建者通过spawn函数返回的channel与新goroutine建立联系,这个channel的用途就是在两个goroutine之间建立退出事件的”信号“通信机制。main goroutine在创建完新的goroutine后便在该channel上阻塞等待,直到新的goroutine退出前向该channel发送一个信号。
b.获取goroutine的退出状态:新的goroutine创建者不仅要等待goroutine的退出,还要精准获取其结束状态。
import ( "errors" "fmt" "time" ) var OK = errors.New("ok") func worker(args ...interface{}) error { if len(args) == 0 { return errors.New("invalid args") } interval, ok := args[0].(int) if !ok { return errors.New("invalid interval args") } time.Sleep(time.Second * time.Duration(interval)) return OK } func spawn(f func(args ...interface{}) error, args ...interface{}) chan error { c := make(chan error) go func() { c <- f(args...) }() return c } func main() { done := spawn(worker, 5) fmt.Println("worker1 start") err := <-done fmt.Println("worker1 done:", err) done = spawn(worker, "a") fmt.Println("worker2 start") err = <-done fmt.Println("worker2 done:", err) } $ go run wait-and-get-exit-code-one-goroutine-demo.go worker1 start worker1 done: ok worker2 start worker2 done: invalid interval args
上例中将channel承载的类型由struct{}改为error,这样channel承载的信息不仅仅是一个信号,还携带了有价值的信息:新goroutine的结束状态。
c.等待多个goroutine退出:在某些场景中,goroutine创建者可能需要等待多个goroutine退出。可以通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出
import ( "fmt" "sync" "time" ) func worker(args ...interface{}) { if len(args) == 0 { return } interval, ok := args[0].(int) if !ok { return } time.Sleep(time.Second * time.Duration(interval)) } func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} { c := make(chan struct{}) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func(i int) { name := fmt.Sprintf("worker-%d", i) f(args...) println(name, "done") wg.Done() }(i) } go func() { wg.Wait() c <- struct{}{} }() return c } func main() { fmt.Println("spawn a group of workers") done := spawnGroup(5, worker, 3) <-done fmt.Println("group workers done") } $ go run wait-multiple-goroutine-exit-demo.go spawn a group of workers worker-1 done worker-3 done worker-4 done worker-0 done worker-2 done group workers done
上例中通过sync.WaitGroup,spawnGorup每创建一个goroutine都会调用wg.Add(1),新创建的goroutine会在退出前调用wg.Done。在spwanGroup中还创建了一个用于监视的goroutine,该goroutine调用wg.Wait方法等待所有goroutine退出。在所有新创建的goroutine退出后,wg.Wait方法返回,并向c这个channel中写入一个信号,这是main goroutine会从阻塞在channel上的状态中恢复,继续往下执行。
d.超时等待机制:有时不需要无限等待所有新创建的goroutine退出,而是仅等待一段合理的时间。如果在这段时间内goroutine没有退出,则创建者会继续往下执行或者主动退出。
func main() { timer := time.NewTimer(time.Second * 5) done := spawnGroup(5, worker, 10) defer timer.Stop() fmt.Println("spawn a group of workers") select { case <-timer.C: println("wait group workers exit timeout") case <-done: println("group workers done") } } $ go run wait-multiple-goroutine-exit-with-timeout-demo.go spawn a group of workers wait group workers exit timeout
上例中通过创建一个定时器(time.Timer)设置超时等待时间,通过select监听timer.C和done两个channel,哪个先返回就执行哪个case分支。
3)notify-and-wait模式:前面的例子中都是goroutine创建者被动等待新goroutine退出,但是更多实现goroutine创建者需要主动通知新goroutine退出,尤其是当main作为goroutine的时候。main goroutine的退出意味着Go程序的终止,而粗暴地直接让main goroutine退出的方式可能导致业务数据损坏、不完整或丢失。我们可以通过notify-and-wait这一模式来满足这一要求。
a.通知并等待一个goroutine退出
import ( "time" ) func worker(m int) { time.Sleep(time.Second * time.Duration(m)) } func spawn(f func(int)) chan string { quit := make(chan string) go func() { job := make(chan int) select { case n := <-job: f(n) case status := <-quit: quit <- status } }() return quit } func main() { quit := spawn(worker) println("work goroutine start") time.Sleep(time.Second * 5) println("notify the work to exit") quit <- "exit" timer := time.NewTimer(time.Second * 10) defer timer.Stop() select { case status := <-quit: println("worker done:", status) case <-timer.C: println("wait worker exit timeout") } } $ go run notify-and-wait-one-goroutine-exit-demo.go work goroutine start notify the work to exit worker done: exit
上例中使用创建模式创建goroutine的spawn函数返回的channel成了一个双向数据通道:既承载创建者发送给新goroutine的退出信号,也承载新goroutine返回给创建者的退出状态。
b.通知并等待多个goroutine退出:当使用close函数关闭channel时,所有阻塞在该channel上的goroutine都会得到通知,这也是Go语言channel的一个特性。我们可以利用这一特性实现通知并等待多个goroutine退出。
import ( "fmt" "sync" "time" ) func worker(m int) { fmt.Printf("In worker-%d", m) time.Sleep(time.Second * (time.Duration(m))) } func spawnGroup(n int, f func(int)) chan struct{} { quit := make(chan struct{}) job := make(chan int) var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func(i int) { defer wg.Done() name := fmt.Sprintf("worker-%d", i) j, ok := <-job // j, ok会阻塞在这里,直到job这个channel被close函数关闭 if !ok { println(name, "done") return } f(j) }(i) } go func() { <-quit close(job) wg.Wait() quit <- struct{}{} }() return quit } func main() { quit := spawnGroup(5, worker) println("spawn a group of workers") time.Sleep(time.Second * 5) println("notify the group worker to exit...") quit <- struct{}{} time.Sleep(time.Second * 5) timer := time.NewTimer(time.Second * 10) defer timer.Stop() select { case <-quit: println("group workers done") case <-timer.C: println("wait group workers exit timeout") } } $ go run notify-and-wait-group-goroutines-exit-demo.go spawn a group of workers notify the group worker to exit... worker-4 done worker-0 done worker-2 done worker-3 done worker-1 done group workers done
上例利用了worker goroutine接收任务的channel来广播退出通知,而实现这一广播就是close(job)。此时各个worker goroutine监听job channel,当创建者关闭job channel时,通过comma ok惯用法获取ok值为fasle,worker goroutine执行退出模式。
Go语言学习笔记、语法知识、技术要点和个人理解及实战