无锁队列讲解(多生产者多消费者场景)
(有什么问题欢迎讨论区留言,b站链接为本人录制的视频讲解~)
代码根据论文 [Implementing Lock-Free Queues](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.53.8674&rep=rep1&type=pdf) 复现。代码地址:https://github.com/zxwsbg/lock-free-queue
b站链接:https://www.bilibili.com/video/BV1q54y1Y71W/
背景介绍
生产者和消费者问题是操作系统中老生常谈的一个问题。针对不同的场景(单生产者-单消费者、单生产者-多消费者、多生产者-多消费者)有着不同的解决方法,本文研究的是多生产者多消费者的场景。解决多生产者多消费者的常见方法是`互斥锁+信号量`,而互斥锁开销过大,在高并发常见下会有很大的性能影响。而采用无锁队列的方法可以避免锁的使用来达到减小开销的目的。
无锁队列的实现的要点就是 CAS 操作,后续在考虑到内存重新分配的时候会造成安全性问题(ABA问题),故引入了 DoubleCAS 或者其他内存分配机制来解决问题。
无锁队列实现
数据结构介绍
按照论文中提到的两种方法,选了一种较优的方法实现。
以单向链表的形式实现这个队列,每个节点的数据结构为
class QueueNode { public: int val; QueueNode* next; QueueNode(int val) : val(val) { next = NULL; } };而对于一个无锁队列对象而言,包含以下部分
- 数据:队列的最大长度、链表头结点、链表尾节点
- 函数:初始化(构造函数)、入队、出队
class LockFreeQueue { public: LockFreeQueue(); bool enqueue(int val); int dequeue(); ~LockFreeQueue(); private: int queue_size; // 暂时未使用,论文里并没有提及最大资源数 QueueNode* tail; QueueNode* head; };其数据结构包含关系如下图所示
入队操作
底下我们以入队操作为例bool LockFreeQueue::enqueue(int val) { QueueNode* cur_node; QueueNode* add_node = new QueueNode(val); while (1) { cur_node = tail; if (__sync_bool_compare_and_swap(&(cur_node->next), NULL, add_node)) { break; } else { __sync_bool_compare_and_swap(&tail, cur_node, cur_node->next); } } __sync_bool_compare_and_swap(&tail, cur_node, add_node); return 1; }这里的 `__sync_bool_compare_and_swap`就是 GCC 提供的 CAS 算法。
第7行的CAS首先尝试将新加的节点放到链表末尾,这里有两种结果:
1. 加入成功,那么退出循环进入倒数第三行
2. 加入不成功,说明这时候有其他线程抢先往里面插入了一个节点,那么就把当前节点的位置更新为尾节点,再次进入循环直到能正确更新
到了最后一个CAS的时候,只需要进行一次置尾操作,并不需要循环,原因是:如果当前线程将节点已经加进去了的话,那么其他所有线程的操作都会失败,只有当前线程更新尾节点完成后,其他线程的第二个CAS操作才能成功。
这里有一个小trick,明明第6行每次循环时都会更新节点,为什么还需要第二个CAS操作呢?因为在极端情况下,一个线程已经完成了增加节点操作,在置尾操作(第三个CAS)之前突然挂了,这时候就导致其他所有线程全部不能更新。
在左耳朵耗子的博客中,对于上述问题的解决方法描述是选用了《Implementing Lock-Free Queues》论文中提到的第二个方法,该方法采用的是一开始在head,然后不断找next直到找尾节点(通过多个线程共用fetch来减少开销)。本文以论文中提到的第一个方法为例进行描述(因为我觉得它更优雅)。
出队操作
与入队操作类似。int LockFreeQueue::dequeue() { QueueNode* cur_node; int val; while (1) { cur_node = head; if (cur_node->next == NULL) { return -1; } if (__sync_bool_compare_and_swap(&head, cur_node, cur_node->next)) { break; } } val = cur_node->next->val; delete cur_node; return val; }
多生产者-多消费者模型
我们采用 C++ 的 Thread 库,模拟多个生产者和多个消费者的情况。多个生产者线程向无所队列中插入数据,多个消费者从无锁队列中取出数据。测试部分的代码实现在 `lockfreequeue_test.cpp` 文件中。以生产者为例,其代码如下所示。
int thread_number; int task_number; // 每个线程需要入队/出队的资源个数 LockFreeQueue* lfq; void produce(int offset) { // 算上偏移量,保证不会出现重复 for (int i = task_number * offset; i < task_number * (offset + 1); i++) { printf("produce %d\n", i); lfq->enqueue(i); } } int main(int argc, char** argv) { lfq = new LockFreeQueue; std::vector<std::thread> thread_vector1; for (int i = 0; i < thread_number; i++) { thread_vector1.push_back(std::thread(produce, i)); } for (auto& thr1 : thread_vector1) { thr1.join(); } }
正确性检测
对于结果,首先做正确性检验,具体实现在 `check.py` 文件中,对于produce,要求之前没有生产过该资源;对于consume,要求之前生产过并且没有消费过该资源。ABA 问题
在高并发场景下会出现该问题。如上述情况,以默认值参数运行后经常不能通过正确性检测。原因根据我的判断,是因为ABA问题:当我们做CAS的之前,如果head的那块内存被回收并被重用了,而重用的内存又被EnQueue()进来了。而判断的方法也是个小 trick ,只需要将deque操作中释放内存的步骤注释掉,那么就不会出现内存重用的问题,这样做后也确实可以通过正确性测试。这个问题在论文中也提到了并给出了相应解决方法,但其需要自己实现一套内存分配系统(麻烦)。于是本文参考另一篇论文[Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms](http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf),利用`Double-CAS`来解决此问题,每访问一个节点,都用引用计数的方式给其+1,`Double-CAS`则可以一次既判断原始的节点值,又判断引用计数值,保证其未被置换出去过。这一块的具体代码实现可见github项目代码的`fix_aba_problem`分支。