简析go语言并发模式

go语言实现了CSP并发模型:输入/输出为基本编程原语,数据处理逻辑(CSP模型中的P)仅需调用输入原语获取数据,顺序处理数据并将结果通过输出原语输出。Go引入了goroutine(P)之间的通信原语channel,goroutine可以从channel获取输入数据,再将处理后的数据结果通过channel输出。通过channel和goroutine组合连接在一起,使并发程序设计更为清晰,无须再为传统共享内存并发模型中的问题花费心智。引入select用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。

Go语言也支持传统的基于共享内存的并发模型,并提供了基本的低级同步原语:主要是sync包中的互斥锁、条件变量、读写锁、原子操作等。但Go始终推荐以CSP模型风格构建并发程序,CSP模型风格go并发程序的并发模式大致分以下几种:

1.创建模式:使用go关键字+函数/方法创建一个goroutine,并返回一个channel类型变量的函数。这是Go中最常见的goroutine创建模式。

type T struct{}

func spawn(f func()) chan T {
	c := make(chan T)

	go func() {
		f()
		c <- T{}
	}()
	return c
}

func main() {
	c := spawn(func())
	<-c
}

spawn函数创建新的goroutine与调用spawn函数的goroutine之间通过一个channel建立联系,即两个goroutine通过channel进行通信。channel可以像变量一样被初始化、传递和赋值。

2.退出模式:goroutine的使用代价很低,官方推荐使用goroutine。在大多数情况下,我们无需考虑对goroutine的退出进行控制,goroutine的执行函数返回,就意味着goroutine的退出。但一些常驻后台服务程序可能会对goroutine有着优雅退出的要求。goroutine的退出模式大致分以下几种:

1)分离模式:分离模式是使用最广泛的goroutine退出模式。对于分离的goroutine,创建它的goroutine不需要关心它的退出,这类goroutine在创建后即与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。这类goroutine有如下两个常见用途。

a.一次性任务:新创建的goroutine用来执行一个简单任务,执行后退出。

import (
	"fmt"
	"time"
)

func worker(args ...interface{}) {
	if len(args) == 0 {
		return
	}

	interval, ok := args[0].(int)
	if !ok {
		return
	}

	time.Sleep(time.Second * time.Duration(interval))
}

func spawn(f func(args ...interface{}), args ...interface{}) {
	go func() {
		f(args...)
	}()
}

func main() {
	spawn(worker, 5)
}

b.常驻任务:常驻后台执行一些特定任务,如监视(monitor)、观察(watch)等。其实现通常采用for {...}或者for {select{...}}代码段形式,并多以定时器(timer)或事件(event)驱动执行。

import (
	"fmt"
	"time"
)

func worker(args ...interface{}) {
	if len(args) == 0 {
		fmt.Println("invalid args")
		return
	}

	interval, ok := args[0].(int)
	if !ok {
		println("invalid interval args")
		return
	}

	time.Sleep(time.Second * time.Duration(interval))
}

func spawn(f func(args ...interface{}), args ...interface{}) {
	for {
		go func() {
			f(args...)
		}()
	}
}

func main() {
	spawn(worker, 5)
}

2)join模式:goroutine的创建者等待新创建的goroutine结束。

a.等待一个goroutine退出

func worker(args ...interface{}) {
	if len(args) == 0 {
		return
	}
	interval, _ := args[0].(int)
	time.Sleep(time.Second * time.Duration(interval))
}

func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {
	c := make(chan struct{})

	go func() {
		f(args...)
		c <- struct{}{}
		time.Sleep(time.Second * 3)  // 此处是为了说明向channel发送数据后,main goroutine是立刻感知还是等spawn函数返回后才感知
		fmt.Println("worker go routine end")
	}()
	return c
}

func main() {  // main goroutine
	c := spawn(worker, 5)  // 调用spawn,spawn函数创建了一个新的goroutine

	fmt.Println("main goroutine start")
	<-c
	fmt.Println("main goroutine end")
	time.Sleep(time.Second * 5)  // 等待新的goroutine执行完成
}

$ go run wait-one-goroutine-demo.go
main goroutine start
main goroutine end
worker go routine end  // 从结果可以看出,向channel发送信号是在新的goroutine退出前,新的goroutine发送信号后main goroutine立刻感知到接触阻塞执行了main函数体剩余的代码,而不是等待新的goroutine执行完毕main goroutine才感知到。

spawn函数使用典型的goroutine创建模式创建了一个goroutine,main goroutine作为创建者通过spawn函数返回的channel与新goroutine建立联系,这个channel的用途就是在两个goroutine之间建立退出事件的”信号“通信机制。main goroutine在创建完新的goroutine后便在该channel上阻塞等待,直到新的goroutine退出前向该channel发送一个信号。

b.获取goroutine的退出状态:新的goroutine创建者不仅要等待goroutine的退出,还要精准获取其结束状态。

import (
	"errors"
	"fmt"
	"time"
)

var OK = errors.New("ok")

func worker(args ...interface{}) error {
	if len(args) == 0 {
		return errors.New("invalid args")
	}

	interval, ok := args[0].(int)
	if !ok {
		return errors.New("invalid interval args")
	}
	time.Sleep(time.Second * time.Duration(interval))
	return OK
}

func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {
	c := make(chan error)

	go func() {
		c <- f(args...)
	}()
	return c
}

func main() {
	done := spawn(worker, 5)
	fmt.Println("worker1 start")
	err := <-done
	fmt.Println("worker1 done:", err)

	done = spawn(worker, "a")
	fmt.Println("worker2 start")
	err = <-done
	fmt.Println("worker2 done:", err)
}

$ go run wait-and-get-exit-code-one-goroutine-demo.go
worker1 start
worker1 done: ok
worker2 start
worker2 done: invalid interval args

上例中将channel承载的类型由struct{}改为error,这样channel承载的信息不仅仅是一个信号,还携带了有价值的信息:新goroutine的结束状态。

c.等待多个goroutine退出:在某些场景中,goroutine创建者可能需要等待多个goroutine退出。可以通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出

import (
	"fmt"
	"sync"
	"time"
)

func worker(args ...interface{}) {
	if len(args) == 0 {
		return
	}

	interval, ok := args[0].(int)
	if !ok {
		return
	}

	time.Sleep(time.Second * time.Duration(interval))
}

func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
	c := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			name := fmt.Sprintf("worker-%d", i)
			f(args...)
			println(name, "done")
			wg.Done()
		}(i)
	}

	go func() {
		wg.Wait()
		c <- struct{}{}
	}()
	return c
}

func main() {
	fmt.Println("spawn a group of workers")
	done := spawnGroup(5, worker, 3)
	<-done
	fmt.Println("group workers done")
}

$ go run wait-multiple-goroutine-exit-demo.go
spawn a group of workers
worker-1 done
worker-3 done
worker-4 done
worker-0 done
worker-2 done
group workers done

上例中通过sync.WaitGroup,spawnGorup每创建一个goroutine都会调用wg.Add(1),新创建的goroutine会在退出前调用wg.Done。在spwanGroup中还创建了一个用于监视的goroutine,该goroutine调用wg.Wait方法等待所有goroutine退出。在所有新创建的goroutine退出后,wg.Wait方法返回,并向c这个channel中写入一个信号,这是main goroutine会从阻塞在channel上的状态中恢复,继续往下执行。

d.超时等待机制:有时不需要无限等待所有新创建的goroutine退出,而是仅等待一段合理的时间。如果在这段时间内goroutine没有退出,则创建者会继续往下执行或者主动退出。

func main() {
	timer := time.NewTimer(time.Second * 5)
	done := spawnGroup(5, worker, 10)

	defer timer.Stop()
	fmt.Println("spawn a group of workers")
	select {
	case <-timer.C:
		println("wait group workers exit timeout")
	case <-done:
		println("group workers done")
	}
}

$ go run wait-multiple-goroutine-exit-with-timeout-demo.go
spawn a group of workers
wait group workers exit timeout

上例中通过创建一个定时器(time.Timer)设置超时等待时间,通过select监听timer.C和done两个channel,哪个先返回就执行哪个case分支。

3)notify-and-wait模式:前面的例子中都是goroutine创建者被动等待新goroutine退出,但是更多实现goroutine创建者需要主动通知新goroutine退出,尤其是当main作为goroutine的时候。main goroutine的退出意味着Go程序的终止,而粗暴地直接让main goroutine退出的方式可能导致业务数据损坏、不完整或丢失。我们可以通过notify-and-wait这一模式来满足这一要求。

a.通知并等待一个goroutine退出

import (
	"time"
)

func worker(m int) {
	time.Sleep(time.Second * time.Duration(m))
}

func spawn(f func(int)) chan string {
	quit := make(chan string)

	go func() {
		job := make(chan int)
		select {
		case n := <-job:
			f(n)
		case status := <-quit:
			quit <- status
		}
	}()

	return quit
}

func main() {
	quit := spawn(worker)

	println("work goroutine start")
	time.Sleep(time.Second * 5)

	println("notify the work to exit")
	quit <- "exit"

	timer := time.NewTimer(time.Second * 10)
	defer timer.Stop()
	select {
	case status := <-quit:
		println("worker done:", status)
	case <-timer.C:
		println("wait worker exit timeout")
	}
}

$ go run notify-and-wait-one-goroutine-exit-demo.go
work goroutine start
notify the work to exit
worker done: exit

上例中使用创建模式创建goroutine的spawn函数返回的channel成了一个双向数据通道:既承载创建者发送给新goroutine的退出信号,也承载新goroutine返回给创建者的退出状态。

b.通知并等待多个goroutine退出:当使用close函数关闭channel时,所有阻塞在该channel上的goroutine都会得到通知,这也是Go语言channel的一个特性。我们可以利用这一特性实现通知并等待多个goroutine退出。

import (
	"fmt"
	"sync"
	"time"
)

func worker(m int) {
	fmt.Printf("In worker-%d", m)
	time.Sleep(time.Second * (time.Duration(m)))
}

func spawnGroup(n int, f func(int)) chan struct{} {
	quit := make(chan struct{})
	job := make(chan int)
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			name := fmt.Sprintf("worker-%d", i)
			j, ok := <-job // j, ok会阻塞在这里,直到job这个channel被close函数关闭

			if !ok {
				println(name, "done")
				return
			}
			f(j)

		}(i)
	}

	go func() {
		<-quit
		close(job)
		wg.Wait()
		quit <- struct{}{}
	}()

	return quit
}

func main() {
	quit := spawnGroup(5, worker)
	println("spawn a group of workers")

	time.Sleep(time.Second * 5)

	println("notify the group worker to exit...")
	quit <- struct{}{}

	time.Sleep(time.Second * 5)

	timer := time.NewTimer(time.Second * 10)
	defer timer.Stop()
	select {
	case <-quit:
		println("group workers done")
	case <-timer.C:
		println("wait group workers exit timeout")
	}
}

$ go run notify-and-wait-group-goroutines-exit-demo.go
spawn a group of workers
notify the group worker to exit...
worker-4 done
worker-0 done
worker-2 done
worker-3 done
worker-1 done
group workers done

上例利用了worker goroutine接收任务的channel来广播退出通知,而实现这一广播就是close(job)。此时各个worker goroutine监听job channel,当创建者关闭job channel时,通过comma ok惯用法获取ok值为fasle,worker goroutine执行退出模式。

Go语言基础及实战 文章被收录于专栏

Go语言学习笔记、语法知识、技术要点和个人理解及实战

全部评论

相关推荐

想去夏威夷的小哥哥在度假:5和6才是重点
点赞 评论 收藏
分享
1 收藏 评论
分享
牛客网
牛客企业服务