万字长文带你吃透golang channel
Go语言是一门支持并发编程的语言,它提供了一种特殊的数据类型:channel,用于在不同的goroutine之间传递数据和同步。channel可以让我们编写出简洁、高效、安全的并发程序。本文将介绍channel的基本概念和语法、特性和原理、最佳实践和高级用法,希望能够帮助你更好地理解和使用channel。
一、channel的基本概念和语法
channel是一种引用类型,它可以用来在两个或多个goroutine之间传递数据。
1. 声明一个channel
要使用channel,首先需要声明,channel的声明方式如下:
var ch chan int // 声明一个int类型的channel,零值为nil
ch = make(chan int) // 创建一个int类型的channel,返回一个channel值
ch := make(chan int) // 声明并创建一个int类型的channel
channel的创建方式是使用内置函数make
,它接受一个channel类型作为参数,并返回一个对应类型的channel值。我们可以指定一个可选的容量参数,表示channel能够缓存的数据个数。如果不指定容量参数,或者指定为0,那么就是创建一个无缓冲的channel。
2. 缓冲
上面说到声明channel的时候,make的第二个参数如果是正整数,则表明是一个有缓冲的channel,我们再来创建一个无缓冲和有缓冲的channel来直接对比一下声明上有什么差异:
ch1 := make(chan int) // 创建一个无缓冲的int类型的channel
ch2 := make(chan int, 10) // 创建一个有缓冲的int类型的channel,容量为10
如何理解缓冲
两个字呢,其实就是发送数据的时候,无论是否有接收方,都可以成功发送,但channel中最多缓存容量
个元素,如果容量达到最大值,发送将会阻塞(暂且认为是阻塞,不同的场景会导致出现的结果不同)
ch1 := make(chan int) // 创建一个无缓冲的int类型的channel
ch2 := make(chan int, 10) // 创建一个有缓冲的int类型的channel,容量为10
ch2 <- 1 // 正常运行,
ch1 <- 1 // panic,fatal error: all goroutines are asleep - deadlock!
3.发送和接收操作
channel的操作主要有两种:发送和接收。发送操作是使用<-
运算符将一个值发送到channel中,接收操作是使用<-
运算符从channel中接收一个值。发送和接收操作都是阻塞的,也就是说,如果没有对应的接收方或发送方,那么操作就会等待,直到有匹配的操作出现。
ch := make(chan int) // 创建一个无缓冲的int类型的channel
go func() {
n := <-ch // 从ch中接收一个值,赋给n
fmt.Println("Received:", n)
}()
ch <- 42 // 向ch中发送一个值42
fmt.Println("Sent:", 42)
上面的代码中,我们创建了一个无缓冲的int类型的channel,并启动了一个匿名goroutine来从channel中接收数据。然后我们向channel中发送了一个值42,并打印出"Sent: 42"。注意,如果没有启动匿名goroutine来接收数据,那么主goroutine在发送数据时就会阻塞,导致死锁。
4.关闭channel
我们可以使用close
函数来关闭一个channel。关闭一个channel表示不再向它发送任何数据。关闭一个已经关闭的channel或者从一个已经关闭的channel中发送数据都会导致panic。我们可以使用ok-idiom
来判断一个channel是否已经关闭。
ch := make(chan int) // 创建一个无缓冲的int类型的channel
go func() {
for {
n, ok := <-ch // 从ch中接收一个值,并判断是否成功
if !ok {
fmt.Println("Channel closed")
break
}
fmt.Println("Received:", n)
}
}()
ch <- 1 // 向ch中发送一个值1
ch <- 2 // 向ch中发送一个值2
close(ch) // 关闭ch
fmt.Println("Channel closed")
上面的代码中,我们创建了一个无缓冲的int类型的channel,并启动了一个匿名goroutine来循环从channel中接收数据。我们使用n, ok := <-ch
的语法来接收数据,并判断ok
是否为true
。如果为true
,表示接收成功;如果为false
,表示channel已经关闭,那么就跳出循环。然后我们向channel中发送了两个值,并关闭了channel。注意,关闭channel并不会影响从channel中接收已经发送的数据,只是不能再发送新的数据。
5.channel的方向
最后,我们还可以指定channel的类型和方向。类型表示channel能够传递的数据的类型,方向表示channel能够进行的操作。默认情况下,channel是双向的,既能发送也能接收。但是我们可以将一个双向的channel转换为一个单向的channel,只能发送或只能接收。
var ch1 chan int // 声明一个双向的int类型的channel
var ch2 chan<- int // 声明一个只能发送的int类型的channel
var ch3 <-chan int // 声明一个只能接收的int类型的channel
ch1 = ch2 // 错误,不能将一个只能发送的channel赋值给一个双向的channel
ch1 = ch3 // 错误,不能将一个只能接收的channel赋值给一个双向的channel
ch2 = ch1 // 正确,可以将一个双向的channel赋值给一个只能发送的channel
ch3 = ch1 // 正确,可以将一个双向的channel赋值给一个只能接收的channel
上面的代码中,我们声明了三个不同方向的int类型的channel,并尝试进行赋值操作。注意,只能将一个双向的channel转换为一个单向的channel,反过来是不行的。单向的channel主要用于函数参数或返回值,用来限制函数对channel的操作。
func send(ch chan<- int, n int) {
ch <- n // 可以发送数据
// n = <-ch // 错误,不能接收数据
}
func receive(ch <-chan int) int {
n := <-ch // 可以接收数据
// ch <- n // 错误,不能发送数据
return n
}
func main() {
ch := make(chan int) // 创建一个双向的int类型的channel
go send(ch, 42) // 调用send函数,传入一个只能发送的channel
n := receive(ch) // 调用receive函数,传入一个只能接收的channel
fmt.Println(n) // 打印42
}
上面的代码中,我们定义了两个函数:send
和receive
,它们分别接受一个只能发送和一个只能接收的int类型的channel作为参数,并进行相应的操作。然后我们在主函数中创建了一个双向的int类型的channel,并分别传入这两个函数中。这段代码完成了最基本的发送者和接收者的完整示例。
二、channel的特性和原理
1. 无缓冲和有缓冲channel
前面我们简单介绍了缓冲的含义和基本用法,无缓冲和有缓冲channel是指创建时是否指定了容量参数。无缓冲的channel没有容量参数,或者容量参数为0;有缓冲的channel有一个正整数的容量参数。无缓冲和有缓冲channel在行为上有一些区别,主要体现在以下几个方面:
- 无缓冲的channel是同步的,也就是说,发送和接收操作必须成对出现,否则会阻塞。无缓冲的channel可以用来实现goroutine之间的同步和通信,例如等待一个goroutine完成某个任务,或者传递一个信号或一个值。
- 有缓冲的channel是异步的,也就是说,发送操作只有在channel已满时才会阻塞,接收操作只有在channel为空时才会阻塞。有缓冲的channel可以用来实现goroutine之间的解耦和流量控制,例如实现一个worker pool或者一个消息队列。
- 无缓冲的channel保证了发送和接收操作的顺序一致,也就是说,发送方发送的第一个值一定会被接收方第一个接收到。有缓冲的channel则不能保证这一点,因为发送方和接收方可能并发地进行操作,导致数据在channel中乱序。
- 无缓冲的channel通常用于传递少量的数据,因为它们不能缓存数据。有缓冲的channel通常用于传递大量的数据,因为它们可以缓存数据。但是要注意,channel不是用来存储数据的,而是用来传递数据的。如果我们不及时地从channel中接收数据,那么就会导致发送方阻塞,影响程序的性能。
下面我们举一些例子来说明无缓冲和有缓冲channel的使用场景。
1.)无缓冲channel的使用场景
- 实现goroutine之间的同步
func main() {
done := make(chan struct{}) // 创建一个无缓冲的空结构体类型的channel
go func() {
fmt.Println("Hello, world!") // 打印一句话
done <- struct{}{} // 向done中发送一个空结构体值
}()
<-done // 从done中接收一个值,表示goroutine已经完成
}
上面的代码中,我们创建了一个无缓冲的空结构体类型的channel,并启动了一个匿名goroutine来打印一句话。然后我们向done中发送了一个空结构体值,并在主函数中从done中接收了一个值。这样就实现了主函数等待匿名goroutine完成的同步效果。注意,这里我们使用空结构体类型是因为它不占用任何内存空间,只是用来传递信号。
- 实现goroutine之间的通信
func main() {
ch := make(chan int) // 创建一个无缓冲的int类型的channel
go func() {
n := <-ch // 从ch中接收一个值
fmt.Println("Received:", n)
}()
ch <- 42 // 向ch中发送一个值
fmt.Println("Sent:", 42)
}
上面的代码中,我们创建了一个无缓冲的int类型的channel,并启动了一个匿名goroutine来从channel中接收数据。然后我们向channel中发送了一个值,并打印出"Sent: 42"。这样就实现了主函数向匿名goroutine传递数据的通信效果。
2.)有缓冲channel的使用场景
- 实现goroutine之间的解耦
func main() {
ch := make(chan int, 10) // 创建一个有缓冲的int类型的channel,容量为10
go func() {
for i := 0; i < 10; i++ {
ch <- i // 向ch中发送10个值
}
close(ch) // 关闭ch
}()
for n := range ch { // 使用for range语法从ch中接收数据,直到ch关闭
fmt.Println("Received:", n)
}
}
上面的代码中,我们创建了一个有缓冲的int类型的channel,并启动了一个匿名goroutine来向channel中发送数据。然后我们使用for range语法从channel中接收数据,直到channel关闭。这样就实现了发送方和接收方的解耦效果,因为发送方不需要等待接收方,只要channel有空位就可以发送数据,而接收方也不需要等待发送方,只要channel有数据就可以接收数据。
2. channel的底层实现和数据结构
在上一节中,我们介绍了无缓冲和有缓冲channel的区别和应用场景。在这一节中,我们将介绍channel的底层实现和数据结构,以便更深入地理解channel的工作原理和性能特点。
1.)channel的底层实现
channel的底层实现是使用Go语言编写的,它主要涉及到以下几个文件:
- runtime/chan.go:定义了channel的数据结构和相关的操作函数。
- runtime/runtime2.go:定义了一些与channel相关的常量和类型。
- runtime/proc.go:定义了调度器的逻辑和与goroutine相关的操作函数。
- runtime/select.go:定义了select语句的逻辑和相关的操作函数。
我们先来看一下runtime/chan.go文件中定义的channel的数据结构:
// A hchan is a Go channel.
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16 // size of each element
closed uint32 // marks the channel as closed
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
上面的代码中,我们可以看到hchan结构体包含了以下几个字段:
- qcount:表示当前在队列中的数据个数。
- dataqsiz:表示队列的大小,也就是容量参数。
- buf:表示指向一个数组的指针,用来存储队列中的数据。
- elemsize:表示每个元素的大小。
- closed:表示channel是否已经关闭。
- elemtype:表示元素的类型。
- sendx:表示发送数据时的索引位置。
- recvx:表示接收数据时的索引位置。
- recvq:表示等待接收数据的goroutine队列。
- sendq:表示等待发送数据的goroutine队列。
- lock:表示用来保护hchan结构体的互斥锁。
从上面的字段可以看出,channel的底层实现是使用一个环形队列来存储数据,以及两个链表来存储等待的goroutine。当我们向channel中发送或接收数据时,就会调用runtime/chan.go文件中定义的相关函数,例如:
// chansend1(hchan *chan any, elem any) (pres bool);
// chansend(hchan *chan any, ep unsafe.Pointer, block bool, callerpc uintptr) (selected bool, received bool);
// chanrecv1(hchan *chan any, elem any) (received bool);
// chanrecv(hchan *chan any, ep unsafe.Pointer, block bool) (selected bool, received bool, closed bool);
这些函数的逻辑大致如下:
- 如果channel为nil,那么就会阻塞当前goroutine,直到被调度器唤醒或者超时(如果使用了select语句)。
- 如果channel已经关闭,那么就会根据操作类型进行不同的处理。如果是发送操作,那么就会panic;如果是接收操作,那么就会返回零值和false(表示没有接收到数据)。
- 如果channel不为空,那么就会尝试从队列中发送或接收数据。如果成功,那么就会返回相应的结果;如果失败,那么就会继续进行下一步。
- 如果channel不为空,那么就会尝试从等待队列中找到匹配的goroutine。如果成功,那么就会直接与之交换数据,并唤醒该goroutine;如果失败,那么就会继续进行下一步。
- 如果channel不为空,那么就会根据操作类型进行不同的处理。如果是发送操作,并且channel有空位,那么就会将数据放入队列中,并返回相应的结果;如果是接收操作,并且channel有数据,那么就会从队列中取出数据,并返回相应的结果;如果都不满足,那么就会继续进行下一步。
- 如果channel不为空,那么就会根据block参数决定是否阻塞当前goroutine。如果block为true,那么就会将当前goroutine加入到等待队列中,并阻塞当前goroutine,直到被唤醒或者超时(如果使用了select语句);如果block为false,那么就会返回相应的结果。
2.)channel的数据结构
在上一节中,我们介绍了channel的底层实现和相关的操作函数。在这一节中,我们将介绍channel的数据结构和相关的类型和常量。
首先,我们来看一下runtime/runtime2.go文件中定义的与channel相关的常量和类型:
// Channel direction for select.
const (
chanrecv = iota
chansend
)
// A sudog holds goroutine state while blocked on a channel.
// The sudog is allocated on the goroutine's stack and includes a stack
// segment large enough to hold the channel's element type.
type sudog struct {
g *g
selectdone *uint32
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
releasetime int64
nrelease int32
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
c *hchan // channel
}
// A waitq is a single-linked list of sudogs waiting on a channel.
type waitq struct {
first *sudog
last *sudog
}
上面的代码中,我们可以看到以下几个常量和类型:
chanrecv
和chansend
:表示channel的操作方向,用于select语句。sudog
:表示在channel上阻塞的goroutine的状态,它包含了一个指向goroutine的指针、一个指向select语句的指针、一个指向数据元素的指针、一个指向channel的指针,以及一些其他的字段。sudog是分配在goroutine的栈上的,它包含了一个足够大的栈段来存储channel的元素类型。waitq
:表示在channel上等待的sudog的单链表,它包含了一个指向第一个和最后一个sudog的指针。
其次,我们来看一下runtime/chan.go文件中定义的与channel相关的类型和函数:
// A scase represents a channel operation in a select statement.
type scase struct {
c *hchan // chan
pc uintptr // return pc (for race detector / msan)
kind uint16 // case kind
elem unsafe.Pointer // pointer to element (send)
receivedp *bool // pointer to received bool (recv2)
}
// Select statement header.
// This is followed in memory by nelem scase structs and then nelem elem types.
type selectdata struct {
tcase uint16 // total count of scase[]
ncase uint16 // currently filled scase[]
pollorder *uint16 // case poll order
lockorder *uint8 // channel lock order
scase [1]scase // one or more cases
}
// selectgo implements the select statement.
//
// cas0 points to an array of type [ncases]scase, and order0 points to
// an array of type [2*ncases]uint16. Both reside on the goroutine's stack.
// The first half of order0 contains the subscripts of cas0 in a random order,
// and the second half is scratch space. The number of cases is given by ncases,
// which must be >= 2. Selected points to a uint16, which will be set to the
// index of the chosen scase. Receivedp points to a bool, which will be set to
// whether the operation on the chosen channel was a receive that received a value.
func selectgo(cas0 *scase, order0 *uint16, ncases int, selected *uint16, receivedp *bool) {
...
}
上面的代码中,我们可以看到以下几个类型和函数:
scase
:表示select语句中的一个channel操作,它包含了一个指向channel的指针、一个返回地址(用于检测竞态条件或内存访问错误)、一个操作类型、一个指向元素的指针、一个指向接收标志的指针。selectdata
:表示select语句的头部,它包含了总共和当前填充的scase个数、随机排列和锁定顺序的数组、以及至少一个scase结构体。selectdata是分配在goroutine的栈上的,它后面跟着nelem个scase结构体和nelem个元素类型。selectgo
:表示select语句的实现函数,它接受一个指向scase数组的指针、一个指向随机排列数组的指针、一个表示scase个数的整数、一个指向选择结果的指针、一个指向接收标志的指针。它会根据一定的逻辑来选择一个可执行的channel操作,并返回相应的结果。
三、channel的最佳实践
channel的基础用法已经知道了,但是实际编码中我们如何更好的使用channel呢,下面一些最佳实践,在我们常规的工作中已经足够使用了。
1. 使用for range语法来接收数据
当我们需要从一个channel中接收数据时,我们可以使用for range语法来循环接收数据,直到channel关闭。这样可以避免使用无限循环或者手动判断channel是否关闭的麻烦。例如:
ch := make(chan int) // 创建一个无缓冲的int类型的channel
go func() {
for i := 0; i < 10; i++ {
ch <- i // 向ch中发送10个值
}
close(ch) // 关闭ch
}()
for n := range ch { // 使用for range语法从ch中接收数据,直到ch关闭
fmt.Println("Received:", n)
}
上面的代码中,我们创建了一个无缓冲的int类型的channel,并启动了一个匿名goroutine来向channel中发送数据并关闭channel。然后我们使用for range语法从channel中接收数据,直到channel关闭。
2. 使用select语法来处理多个channel
当我们需要同时处理多个channel时,我们可以使用select语句来监听多个case,并执行其中一个可运行的case。如果没有任何case可运行,那么就会执行default分支,如果没有default分支,那么就会阻塞当前goroutine,直到有一个case可运行。select语句可以让我们编写出更简洁和高效的并发程序。例如:
ch1 := make(chan int) // 创建一个无缓冲的int类型的channel
ch2 := make(chan int) // 创建一个无缓冲的int类型的channel
go func() {
ch1 <- 1 // 向ch1中发送一个值
time.Sleep(time.Second) // 睡眠一秒钟
ch2 <- 2 // 向ch2中发送一个值
}()
select {
case n := <-ch1: // 如果ch1中有数据,就接收并赋给n
fmt.Println("Received from ch1:", n)
case n := <-ch2: // 如果ch2中有数据,就接收并赋给n
fmt.Println("Received from ch2:", n)
default: // 如果都没有数据,就执行default分支
fmt.Println("No data")
}
上面的代码中,我们创建了两个无缓冲的int类型的channel,并启动了一个匿名goroutine来向它们中发送数据。然后我们使用select语句来监听两个case,并执行其中一个可运行的case。如果都没有数据,就执行default分支。
3. 使用close函数来关闭channel
当我们不再向一个channel发送任何数据时,我们可以使用close函数来关闭该channel。关闭一个channel表示不再向它发送任何数据。关闭一个已经关闭的channel或者从一个已经关闭的channel中发送数据都会导致panic。关闭一个channel可以让从该channel中接收数据的goroutine知道没有更多的数据了,并返回零值和false(表示没有接收到数据)。例如:
ch := make(chan int) // 创建一个无缓冲的int类型的channel
go func() {
for i := 0; i < 10; i++ {
ch <- i // 向ch中发送10个值
}
close(ch) // 关闭ch
}()
for {
n, ok := <-ch // 从ch中接收一个值,并判断是否成功
if !ok {
fmt.Println("Channel closed")
break
}
fmt.Println("Received:", n)
}
上面的代码中,我们创建了一个无缓冲的int类型的channel,并启动了一个匿名goroutine来向channel中发送数据并关闭channel。然后我们使用n, ok := <-ch的语法来接收数据,并判断ok是否为true。如果为true,表示接收成功;如果为false,表示channel已经关闭,那么就跳出循环。
4. 使用ok-idiom来判断channel是否关闭
当我们从一个channel中接收数据时,我们可以使用ok-idiom来判断该channel是否已经关闭。ok-idiom是指使用n, ok := <-ch的语法来接收数据,并判断ok是否为true。如果为true,表示接收成功;如果为false,表示channel已经关闭。ok-idiom可以让我们在接收数据的同时,检查channel的状态,以便及时处理异常情况。例如:
ch := make(chan int) // 创建一个无缓冲的int类型的channel
go func() {
ch <- 1 // 向ch中发送一个值
close(ch) // 关闭ch
}()
n, ok := <-ch // 从ch中接收一个值,并判断是否成功
if ok {
fmt.Println("Received:", n)
} else {
fmt.Println("Channel closed")
}
上面的代码中,我们创建了一个无缓冲的int类型的channel,并启动了一个匿名goroutine来向channel中发送数据并关闭channel。然后我们使用n, ok := <-ch的语法来接收数据,并判断ok是否为true。如果为true,表示接收成功;如果为false,表示channel已经关闭。
5. 避免常见的channel错误和死锁
当我们使用channel时,我们需要注意一些常见的错误和死锁,以免影响程序的正确性和性能。以下是一些常见的错误和死锁的例子和解决方法:
1).向一个nil channel发送或接收数据
var ch chan int // 声明一个int类型的channel,零值为nil
ch <- 1 // 向ch中发送一个值,会导致阻塞
n := <-ch // 从ch中接收一个值,会导致阻塞
解决方法:使用make函数创建一个非nil的channel。
2). 向一个已经关闭的channel发送数据
ch := make(chan int) // 创建一个无缓冲的int类型的channel
close(ch) // 关闭ch
ch <- 1 // 向ch中发送一个值,会导致panic
解决方法:在发送数据之前,检查channel是否已经关闭。
3). 从一个已经关闭的channel接收数据
ch := make(chan int) // 创建一个无缓冲的int类型的channel
close(ch) // 关闭ch
n := <-ch // 从ch中接收一个值,会返回零值和false
解决方法:使用ok-idiom来判断channel是否已经关闭。
4). 忘记关闭不再使用的channel
func producer(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i // 向ch中发送10个值
}
// 忘记关闭ch
}
func consumer(ch chan int) {
for n := range ch { // 使用for range语法从ch中接收数据,直到ch关闭
fmt.Println("Received:", n)
}
}
func main() {
ch := make(chan int) // 创建一个无缓冲的int类型的channel
go producer(ch) // 启动生产者goroutine
go consumer(ch) // 启动消费者goroutine
}
解决方法:在不再使用channel时,及时关闭它。
5). 缺少发送方或者接收方
func main() {
ch := make(chan int) // 创建一个无缓冲的int类型的channel
ch <- 1 // 向ch中发送一个值,没有对应的接收方,会导致阻塞和死锁
}
解决方法:确保每个发送操作都有对应的接收操作。
四、channel的高级用法
现在我们已经知道了如何使用for range、select、close和ok-idiom等语法来操作channel,以及如何避免常见的channel错误和死锁。但是channel的使用场景仅此吗?不是的,我们还可以使用channel实现一些并发模式,如worker pool、fan-in/fan-out、pipeline等
worker pool
:使用一个有缓冲的channel作为任务队列,使用多个goroutine作为worker从队列中获取任务并执行,使用一个无缓冲的channel作为信号通知worker退出。fan-in
:使用多个goroutine从不同的数据源接收数据,并将数据发送到一个共享的channel中(fan-in),fan-out
:从一个共享的channel中接收数据,并将数据发送到不同的数据目的地(fan-out)。stages/pipeline
:使用多个goroutine和通道构成一个数据处理流水线,每个goroutine负责一个处理阶段,从上游的通道中接收数据,并将处理后的数据发送到下游的通道中。这样可以将一个复杂的任务分解成多个简单的任务,并利用多核CPU的并行计算能力。graceful exit
:使用一个专门的通道来传递退出信号,让所有的goroutine在完成当前的工作后,及时关闭通道并退出。这样可以避免资源泄露或者异常终止。
下面我们举一些例子来说明如何使用channel实现这些并发模式。
1. worker pool
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs { // 使用for range语法从jobs中接收数据,直到jobs关闭
fmt.Println("worker", id, "processing job", j)
time.Sleep(time.Second) // 模拟耗时操作
results <- j * 2 // 将结果发送到results中
}
}
func main() {
jobs := make(chan int, 100) // 创建一个有缓冲的int类型的channel,作为任务队列
results := make(chan int, 100) // 创建一个有缓冲的int类型的channel,作为结果队列
for w := 1; w <= 3; w++ { // 启动3个worker goroutine
go worker(w, jobs, results)
}
for j := 1; j <= 9; j++ { // 发送9个任务到jobs中
jobs <- j
}
close(jobs) // 关闭jobs
for a := 1; a <= 9; a++ { // 从results中接收9个结果
<-results
}
}
上面的代码中,我们创建了两个有缓冲的int类型的channel,分别作为任务队列和结果队列。然后我们启动了3个worker goroutine,它们从任务队列中接收数据,并将处理后的结果发送到结果队列中。然后我们向任务队列中发送了9个任务,并关闭了任务队列。最后我们从结果队列中接收了9个结果。
2. fan-in/fan-out
func producer(ch chan<- int, d time.Duration) {
var i int
for {
ch <- i // 向ch中发送一个值
i++
time.Sleep(d) // 睡眠一段时间
}
}
func consumer(ch <-chan int) {
for v := range ch { // 使用for range语法从ch中接收数据,直到ch关闭
fmt.Println(v)
}
}
func main() {
ch := make(chan int) // 创建一个无缓冲的int类型的channel
go producer(ch, time.Millisecond*500) // 启动一个生产者goroutine,每隔500毫秒向ch中发送一个值
go producer(ch, time.Millisecond*2000) // 启动另一个生产者goroutine,每隔2000毫秒向ch中发送一个值
consumer(ch) // 调用消费者函数,从ch中接收数据
}
上面的代码中,我们创建了一个无缓冲的int类型的channel。然后我们启动了两个生产者goroutine,它们以不同的频率向channel中发送数据。这就是fan-in模式,即多个生产者向一个消费者发送数据。然后我们调用了消费者函数,它从channel中接收数据。这就是fan-out模式,即一个生产者向多个消费者发送数据。
3. stages/pipeline
// generator函数返回一个只能发送的string类型的通道,用来生成一些字符串
func generator(done <-chan struct{}) <-chan string {
ch := make(chan string) // 创建一个无缓冲的string类型的通道
go func() {
defer close(ch) // 关闭ch
for _, s := range []string{"hello", "world", "go", "language"} {
select {
case ch <- s: // 向ch中发送s
case <-done: // 如果接收到退出信号,就返回
return
}
}
}()
return ch // 返回ch
}
// filter函数返回一个只能发送的string类型的通道,用来过滤从in中接收到的字符串,只保留长度大于等于min的字符串
func filter(done <-chan struct{}, in <-chan string, min int) <-chan string {
ch := make(chan string) // 创建一个无缓冲的string类型的通道
go func() {
defer close(ch) // 关闭ch
for s := range in { // 使用for range语法从in中接收数据,直到in关闭
if len(s) >= min { // 如果s的长度大于等于min
select {
case ch <- s: // 向ch中发送s
case <-done: // 如果接收到退出信号,就返回
return
}
}
}
}()
return ch // 返回ch
}
// transform函数返回一个只能发送的string类型的通道,用来转换从in中接收到的字符串,将其转换成大写形式
func transform(done <-chan struct{}, in <-chan string) <-chan string {
ch := make(chan string) // 创建一个无缓冲的string类型的通道
go func() {
defer close(ch) // 关闭ch
for s := range in { // 使用for range语法从in中接收数据,直到in关闭
select {
case ch <- strings.ToUpper(s): // 向ch中发送s的大写形式
case <-done: // 如果接收到退出信号,就返回
return
}
}
}()
return ch // 返回ch
}
// consumer函数从in中接收数据,并打印出来
func consumer(in <-chan string) {
for s := range in { // 使用for range语法从in中接收数据,直到in关闭
fmt.Println(s) // 打印s
}
}
func main() {
done := make(chan struct{}) // 创建一个无缓冲的空结构体类型的通道,作为退出信号
in := generator(done) // 调用generator函数,生成一些字符串
ch1 := filter(done, in, 3) // 调用filter函数,过滤长度小于3的字符串
ch2 := transform(done, ch1) // 调用transform函数,转换成大写形式
consumer(ch2) // 调用consumer函数,打印结果
close(done) // 关闭done,向所有goroutine发送退出信号
}
上面的代码中,我们定义了四个函数:generator、filter、transform和consumer,它们分别负责生成字符串、过滤字符串、转换字符串和打印结果。每个函数都接受一个只能接收的空结构体类型的通道作为退出信号,并返回一个只能发送或只能接收的string类型的通道作为数据传递。然后我们在主函数中调用这四个函数,并将它们连接起来,形成一个数据处理流水线。最后我们关闭退出信号通道,让所有goroutine在完成当前工作后退出。
4. graceful exit
package main
import (
"fmt"
)
// worker函数从jobs中接收数据,并将结果发送到results中。如果接收到quit信号,就退出。
func worker(id int, jobs <-chan int, results chan<- int, quit <-chan bool) {
for {
select {
case j := <-jobs: // 如果jobs中有数据,就接收并赋给j
fmt.Println("worker", id, "processing job", j)
results <- j * 2 // 将结果发送到results中
case <-quit: // 如果接收到quit信号,就退出
fmt.Println("worker", id, "quitting")
return
}
}
}
func main() {
jobs := make(chan int, 100) // 创建一个有缓冲的int类型的通道,作为任务队列
results := make(chan int, 100) // 创建一个有缓冲的int类型的通道,作为结果队列
quit := make(chan bool) // 创建一个无缓冲的bool类型的通道,作为退出信号
for w := 1; w <= 3; w++ { // 启动3个worker goroutine
go worker(w, jobs, results, quit)
}
for j := 1; j <= 9; j++ { // 发送9个任务到jobs中
jobs <- j
}
close(jobs) // 关闭jobs
for a := 1; a <= 9; a++ { // 从results中接收9个结果
<-results
}
for w := 1; w <= 3; w++ { // 向quit中发送3个信号,让所有worker退出
quit <- true
}
}
上面的代码中,我们定义了一个worker函数,它从jobs中接收数据,并将结果发送到results中。如果接收到quit信号,就退出。然后我们在主函数中创建了三个通道:jobs、results和quit,并启动了三个worker goroutine。然后我们向jobs中发送了九个任务,并关闭了jobs。然后我们从results中接收了九个结果。最后我们向quit中发送了三个信号,让所有worker退出。没错,如果仔细看了stages的实现就会想到,其实我们并不需要想quit中发送3个信息,直接调用close(quit)
关闭channel也能得到一样的效果。
5. 广播
广播是指将一个消息或信号发送给多个接收者的行为。在Go语言中,我们可以使用一个无缓冲的channel来实现广播的功能,其实上面的优雅推出就是广播的一个例子,简化一下代码可以更清晰了解运行机制:
func main() {
ch := make(chan struct{}) // 创建一个无缓冲的空结构体类型的channel
for i := 0; i < 10; i++ { // 启动10个goroutine
go func(i int) {
fmt.Println("goroutine", i, "waiting")
<-ch // 从ch中接收一个值
fmt.Println("goroutine", i, "done")
}(i)
}
fmt.Println("Ready...")
time.Sleep(time.Second) // 睡眠一秒钟
fmt.Println("Go!")
close(ch) // 关闭ch,向所有goroutine发送信号
time.Sleep(time.Second) // 睡眠一秒钟
}
上面的代码中,我们创建了一个无缓冲的空结构体类型的channel,并启动了10个goroutine。每个goroutine都会从channel中接收一个值,并打印出自己的状态。然后我们在主函数中打印出Ready...
,并睡眠一秒钟。然后我们打印出Go!
,并关闭channel。这样就向所有goroutine发送了一个信号,让它们都继续执行,并打印出done
。
6. 超时取消
超时是指在一定时间内没有完成某个操作或任务的情况。在Go语言中,我们可以使用time.After函数来创建一个定时器,并将其返回值(一个只能接收的time.Time类型的channel)作为select语句的一个case来实现超时的功能,例如:
func main() {
ch := make(chan int) // 创建一个无缓冲的int类型的channel
go func() {
time.Sleep(3 * time.Second) // 睡眠3秒钟
ch <- 42 // 向ch中发送一个值
}()
select {
case n := <-ch: // 如果ch中有数据,就接收并赋给n
fmt.Println("Received:", n)
case <-time.After(2 * time.Second): // 如果2秒钟内没有数据,就执行此分支
fmt.Println("Timeout")
}
}
上面的代码中,我们创建了一个无缓冲的int类型的channel,并启动了一个匿名goroutine来向channel中发送数据。然后我们使用select语句来监听两个case:从channel中接收数据或者2秒钟后超时。如果2秒钟内没有数据,就会执行超时分支,并打印出"Timeout"。
7. 限速/限流
限速是指控制某个操作或任务的执行速率或频率的行为。在Go语言中,我们可以使用time.Ticker函数来创建一个定时器,并将其返回值(一个只能接收的time.Time类型的channel)作为select语句的一个case来实现限速的功能,例如:
func main() {
ch := make(chan int, 10) // 创建一个有缓冲的int类型的channel,容量为10
for i := 0; i < 10; i++ { // 向ch中发送10个值
ch <- i
}
close(ch) // 关闭ch
ticker := time.NewTicker(500 * time.Millisecond) // 创建一个定时器,每隔500毫秒触发一次
defer ticker.Stop() // 停止定时器
for n := range ch { // 使用for range语法从ch中接收数据,直到ch关闭
select {
case <-ticker.C: // 如果定时器触发,就执行此分支
fmt.Println("Received:", n)
}
}
}
上面的代码中,我们创建了一个有缓冲的int类型的channel,并向channel中发送了10个值。然后我们创建了一个定时器,每隔500毫秒触发一次。然后我们使用for range语法从channel中接收数据,直到channel关闭。每次接收数据时,我们都会使用select语句来监听定时器的触发,如果定时器触发,就会执行相应的分支,并打印出数据。这样就实现了每隔500毫秒接收一次数据的限速效果。
8. 优先级
优先级是指对某些操作或任务赋予不同的重要性或紧急性的行为。在Go语言中,我们可以使用多个channel来表示不同的优先级,并使用select语句来监听多个case,并执行其中一个可运行的case。如果有多个case可运行,那么就会随机选择一个执行。这样就可以实现一定程度的
优先级效果,例如:
func main() {
high := make(chan int) // 创建一个无缓冲的int类型的channel,表示高优先级
low := make(chan int) // 创建一个无缓冲的int类型的channel,表示低优先级
go func() {
for {
high <- 1 // 向high中发送一个值
time.Sleep(time.Second) // 睡眠一秒钟
}
}()
go func() {
for {
low <- 2 // 向low中发送一个值
time.Sleep(time.Second) // 睡眠一秒钟
}
}()
for {
select {
case n := <-high: // 如果high中有数据,就接收并赋给n
fmt.Println("High priority:", n)
case n := <-low: // 如果low中有数据,就接收并赋给n
fmt.Println("Low priority:", n)
}
}
}
上面的代码中,我们创建了两个无缓冲的int类型的channel,分别表示高优先级和低优先级。然后我们启动了两个匿名goroutine来向这两个channel中发送数据。然后我们使用select语句来监听两个case:从高优先级或低优先级的channel中接收数据。如果高优先级的channel有数据,那么就会执行相应的分支,并打印出High priority: 1
;如果低优先级的channel有数据,那么就会执行相应的分支,并打印出Low priority: 2
。如果两个channel都有数据,那么就会随机选择一个执行。
好了,相信现在你对channel已经了如指掌了,创作不易,给个赞支持一下!
#golang项目##go##golang工程师##高并发#