《并发哲学:从编程入道到开悟升天》3.5 是时候了解并发范式
是时候了解并发范式
范式(paradigm)的概念和理论是美国著名科学哲学家托马斯·库恩(Thomas Kuhn) 提出并在《科学***的结构》(The Structure of Scientific Revolutions)(1962)中系统阐述的。很明显,这是一种哲学的概念。简而言之,范式就是在拥有基本生产与科学研究要素背景下,提炼出具有一定程度公认性、为更高层次生产和研究提供纲领性的指导意见与方法论。
通俗的说,范式就是指,建立在某个基本科学体系内,如果你要怎么样,最好应该怎么干的指导方针。就好比如果你想得到一杯开水,最好的方法是找一个有开水的地方接一杯。你想要获得好的睡眠,最好要创造出安静的环境和舒适的大床。当然,在严谨的科学体系内,相关的阐述会更加准确,而不是泛泛而谈。
回顾本章前几节介绍的内容,我们可以了解到在并发编程体系内,由Golang所引入的并发编程基本要素如下,你可以简单理解为构成最终带有并发特性代码的“原材料”,我们总结如下:
- 任务分配:利用go关键字
- 资源协调、点对点通信:利用channel
- 条件阻塞:select机制
- 基于简单记录的等待:waitgroup
- 事件传递与交会:cond
- 资源互斥标记:Mutex和RWMutex
- 抢占式任务标记:Once
- 资源池缓冲:Pool
在本小节,我们将进一步介绍与Golang相关的并发范式,作为进一步组织之前所学,形成的对于庞大任务分解过程中的经验解决方案。我们将会阐述以下范式:
- 不可变约定
- 协程闭包
- done通道
- for-select
- or-channel
- chRange
- tee-channel
- bridge-channel
- 规整化流水线
在介绍相关范式的过程中,我们将会对范式作用领域进行准确解构,请读者务必认真理解并细细体会,准备好了,就让我们开始吧。
不可变约定 实现直观并发安全控制
在任务执行过程中,往往会发生各类冲突,冲突来源于职责边界不清晰而导致的问题,本质属于资源冲突,对应的各类锁问题往往也源自于此。回顾之前所阐述的内容,解决该类问题,我们已经有了两个成熟的方案,可以总结如下:
- 通道机制:先后按序,没有东西过来,你别动
- 互斥锁机制:明确标记,有锁,你别动
而除去这两种方法,更加直观的方法是不可变约定,我们称之为:
- 不可变约定:不是你的,你别动
在日常生活中,这三种限制可以说是融合在方方面面。例如流水线上的工人,不同顺序工序下生产物品,上一流程未完,下一流程的工人就不会得到原材料,因而也就无法开展生产,这就像通道机制。又例如上厕所,厕所上锁,后来的人就无法得到坑位,这就好比互斥锁。而不可变约定就好比是一种道德制约,在批量干活之前,就把工作内容等提前明确划分好界限,完全凭借人的自觉进行操作。
这当然是安全的,因为已经充分给自己所负责范围内的资源赋予了权力,在自己权力内,怎么“折腾”都行。让我们来体会下面的案例:
printData := func(wg *sync.WaitGroup, data []byte) { defer wg.Done() var buff bytes.Buffer for _, b := range data { fmt.Fprintf(&buff, "%c", b) } fmt.Println(buff.String()) } var wg sync.WaitGroup wg.Add(2) data := []byte("golang") go printData(&wg, data[:3]) // 这个协程只能操作“golang”字符串的后半部分 go printData(&wg, data[3:]) // 这个协程只能操作“golang”字符串前半部分 wg.Wait()
在上面的例子中,我们把能够让每个协程处理的内容提前分配好,这样就确保这两个协程即便处理的是同一个字符串,但由于在处理之时,已经由代码编写者确保了处理的内容不会有交叉,因而只要协程内的操作一切按程序内合法逻辑(即不主动触发越界、溢出等不安全操作),这样的并发一定能做到并发/并行安全。
不过,可悲的是,世界以变化作为基本特征,这样的处理往往显得教条和呆板。即使在某种任务内涵及其准确的情况下,这种并发处理前准确安排任务的做法十分的高效并且易于理解,但是在实际的编程中,我们可能还是需要回到基于通道和基于共享内存的同步原语。
协程闭包 用于构建可控安全域
在不可变约定模式下,我们通过框定数据范围,来确保并发安全,这属于一种对资源的提前划分。然而,数据的多变性决定了这种方式可能在大部分情况下都具有局限性。不过,这种思想作用在代码作用域上,则被广泛使用,这就是协程闭包。简而言之,协程闭包通过指定编写范式,构建了预先划定的代码权限范围。来看下面的例子,这是一个大家都认为遵循了规则但却无意中触碰了教条的悲剧:
func Demo() { ch := make(chan int, 0) go dosomething(ch, 10) //把相同的活分配给A go dosomething(ch, 20) //把相同的活分配给B dosomething(ch, 30) } func dosomething(ch chan int, num int) { for i := 1; i < num; i++ { ch <- i } close(ch)//A\B都严格按照步骤行事,但是此处却因为代码缺陷A/B的对通道关闭操作会导致另外一方无法再操作! }
上面的代码,运行后显而易见的得到了死锁,A/B虽然干的活一样人不一样,但是触碰的信号量发生了排斥——通道在一个协程内发生了关闭,而这个通道又被另一个协程共享,实际上应该要避免这种共享因为二者工作中的信号量并不相关!
在这种情况下,我们有必要对可能发生该类隐患的地方使用协程闭包的范式,来统一约束,规避这类问题。我们来看下面的例子:
func Demo() { // 生产者负责产生一个只读通道,生产者自己负责了这个只读通道的生命周期 producter := func() <-chan int { results := make(chan int, 5) // 该通道只作用于特定闭包,此处是这个函数,内的作用域 go func() { defer close(results) for i := 0; i <= 5; i++ { results <- i } }() return results } // 消费者利用生产者使用只读通道 consumer := func(results <-chan int) { for result := range results { fmt.Printf("Received: %d\n", result) } fmt.Println("Done receiving!") } consumer(producter()) }
上述案例内,生产者构建了一个协程闭包,确保通过这个生产者产生的任务实例,内部的信号量、资源通道等生命周期各自独立控制,做到了“给你再弄一份,你随便弄”。
done通道 实现通知退出机制
在实际程序流程中,往往需要通知退出机制。我们根据之前所学,应该有体会到,select具有阻塞并等待通知的作用,通道被允许读写/被关闭成为一种信号,可以使被阻塞的select代码片段继续执行。我们利用select和通道的特性,特意声明一个不承载任何资源的管道,它将仅在被关闭时发出信号,供对应地方的select使用,从而加强流程控制。
一般的声明方法如下:
done := make(chan interface{})
支持通知退出的函数和模块应该要能够对done通道进行反应,以函数为例:
func testFunc(done chan interface{},paramA int,paramB int){// 在参数列表内应该要支持传入done通道 }
激发信号通过触发done通道的关闭来实现:
done := make(chan interface{}) // 中间的代码内部逻辑监听了done通道 close(done) //通过关闭done通道,中间所有监听了done通道的逻辑都会得到响应
你将在随后的例子加深done通道的具体实现和作用,例如下面的for-select。
for-select 实现循环条件阻塞
日常生活中,我们常常遇到在一个任务处理过程中,我们需要频繁的进行相同的条件阻塞后判定执行。在Golang程序中,我们通过构建for-select循环来解决这样的问题。
for-select循环的基本构建方法如下:
for { // 无限循环或遍历 select { // 对通道进行操作 } }
在Golang中,我们通常使用for-select解决两个基本问题:
- 往通道内发送可迭代变量
- 无限循环等待条件终止
下面的例子为我们展示了如何将一个字符串切片内的每一个元素丢入到stringStream这个通道中,并在done通道能够吐出内容时作为条件进行终止。你可以自行尝试将外层for循环内遍历的对象替换成一个大小在程序运行时未知的内容,例如另一个可遍历通道,这样可以更加准确的体会这种遍历发送,并随时可在下一次发送前决定是否终止的便利。
for _, s := range []string{"a", "b", "c"} { select { case <-done: //此处的done是一个之前声明好的done通道 return case stringStream <- s: } }
而在无限循环等待条件终止这一应用,则可以用下面的例子来体现:
for { select { case <-done: return default: // 你可以在这里执行一些任务 } // 也可以不在上面,在此处执行一些任务 }
简而言之,for-select循环很好的弥补了双方的劣势,即仅仅使用基于for的条件判定退出,往往资源利用更加浪费,有许多等待唤醒就好的操作场景下,for循环还是每次都需要消耗完整一个内核周期。而select缺点又在于只能使用一次,虽然有着良好的基于IO触发机制,但是无法复用。
在众多的Golang程序内,for-select循环将十分常见。
or-channel 实现终止信号合并
在某些情况下,你可能需要把多个done通道合并到一个done通道内,也就是说,合并过后的done通道,将在内部任何一个组件通道关闭的情况下也被关闭。
在这种需求下,我们的解决方案一般也分为两种,第一种是,写死一个冗长的select,基于select的抢占式执行机制,可以直观、方便的实现这种需求。在这种情况下,select本身就是这个组合过后的“通道”。但是在很多情况下,我们并不知道组件done通道的数量,这样我们就无法直接构建出对应的select代码块。幸运的是,我们依旧可以通过构建or-channel来实现这个需求,具体的代码实现如下:
orChannel := func(channels ...<-chan interface{}) <-chan interface{} { //1 switch len(channels) { case 0: //2 return nil case 1: //3 return channels[0] } orDone := make(chan interface{}) go func() { //4 defer close(orDone) switch len(channels) { case 2: //5 select { case <-channels[0]: case <-channels[1]: } default: //6 select { case <-channels[0]: case <-channels[1]: case <-channels[2]: case <-or(append(channels[3:], orDone)...): } } }() return orDone }
我们来简要的分析一下上面这段代码:
- 这里我们建立了名为or的函数,接收数量可变的通道并返回单个通道。
- 由于这是个递归函数,我们必须设置终止条件。第一个条件是,如果传入的切片是空的,我们简单的返回一个nil通道。这与不传递通道的想法一致:我们不希望复合通道做任何事。
- 第二个递归终止条件是,如果切片只含有一个元素,我们就返回给元素。
- 这是该函数最重要的部分,也是递归产生的地方。我们建立一个goroutine,以便可以不受阻塞地等待我们通道上的消息。
- 由于我们这里是递归的,每次递归调用将至少有两个通道。作为保持goroutine数量受到限制的优化方法,们在这里为仅使用两个通道的时设置了一个特殊情况。
- 在这里,我们递归地在第三个索引之后,从我们切片中的所有通道中创建一个or通道,然后从中选择。递归操作会逐层累计直到取到第一个通道元素。我们在其中传递了orDone通道,这样当该树状结构顶层的goroutines退出时,结构底层的goroutines也会退出。
通过这种奇妙的做法,我们只需要在面对这种需求的时候声明如上文例子内的or-channel,就可以做到向这个函数内丢入若干个done通道,在这些done通道任意一个标记为完成的时候,整个函数都会返回完成信号。
chRange 实现可控与准确遍历通道
在之前的编程实践中,我们提倡在遍历通道的过程中,加入done通道机制,确保在循环中可以接受信号中止,就好比必须告诉员工,公司宣布倒闭了之后就可以不用再干活了,否则公司都注销了,员工可能直到公司注销后相关企业权限消失才无所作为,而在宣布公司倒闭了之后的这段动荡期干了什么可怕的事情,就不得而知。因而,通过引入信号终止,我们能够确保一直受IO阻塞的代码片段,或者是已经抛出运行的协程,可以按照预期的接受关闭信号并关闭。
我们在之前的范式内,已经大量运用了利用select封装done通道,确保相关操作的可控性。更加实用地,如果在项目内,我们频繁要对各类通道进行遍历,由于每个通道我们都希望遍历可控,我们可以构建一个安全的专门用于遍历的通道。
同时,在传统的基于done通道的控制内,无法对通道的行为方式做出判断。也就是说,你不知道正在执行读取操作的协程现在是什么状态。我们需要利用其他的机制来对退出的原因进行进一步精准标注。
简而言之,我们需要自行编写一种交互方式,实现通道的功能扩展:
- 这种新的通道需要能够接受done通道,内部确保可以处理基于done通道的终止信号
- 这种新的通道内部能够对数据读取通道本身的情况变化做出判定
具体的实现方法如下:
var chRange = func(done, ch <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-done: return case v, ok := <-ch: if ok == false { //此处就是对数据读取通道(非done通道)相关的读取情况做了处理 return } select { case valStream <- v: case <-done: } } } }() return valStream }
我们可以看到,通过上述的方法,实现了之前需求内要求的功能扩展。而且,在上面的案例中,我们发现在chRange内使用到的协程利用外部变量实现数据传送,同时通过defer影响,做到特殊情况下改变外部变量,这种在函数内依托协程和某些变量相互影响实现某些功能的实践叫做协程闭包,当然我们不会针对协程闭包进一步展开讲解,此时你无需理解它,只需要去感受它。
有了上面自己构建的高级通道,你就可以在其他地方以更加抽象和简单的方式进行安全可控的遍历通道了:
for val := range chRange(done, ch) { //此处的chRange就是上面封装好的结构 fmt.Printf("read %v \n", val) }
tee-channel 分割通道数据流
我们常常会遇到这样的需求,在web服务器内,服务器接受并处理请求同时根据这个请求打印一份日志;在远程调用存储时我们希望透传数据顺带进行一份备份。在抗日电视剧内,我们往往会看到很多爱国酒楼老板等角色,看到关键人物进了酒楼,不但走正常的接待客人程序,也顺带把对应的信息带给八路军。
细细一想,上面的需求,如果设计成串行,会有不少的问题。我们直观的用爱国酒楼老板智报八路军的场景,如果操作是串行,那么如果汇报八路军,在效率和差错上稍有不慎,日本鬼子就会发现端倪,从而斩草除根。因而,在这种情况下,我们需要保证另一路信息流动不应该影响到当前路的信息流动。
在这样的考量下,tee-channel方式应运而生,典型的实践方法如下:
var tee = func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) { out1 := make(chan interface{}) out2 := make(chan interface{}) go func() { defer close(out1) defer close(out2) for val := range chRange(done, in) { select { case <-done: default: out1 <- val out2 <- val } } }() return out1, out2 }
通过分析代码,我们不难发现,除了输入参数内的done通道,用于数据输入的只有一个in通道,但是我们的tee返回了两个可供遍历的通道。从外部的程序设计来说,我们可以分别遍历这两个通道进行处理,在通道不关闭与阻塞的情况下,两边的交互都是不受影响的。
利用这种模式,很容易使用通道作为系统数据的交汇点。
bridge-channel 实现通道桥接
在某种情况下,你需要对多个通道吐出数据还要进行归并有序。简而言之,你想要从一个统一的通道中获取多个通道的值,换言之你让多通道统一而混乱的分别吐出又收纳到一个有序的通道内进行处理,那么你可以尝试构建bridge-channel实现通道桥接。简单的实现一般如下:
var bridge = func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { var stream <-chan interface{} select { case maybeStream, ok := <-chanStream: if ok == false { return } stream = maybeStream case <-done: return } for val := range chRange(done, stream) { //这里用到了我们先前构建的安全遍历通道 select { case valStream <- val: case <-done: } } } }() return valStream }
通过这样的构建,我们可以在收敛之后,将注意力专注在一个通道上的数据流处理,而无需关心多通道数据流归并问题。从时间效率上说,对于各通道零散、先后的数据传送,bridge-channel可以使得这种数据传送在最终对外表现出一致和均匀的特性,从而提升依附在这个数据流上统一处理的效能。
规整化流水线 构建基本作用单元
熟悉Linux的朋友可能了解管道符,“|”这个符号代表着shell命令内的管道,作用是可以使得上一条命令产生的结果又会作为下一条命令的原材料。由于在golang中,一般用channel来指代管道,毕竟管道确实是可以往里送东西,也可以从里面取东西——传输是双向的,但是在许多其他中文书籍中,经常会使用“通道”来指代channel,而用“管道”一词来指代pipeline。为了更加准确的描述,在本书中,我们直接用“流水线”一词来指代pipeline,因为这象征着有序、单向。
以前你可能见过这样的代码组织:
FuncC(FuncB(FuncA()))
通过函数的层层嵌套,实现了FuncA函数的结果,作为FuncB函数的输入,而FuncB基于FuncA结果输入的结果,又成为了FuncC的输入,前因后果,层层递进。但是在并发场景下,这种方式却天然隐含着各类不安全要素,我们有必要通过吸纳先前规避并发安全问题的实践,总结出可称为范式的并发场景下流水线实践。一般说来,这是基于done通道和数据通道共同构成的闭包函数进一步组织成的一种模式。可以总结有下面三个阶段:
- 数据生成阶段
- 数据处理阶段
- 结果收取阶段
在生成阶段,我们需要构建如下的生成器,下面的例子演示了一个将整数数组规整化为一个有序整型管道:
generator := func(done <-chan interface{}, integers ...int) <-chan int { intStream := make(chan int) go func() { defer close(intStream) for _, i := range integers { select { case <-done: return case intStream <- i: } } }() return intStream }
生成器确保分散数据可以收敛到流,也就是可以被送进管道。有了生成器处理,随后的部分是若干个通道处理器。下面的例子演示了一个接受整型管道并对传送的数进行乘上multiplier的生成器:
handlerFunc := func(done <-chan interface{}, intputStream <-chan int, multiplier int) <-chan int { outputStream := make(chan int) go func() { defer close(outputStream) for i := range intputStream { select { case <-done: return case outputStream <- i * multiplier: } } }() return outputStream }
可以看到,通道处理器提供逻辑上的数据处理,实际的数据变化均发生在通道处理器阶段。在经过中间若干通道处理器的处理后,最终结果使用range遍历:
results := handlerFunc1(done,handlerFunc2(done,handlerFunc3(done,generator(done,【原材料】)))) //此时results也是一个可遍历管道,内部将源源不断送出处理好的原材料 for v := range results { fmt.Println(v) //通过range的方式,可以源源不断的取用最终的结果 }
通过规整化流水线,我们不但可以方便的对不同流水线进行衔接,还可以逻辑清晰的在未来在有需要的情况下,对某一个流水线逻辑进行单独调整。而且,在整个流水线全程我们都引入了协程闭包和通知取消,其内部本身具有更加完善的并发安全机制,外部也可以方便的发送信号进行流水线取消,十分的实用。
小结
本节我们通过介绍golang并发编程中常见的并发范式,做到对多个场景下经典解决方案的实例化,为我们后续进一步构建大型场景化并发解决方案提供了高级素材。正如你所见,在上述的例子内,我们没有引入任何新的并发原语,一切都是你熟悉的go关键字、channel和select,以及少许3.4节内提到的waitgroup等。
这些经典的处理方式,已经落实到具体代码,成为事实标准,希望你能认真吸收和学习,并体会和实际生活的关联。