线程间共享数据——使用互斥量保护共享数据
📖线程间共享数据——使用互斥量保护共享数据
🏷 前言
在C++并发编程中,保护共享数据最简单的方法是使用互斥量。
在访问共享数据前,开发者可使用互斥量将相关数据锁住,并于访问结束后将数据解锁。因此,线程库需要保证当一个线程使用特定互斥量锁住共享数据时,其他线程仅可在数据被解锁后才能访问。
互斥量是C++中最通用的一种数据保护机制,但它并非“银弹”,精心组织代码来保护正确的数据(见3.2.2节),并在接口内部避免竞争条件(见3.2.3节)是非常重要的。但互斥量自身也有问题,也会造成死锁(见3.2.4节),或是对数据保护的太多(或太少)(见3.2.8节)。
🏷 3.2.1 C++中使用互斥量
⭐️lock() unlock上锁,解锁
C++中通过实例化std::mutex创建互斥量,通过调用成员函数lock()进行上锁,unlock()进行解锁。不过,不推荐实践中直接去调用成员函数,因为调用成员函数就意味着,必须记住在每个函数出口都要去调用unlock(),也包括异常的情况。
⭐️RAII std::lock_guard
C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard,其会在构造的时候提供已锁的互斥量,并在析构的时候进行解锁,从而保证了一个已锁的互斥量总是会被正确的解锁。
🌰示例
下面的程序清单中,展示了如何在多线程程序中,使用std::mutex构造的std::lock_guard实例,对一个列表进行访问保护。std::mutex和std::lock_guard都在头文件中声明。
清单3.1 使用互斥量保护列表
#include <list> #include <mutex> #include <algorithm> std::list<int> some_list; std::mutex some_mutex; void add_to_list(int new_value) { std::lock_guard<std::mutex> guard(some_mutex); some_list.push_back(new_value); } bool list_contains(int value_to_find) { std::lock_guard<std::mutex> guard(some_mutex); return std::find(some_list.begin(),some_list.end(), value_to_find) != some_list.end(); }
❓问题:如果一个成员函数返回的是保护数据的指针或引用,那么就一定存在数据破坏的可能性
在上述使用过程中,some_list是一个全局变量,它被一个全局互斥量some_mutex所保护。虽然某些情况下使用全局变量没问题,但在大多数应用场景中,互斥量通常会与保护的数据位于同一个类内,而非像上文一样作为全局变量。显然,这是面向对象设计准则:将互斥量作为data member置于类中。类似地,函数add_to_list和list_contains应当作为成员函数。需要注意的是,互斥量与要保护的数据在类中需要被定义为private成员,所有成员函数均需在调用时对数据上锁,结束时对数据解锁,如此则可保证数据不被破坏(这里说的并不完全正确,如果成员函数均内置有上锁操作,则内部调用时需要嵌套锁机制)。
当然,现实情况并非总是如此理想,应该意识到,如果一个成员函数返回的是保护数据的指针或引用,那么就一定存在数据破坏的可能性,原因在于使用者可以通过引用或指针直接访问数据,从而绕开互斥量的保护。因此,如果一个类使用互斥量来保护自身数据成员,其开发者必须谨小慎微地设计接口,确保互斥量能锁住任何对数据的访问,并且不留后门。
🏷 3.2.2 精心组织代码来保护共享数据
正如上文所述,使用互斥量来保护数据时并非只是在每一个成员函数中都加入一个std::lock_guard对象那么简单,任何一个传递至外界的指针或引用都将令这种保护形同虚设。某些开发者认为规避这项风险非常容易:仅仅需要在设计过程中检查接口是否返回指向内部数据的handle即可,但事实绝非如此。在确保成员函数不会传出指针或引用的同时,检查成员函数是否通过指针或引用的方式来调用也非常重要(尤其是该操作不在你的控制下时)。成员函数可能在没有互斥量保护的区域内存储着指针或引用,这是一种非常危险的行为,更危险地是:将保护数据作为一个运行时参数。下述实例展示了这一点。
🌰问题实例
清单3.2 无意中传递了保护数据的引用
class some_data { int a; std::string b; public: void do_something(); }; class data_wrapper { private: some_data data; std::mutex m; public: template<typename Function> void process_data(Function func) { std::lock_guard<std::mutex> l(m); func(data); // 1 传递“保护”数据给用户函数 } }; some_data* unprotected; void malicious_function(some_data& protected_data) { unprotected=&protected_data; } data_wrapper x; void foo() { x.process_data(malicious_function); // 2 传递一个恶意函数 unprotected->do_something(); // 3 在无保护的情况下访问保护数据 }
👼实例分析
看起来process_data没有任何问题,但调用用户自定义的func则意味着foo可以绕过保护机制将函数malicious_function传入,在没有互斥量锁定的情况下调用do_something。
从review结果来看,开发者只是单纯地将所有可访问的数据结构代码标记为互斥而已,但在foo()中调用 unprotected->do_something()的代码未能被标记为互斥。C++标准库并不能针对这种行为做出保护,因此必须谨记:切勿将受保护数据的指针或引用传递到互斥锁作用域之外。
🏷 3.2.3 发现接口内在的条件竞争
🤚问题背景
假设当前存在一个stack class,除去构造函数与swap外,其大致有5个接口:
push()
pop()
top()
empty()
size()
template<typename T,typename Container=std::deque<T> > class stack { public: explicit stack(const Container&); explicit stack(Container&& = Container()); template <class Alloc> explicit stack(const Alloc&); template <class Alloc> stack(const Container&, const Alloc&); template <class Alloc> stack(Container&&, const Alloc&); template <class Alloc> stack(stack&&, const Alloc&); bool empty() const; size_t size() const; T& top(); T const& top() const; void push(T const&); void push(T&&); void pop(); void swap(stack&&); };
出于性能的要求,我们一般会将top()返回一个引用而非拷贝,显然这与上文所要求的不符。但需要注意的是即使修改了top(),使其返回一个拷贝,该接口依然存在条件竞争。更进一步地说,该问题与使用互斥量并无关联,在无锁编程实现的接口中条件竞争也依然存在。总之,这是接口本身所具备的性质,与实现方式无关。
🌰条件竞争实例
虽然empty()与size()在被调用并返回时是正确的,但其结果并不可靠:当它们返回后,其他线程则可自由地访问栈,并且push()多个新元素至栈中;当然,也可能pop()一些已在栈中的元素。在这种情况下,empty()与size()返回值可认为是无效的。
更进一步地,当工作环境为单线程时,使用empty()判空后再调用top()是一种安全行为:
stack<int> s; if (!s.empty()) { int const value = s.top(); s.pop(); do_something(value); }
但在并发编程中上述代码将不再安全,因为在调用empty()和调用top()之间,可能有来自另一个线程的pop()调用,并且该调用可能删除了最后一个元素。这是一个经典的条件竞争问题,使用互斥量对栈内部数据进行保护并不能阻止条件竞争的发生,即接口固有问题。
👼问题分析
由于问题来源于接口设计,因此解决问题势必需要更改接口。有人提出这样一个方案:在调用top()时检查当前容器是否为空,若是则抛出异常。但该解决方案非常拙劣,也就是说,即使empty()返回了false,我们仍然需要异常捕获机制,本质上这令empty()成为了一个多余接口。
另一个潜在的条件竞争发生于top()与pop()之间。假设存在两个线程运行着上述程序,并且他们分享着同样的栈实例,若初始条件下栈内仅存在两个元素,由于stack对象含有互斥量,因此成员函数只能够交错运行,但do_something作为非成员函数可以并发运行。针对上述假设,可能存在某种执行顺序如下表所示。
Thread A | Thread B |
---|---|
if (!s.empty()) | |
if (!s.empty()) | |
int const value = s.top(); | |
int const value = s.top(); | |
s.pop() | |
do_something(value); | s.pop() |
do_something(value); |
我们的本意是线程A与线程B分别调用了top()与pop(),从而获取到了栈内的第一个与第二个元素,但在这种执行顺序下,线程A与线程B获取的均为同一个值,而另一个值被直接丢弃了。相对于未定义行为,这种条件竞争更加难以定位和排查。
可能有读者因为长期使用C++,因此在阅读上文的过程中一头雾水,并未了解这种接口设计有何不合理之处。一言以蔽之:在另一些语言中,pop()接口将直接移除顶端对象并返回其拷贝,这种接口设计将保证上述条件竞争不再发生(由于操作均集中在成员函数内部没有被分割,而成员函数本身是交错运行的)。具体来说,为了获取被移除的顶部元素,C++开发者必须写:
vectorint> temp = s.top(); s.pop();
而不能够像java开发者一样:
ArrayListint> temp = s.pop();
🔺问题起因
C++保证不能安全的将元素拷贝出去的情况下,栈中的这个数据还依旧存在,没有丢失, 所以将pop()和top()操作分开,但也因此引入了竞争条件
说一些大家没有意识到的问题:假设有一个stack>,vector是一个动态容器,当你拷贝一个vetcor,标准库会从堆上分配很多内存来完成这次拷贝。当这个系统处在重度负荷,或有严重的资源限制的情况下,这种内存分配就会失败,所以vector的拷贝构造函数可能会抛出一个std::bad_alloc异常。当vector中存有大量元素时,这种情况发生的可能性更大。当pop()函数返回“弹出值”时(也就是从栈中将这个值移除),会有一个潜在的问题:这个值被返回到调用函数的时候,栈才被改变;但当拷贝数据的时候,调用函数抛出一个异常会怎么样? 如果事情真的发生了,要弹出的数据将会丢失;它的确从栈上移出了,但是拷贝失败了!std::stack的设计人员将这个操作分为两部分:先获取顶部元素(top()),然后从栈中移除(pop())。这样,在不能安全的将元素拷贝出去的情况下,栈中的这个数据还依旧存在,没有丢失。当问题是堆空间不足,应用可能会释放一些内存,然后再进行尝试。
不幸的是,这样的分割却制造了本想避免或消除的条件竞争。幸运的是,我们还有的别的选项,但是使用这些选项是要付出代价的。
⭐️解决方案
选项1: 传入一个引用
template<typename T,typename Container=std::deque<T> > class stack { public: ... // member function declaration void pop(T&); }; std::vector<int> result; some_stack.pop(result);
该方案可以用于多数应用场景,但缺点也十分明显:需要构造出一个栈中类型的实例,用于接收目标值(即上述实例中的result)。构造实例主要存在三种问题:
对于某些类型而言,这种做法不切实际,因为临时构造一个实例从时间和资源的角度来看都不划算。
部分类型不支持默认构造函数,而构造它所需要的参数此时是未知或者不确定的。
这种操作需要类型具备可赋值属性(operator=),而很多用户自定义类型可能并不支持赋值操作。
选项2:无异常抛出的拷贝构造函数或移动构造函数
在设计之初之所以将pop()返回值设定为void,是因为返回一个具体的值存在异常安全问题。那么追本溯源之后,读者自然会想到设计某种不会抛出异常的拷贝或移动构造函数。
这种做法虽然安全,但决不能称之为可靠。尽管C++标准库能够令开发者在编译时使用std::is_nothrow_copy_constructible和std::is_nothrow_move_constructible类型特征来判断拷贝/移动构造函数是否抛出异常,但这种方式的局限性太强。大量的用户自定义类型的拷贝/移动构造函数不具备nothrow属性,将它们隔离于线程安全的stack之外似乎不太友好。
选项3:返回指向弹出值的指针
第三种方案是返回一个指向弹出元素的指针,而不是直接返回值。指针的优势在于它可以自由拷贝,并且不会产生异常。但缺点在于指针也需要占据一定的内存空间,对于简单数据类型(比如int),这种开销要远大于直接返回值。 在该解决方案下,推荐使用std::shared_ptr作为函数返回值。智能指针不仅能避免内存泄露,并且其内存分配全部由标准库完成,即不需要手动地new与delete(使用make_shared)。
选项4:“选项1 + 选项2”或 “选项1 + 选项3”
对于通用的代码来说,灵活性不应忽视。当你已经选择了选项2或3时,再去选择1也是很容易的。这些选项提供给用户,让用户自己选择对于他们自己来说最合适,最经济的方案。
⭐️解决实例
下述实例组合了方案1与方案3,并在其中封装了std::stack,其代码概述如下所示:
#include <exception> #include <memory> #include <mutex> #include <stack> struct empty_stack: std::exception { const char* what() const throw() { return "empty stack!"; }; }; template<typename T> class threadsafe_stack { private: std::stack<T> data; mutable std::mutex m; public: threadsafe_stack() : data(std::stack<T>()){} threadsafe_stack(const threadsafe_stack& other) { std::lock_guard<std::mutex> lock(other.m); data = other.data; // 调用stack::operator= } threadsafe_stack& operator=(const threadsafe_stack&) = delete; void push(T new_value) { std::lock_guard<std::mutex> lock(m); data.push(new_value); } std::shared_ptr<T> pop() { std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack(); std::shared_ptr<T> const res(std::make_shared<T>(data.top())); data.pop(); return res; } void pop(T& value) { std::lock_guard<std::mutex> lock(m); if(data.empty()) throw empty_stack(); value=data.top(); data.pop(); } bool empty() const { std::lock_guard<std::mutex> lock(m); return data.empty(); } };
🏷 3.2.4 死锁
首先以一个最基本的例子来抽象什么是死锁:
面试ing.... 面试官:"如果你能够讲清楚什么是死锁,我就给你发offer。" 候选人:"如果你能够给我发offer,我就告诉你什么是死锁。"
死锁是这样一种场景:存在一对线程,他们都需要执行一些操作,这些操作以锁住自己的互斥量作为开头,并且需要对方释放其持有的互斥量,在这种场景下没有线程能够正常工作,因为它们都在等待对方释放互斥量。当存在两个以上的互斥量锁定同一个操作时,死锁很容易发生。
避免死锁
一般来说,让两个互斥量总是以同样的顺序上锁即可避免死锁,如总是在锁住互斥量A之前锁住互斥量B。但是在某些应用场景下事情没这么简单,比如说多个互斥量保护同一个类的独立实例时。考虑如下场景:某类存在一个对象间数据交换操作(类似于swap),为了确保正确地交换数据而不受并发环境影响,我们需要锁定两个实例的互斥锁。由于swap需要接受两个参数(两个实例),假设第一个参数先加锁,第二个后加锁,当swap()两个操作对象实际为同一实例时将导致死锁。
幸运地是,C++标准库提供了同时锁住多个互斥量且没有死锁风险的操作:std::lock。下文将展示如何将std::lock应用于swap函数中:
class some_big_object; void swap(some_big_object& lhs,some_big_object& rhs); class X { private: some_big_object some_detail; std::mutex m; public: X(some_big_object const& sd):some_detail(sd){} friend void swap(X& lhs, X& rhs) { if(&lhs==&rhs) return; //1 std::lock(lhs.m,rhs.m); std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); swap(lhs.some_detail,rhs.some_detail); } };
这里首先保证不是两个相同的实例,因为尝试获取一个持有的std::mutex锁是一个未定义行为(std::recursive_mutex提供这个功能)。之后,使用std::lock锁住两个互斥量。这里的std::adopt_lock表示不需要std::lock_guard进行锁的获取,而是对传入的锁进行管理,这样也就不会创建新的锁的。
std::lock提供了all or nothing 的功能,要么都上锁,要么都不上锁。一旦有一个锁的申请抛出异常,其他锁都会被释放。这实际上有点事务的意思。
以 std::scoped_lock 鎖定多個 Mutex
C++ 17 提供 std::scoped_lock 作為 std::lock_guard 的替代品。std::scoped_lock 能讓一個執行緒一次鎖定多個 Mutex。std::scoped_lock 的實作可以任意決定 Mutex 的鎖定順序但是必須實作 Dead Lock 防護機制。
上面的例子可以改寫為:
void swap(Node &lhs, Node &rhs) { if (&lhs == &rhs) return; std::scoped_lock locks(lhs.m, rhs.m); std::swap(lhs.data, rhs.data); }
備註:std::scoped_lock 的建構式是 Variadic Template。它能接受任意數量的 Mutex,例如:std::scoped_lock guard(mutex1, mutex2, mutex3);。
scoped_lock()是lock_guard()的严格高级版本,可一次锁定所有数量的互斥锁(使用与std::lock()相同的避免死锁的算法)。在新代码中,您应该只使用scoped_lock()。
lock_guard()仍然存在的唯一原因是为了兼容性。它不能被删除,因为它在当前代码中使用。此外,事实证明,不希望更改其定义(从一元更改为可变),因为这也是可以观察到的,因此是重大更改(但出于某种技术原因)。
进一步地避免死锁
死锁这个名字总让人觉得仅有上锁操作才能触发死锁,但实际情况并非如此。举例而言,在无锁编程环境下,我们仅仅需要两个线程就能触发死锁:在它们的执行函数中调用对另一个线程的join即可。在这种情况下,两个线程均不能正常工作,因为他们都在等待对方结束(而这是不可能的)。此外,死锁并不一定仅仅发生于两个线程之间,而是多个线程。
以下将简单地给出一些如何鉴别与消除死锁的指导意见。
避免一个线程持有多个锁
第一个建议往往是最简单的:一个线程已获得一个锁时,再别去获取第二个。如果能坚持这个建议,因为每个线程只持有一个锁,锁上就不会产生死锁。即使互斥锁造成死锁的最常见原因,也可能会在其他方面受到死锁的困扰(比如:线程间的互相等待)。当你需要获取多个锁,使用一个std::lock来做这件事(对获取锁的操作上锁),避免产生死锁。
避免在持有锁时调用用户自定义程序
第二个建议是次简单的:因为代码是用户提供的,你没有办法确定用户要做什么;用户程序可能做任何事情,包括获取锁。你在持有锁的情况下,调用用户提供的代码;如果用户代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时,这是无法避免的)。当你正在写一份通用代码,例如3.2.3中的栈,每一个操作的参数类型,都在用户提供的代码中定义,就需要其他指导意见来帮助你。
使用固定顺序获取锁
如果硬性条件约束我们不得不获取多个锁并且不可使用std::lock,那么指导意见是在每一个线程中均保证以同样的顺序获取这些锁。某些情况下这很容易做到,但在另外一些环境中却很难实现。总之,我们可以建立某种约定,一个线程必须在锁住A后才能获取B的锁,锁住B后才能获取C的锁,即以禁止反向遍历为代价消除死锁产生的可能性,类似的约束通常也被用于建立数据结构。
使用层次锁
层次锁是上一个指导意见的特例,用于检查运行期约定是否被遵守。该指导意见要求开发者将应用程序层次化,并能够识别给定层上所有可被上锁的互斥量。当代码试图执行上锁操作时,它会检查当前是否已持有来自低层次的锁,若有则禁止上锁当前互斥量。你可以在运行期检查这一约束条件,通过对每一个互斥量分配层级编号并记录所有被线程上锁的互斥量的方式。遗憾的是C++标准库并没有提供相关机制,因此我们不得不自己实现一个hierarchical_mutex,先不考虑其具体定义,它的使用机制应该与下文类似:
hierarchical_mutex high_level_mutex(10000); hierarchical_mutex low_level_mutex(5000); int do_low_level_stuff(); int low_level_func() { std::lock_guard<hierarchical_mutex> lk(low_level_mutex); return do_low_level_stuff(); } void high_level_stuff(int some_param); void high_level_func() { std::lock_guard<hierarchical_mutex> lk(high_level_mutex); high_level_stuff(low_level_func()); } void thread_a() { high_level_func(); } hierarchical_mutex other_mutex(100); void do_other_stuff(); void other_stuff() { high_level_func(); do_other_stuff(); } void thread_b() { std::lock_guard<hierarchical_mutex> lk(other_mutex); other_stuff(); }
我们简单地分析一下thread_a与thread_b的运行情况。简单地来说,thread_a遵守了层级规则,而thread_b没有。我们可以注意到,thread_a调用了high_level_func,因此高层级互斥量high_level_mutex被上锁,随后又试图去调用low_level_func,此时低层级互斥量low_level_mutex被上锁,这与上文提及的规则一致:先锁高层级再锁低层级。thread_b的运行则没有这么乐观,它先锁住了层级为100的other_mutex,并在之后试图去锁住高层级的high_level_mutex,此时会发生错误,可能会抛出一个异常,又或者直接终止程序。
显然,使用层次锁时不可能由于错误的上锁顺序而产生死锁,因为互斥量必然以一定顺序逐一上锁。这也意味着你不可能在同一层级上同时持有多个锁,层次锁要求所有互斥量都处于一条链上,并且每个互斥量的层级值都必须低于它的前一个。
hierarchical_mutex可以作为模板参数传入std::lock_guard,因此它必须实现三个接口:lock、unlock、try_lock,其大致实现如下所示:
class hierarchical_mutex { std::mutex internal_mutex; unsigned long const hierarchy_value; unsigned long previous_hierarchy_value; static thread_local unsigned long this_thread_hierarchy_value; void check_for_hierarchy_violation() { if(this_thread_hierarchy_value <= hierarchy_value) { throw std::logic_error(“mutex hierarchy violated”); } } void update_hierarchy_value() { previous_hierarchy_value=this_thread_hierarchy_value; this_thread_hierarchy_value=hierarchy_value; } public: explicit hierarchical_mutex(unsigned long value): hierarchy_value(value), previous_hierarchy_value(0) {} void lock() { check_for_hierarchy_violation(); internal_mutex.lock(); update_hierarchy_value(); } void unlock() { this_thread_hierarchy_value=previous_hierarchy_value; internal_mutex.unlock(); } bool try_lock() { check_for_hierarchy_violation(); if(!internal_mutex.try_lock()) return false; update_hierarchy_value(); return true; } }; thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
需要注意的是,hierarchical_mutex使用了一个static data memberthis_thread_hierarchy_value来表征当前线程层级值,它被初始化为最大值,因此在初始阶段任何互斥量均可被锁住。由于其具备thread local属性,因此每个线程都有其拷贝副本,各线程中变量状态完全独立。每次完成lock或unlock都将更新当前线程层级值。
超越锁的延伸扩展
如我在本节开头提到的那样,死锁不仅仅会发生在锁之间;死锁也会发生在任何同步构造中(可能会产生一个等待循环),因此这方面也需要有指导意见,例如:要去避免获取嵌套锁等待一个持有锁的线程是一个很糟糕的决定,因为线程为了能继续运行可能需要获取对应的锁。类似的,如果去等待一个线程结束,它应该可以确定这个线程的层级,这样一个线程只需要等待比起层级低的线程结束即可。可以用一个简单的办法去确定,以添加的线程是否在同一函数中被启动,如同在3.1.2节和3.3节中描述的那样。
当代码已经能规避死锁,std::lock()和std::lock_guard能组成简单的锁覆盖大多数情况,但是有时需要更多的灵活性。在这些情况,可以使用标准库提供的std::unique_lock模板。如std::lock_guard,这是一个参数化的互斥量模板类,并且它提供很多RAII类型锁用来管理std::lock_guard类型,可以让代码更加灵活。
🏷 3.2.6 std::unique_lock——灵活的锁
相较于std::lock_guard,std::unqiue_lock并不与互斥量的数据类型直接相关,因此使用起来更加灵活。它在构造时可以传入额外的参数,如std::adopt_lock与std::defer_lock,前者用于管理互斥量,后者则用于表明当前互斥量应当保持解锁状态。如此一来,可以通过std::unique_lock的成员函数lock()执行上锁操作,又或者将std::unique_lock对象传入std::lock完成上锁。
std::unqiue_lock的时空间性能均劣于std::lock_guard,这也是它为灵活性付出的代价:std::unqiue_lock内存在某种标志用于表征其实例是否拥有特定的互斥量,显然,这些标志需要占据空间,并且标志的检查与更新也需要耗费时间。
std::unique_lock定义
互斥锁保证了线程间的同步,但是却将并行操作变成了串行操作,这对性能有很大的影响,所以我们要尽可能的减小锁定的区域,也就是使用细粒度锁。
这一点lock_guard做的不好,不够灵活,lock_guard只能保证在析构的时候执行解锁操作,lock_guard本身并没有提供加锁和解锁的接口,但是有些时候会有这种需求。看下面的例子。
class LogFile { std::mutex _mu; ofstream f; public: LogFile() { f.open("log.txt"); } ~LogFile() { f.close(); } void shared_print(string msg, int id) { { std::lock_guard<std::mutex> guard(_mu); //do something 1 } //do something 2 { std::lock_guard<std::mutex> guard(_mu); // do something 3 f << msg << id << endl; cout << msg << id << endl; } } };
上面的代码中,一个函数内部有两段代码需要进行保护,这个时候使用lock_guard就需要创建两个局部对象来管理同一个互斥锁(其实也可以只创建一个,但是锁的力度太大,效率不行),修改方法是使用unique_lock。它提供了lock()和unlock()接口,能记录现在处于上锁还是没上锁状态,在析构的时候,会根据当前状态来决定是否要进行解锁(lock_guard就一定会解锁)。上面的代码修改如下:
class LogFile { std::mutex _mu; ofstream f; public: LogFile() { f.open("log.txt"); } ~LogFile() { f.close(); } void shared_print(string msg, int id) { std::unique_lock<std::mutex> guard(_mu); //do something 1 guard.unlock(); //临时解锁 //do something 2 guard.lock(); //继续上锁 // do something 3 f << msg << id << endl; cout << msg << id << endl; // 结束时析构guard会临时解锁 // 这句话可要可不要,不写,析构的时候也会自动执行 // guard.ulock(); }
上面的代码可以看到,在无需加锁的操作时,可以先临时释放锁,然后需要继续保护的时候,可以继续上锁,这样就无需重复的实例化lock_guard对象,还能减少锁的区域。同样,可以使用std::defer_lock设置初始化的时候不进行默认的上锁操作:
void shared_print(string msg, int id) { std::unique_lock<std::mutex> guard(_mu, std::defer_lock); //do something 1 guard.lock(); // do something protected guard.unlock(); //临时解锁 //do something 2 guard.lock(); //继续上锁 // do something 3 f << msg << id << endl; cout << msg << id << endl; // 结束时析构guard会临时解锁 }
这样使用起来就比lock_guard更加灵活!然后这也是有代价的,因为它内部需要维护锁的状态,所以效率要比lock_guard低一点,在lock_guard能解决问题的时候,就是用lock_guard,反之,使用unique_lock。
后面在学习条件变量的时候,还会有unique_lock的用武之地。
另外,请注意,unique_lock和lock_guard都不能复制,lock_guard不能移动,但是unique_lock可以!
// unique_lock 可以移动,不能复制 std::unique_lock<std::mutex> guard1(_mu); std::unique_lock<std::mutex> guard2 = guard1; // error std::unique_lock<std::mutex> guard2 = std::move(guard1); // ok // lock_guard 不能移动,不能复制 std::lock_guard<std::mutex> guard1(_mu); std::lock_guard<std::mutex> guard2 = guard1; // error std::lock_guard<std::mutex> guard2 = std::move(guard1); // error
🌰应用实例
以下将简单地描述如何用std::unique_lock替换std::lock_guard:
class some_big_object; void swap(some_big_object& lhs,some_big_object& rhs); class X { private: some_big_object some_detail; std::mutex m; public: X(some_big_object const& sd):some_detail(sd){} friend void swap(X& lhs, X& rhs) { if(&lhs==&rhs) return; std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock); std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock); std::lock(lock_a,lock_b); swap(lhs.some_detail,rhs.some_detail); } };
🏷 3.2.7 不同域中互斥量所有权的传递
由于std::unique_lock并没有与自身相关的互斥量,因此互斥量所有权可以在不同实例间相互传递,std::unique_lock是一个标准的`move only object”。
互斥量所有权传递十分常见,比如在某个函数内完成对互斥量的上锁,并在其后将其所有权转交至调用者以保证它可以在该锁的保护范围内执行额外操作:
std::unique_lock<std::mutex> get_lock() { extern std::mutex some_mutex; std::unique_lock<std::mutex> lk(some_mutex); prepare_data(); return lk; } void process_data() { std::unique_lock<std::mutex> lk(get_lock()); do_something(); }
std::unique_lock的成员函数lock支持在其实例销毁之前放弃其拥有的锁。当锁没有必要长期持有时就应当主动释放,这对提升应用程序的性能十分有利。
🏷3.2.8 锁的粒度
锁的粒度是一个摆手术语(hand-waving term),用来描述一个锁保护着的数据量大小。一个细粒度锁(a fine-grained lock)能够保护较小的数据量,一个粗粒度锁(a coarse-grained lock)能够保护较多的数据量。简而言之,我们应当针对待保护数据量的大小选择合适的粒度,太大则过犹不及,太小则不足以保护。
假设当前存在这样一个场景:很多线程正在等待同一个资源,若某个线程持有锁的时间过长(不必要地占用着资源),这将导致程序整体时间性能下降。在理想情况下,我们仅仅会在访问某个共享数据时才会上锁,任何非共享数据的处理操作都应当在锁外执行。我们万不可在持有锁时执行一些特别费时的操作,比如文件的输入/输出。文件的IO操作往往要比内存读写慢上百倍,因此,除非当前存在用锁去保护文件访问的必要,我们绝不应当在持有锁时执行文件IO,这将造成其他线程不必要地阻塞,最终导致多线程带来的性能优势被这种拙劣的操作抵消殆尽。
这种情况下std::unique_lock将运行地非常完美,当需要处理锁外数据时,它将调用成员函数unlock以解除锁定,并在需要访问共享数据时调用lock再次上锁:
void get_and_process_data() { std::unique_lock<std::mutex> my_lock(the_mutex); some_class data_to_process=get_next_data_chunk(); my_lock.unlock(); // 处理前解锁 result_type result=process(data_to_process); my_lock.lock(); // 写入前加锁 write_result(data_to_process,result); }
正如上述实例所示,粒度并不仅仅表示锁保护的数据量,也表示控制锁的时间。通常情况下,持有锁的时间应当等价于执行操作所需的最短时间。因此除非必须执行,我们不应该在持有锁时执行耗时操作(比如获取另一个锁(即使你明确了解该操作不会造成死锁)或等待IO操作完成)。
在前文中我们曾经叙述过在并发编程下完成两个数据结构的交换操作,现在让我们试想下面这个场景:存在一个简单的数据结构Y,其内部数据仅仅含有一个int,现在需要完成两个Y实例之间的判同操作。由于复制一个int并不耗时,因此我们可以在持有锁的情况下完成拷贝操作,并返回两个被拷贝的int等价与否,这意味着我们不再需要同时持有两个锁,并且按照要求实现了仅仅在需要的时间段上锁:
class Y { private: int some_detail; mutable std::mutex m; int get_detail() const { std::lock_guard<std::mutex> lock_a(m); return some_detail; } public: Y(int sd):some_detail(sd){} friend bool operator==(Y const& lhs, Y const& rhs) { if(&lhs==&rhs) return true; int const lhs_value=lhs.get_detail();// 1 int const rhs_value=rhs.get_detail();// 2 return lhs_value==rhs_value; } };
需要注意的是,如果该结果返回了true,我们仅仅只能够保证在时间点1上的lhs.some_detail与时间点2上的rhs.some_detail相同,这两个值在被读取后可能会被任意的方式所修改。因为我们持有锁的时间并没有达到整个操作所需要的时间,因此该判同语义未必符合预期。
在某些情况下并不存在一个合适的粒度大小,因为并非所有对数据结构的访问都需要相同的保护级别。因此,你可能需要另一种机制来替换std::mutex。