《...多线程...使用muduo C++网络库》第二章笔记
《Linux多线程服务端编程:使用muduo+C++网络库》第二章笔记
标题有字数限制,所以没有写完整书名。
mutex(互斥器)
mutex是最常用的同步原语。mutex可以保护一个临界区,使得任何时刻最多仅有一个线程在临界区内活动。
mutex根据POSIX中的实现可以分为递归(recursive)和非递归(non-recursive)两种,主要区别为同一个线程可以重复对recursive mutex加锁,而对non-recursive mutex则不可以。
书中提到使用上的原则大概是:
- RAII封装mutex的生存及其相关操作。
- 使用Guard管理mutex锁,并利用生存期控制Guard,基于此结构(Scoped Locking)可以使debug很方便。
- 构造Guard时,防止加锁顺序不同导致死锁。
- 使用非递归的mutex。
- 跨进程时使用tcp sockets。
- 尽量使用高层同步方法。
递归互斥器的一些问题
显然recursive mutex可以避免一个线程将自己锁死,但如果使用不当会带来较大的问题。
MutexLock mutex; std::vector foos; // write void post(const Foo& f){ MutexLockGuard lock(mutex); foos.push_back(f); } // read void traverse() { MutexLockGuard lock(mutex); for(auto& m: foos) m.doit(); }
上述两个函数中,post()加锁并修改了foos内容;traverse()加锁并便利了foos内容。他们在单独执行时都是没问题的。
而如果Foo::doit()需要调用post(),就会出现以下问题:
- recursive mutex,post()不会被阻塞,由此导致foos内容改变,可能会出现空间分配导致的迭代器失效,使得程序偶尔崩溃;
- non-recursive mutex,post()运行时发现mutex已经被锁住,于是等待,导致死锁。
在调试一个多线程bug时,分析一个问题出现的原因是很麻烦的,而如果是依照使用原则而出现了死锁,直接查看back trace就可以很快定位问题。
thread apply all bt
上述例子实际上是设计上不够合适导致的,要使其正常工作需要调整工作方式,例如copy-on-write等。
两个线程死锁的例子
上一节提到了一个线程将自己锁死的例子,下面给出一个race condition导致死锁的例子。
class Inventory { public: void add(Request* req) { muduo::MutexLockGuard lock(mutex_); requests_.insert(req); } void remove(Request* req) { muduo::MutexLockGuard lock(mutex_); requests_.erase(req); } void printAll() const { muduo::MutexLockGuard lock(mutex_); sleep(1); for(auto&m:requests_) m->print(); cout<<"Inventory::printAll() unlocked"<<endl; } private: mutable muduo::MutexLock mutex_; set requests_; };
Inventory 是一个管理Request的类,提供"add"和"remove"两个线程安全的方法。
class Request { public: void process(Inventory& inventory) { muduo::MutexLockGuard lock(mutex_); recent_inventory = &invertory; recent_inventory->add(this); } ~Request() { muduo::MutexLockGuard lock(mutex_); sleep(1); recent_inventory->remove(this); } void print() const { muduo::MutexLockGuard lock(mutex_); // todo: 打印请求内容 } private: Inventory* recent_inventory; mutable muduo::MutexLock mutex_; };
Request是一个请求相关的类。
void threadFunc(Inventory& inventory) { Request* req = new Request; req->process(inventory); delete req; } int main() { Inventory inventory; muduo::Thread t(threadFunc, std::ref(inventory)); t.start(); usleep(500 * 1000); inventory.printAll(); t.join(); return 0; }
visual studio中可以在运行后使用"全部中断"按钮来打断,于是可以看到两个线程死锁的情况。
借用书中的图,时序上大概是这样一个情况:
condition variable(条件变量)
mutex虽然可以加锁进行同步,但不能用来等待。
如果需要等待某个条件成立才继续执行,需要使用condition variable。学名叫管程(monitor)。
条件变量只有一种正确使用方式。
wait 端:
- 与mutex一起使用
- 在mutex上锁时才能调用
- 把判断bool条件放到while循环中(避免虚假唤醒(spurious wakeup))
signal/broadcast端:
- 不一定要在mutex已上锁时调用signal
- 在signal之前一般要修改bool表达式
- 修改bool表达式时通常要mutex保护
- 注意区分signal与broadcast
// Wait 端一个代码示例 muduo::MutexLock mutex; muduo::Condition cond(mutex); std::deque queue; int deque() { //与mutex一起使用,上锁时才能调用 MutexLockGuard lock(mutex); // "queue.empty()" 是这里的bool表达式 while(queue.empty()) { cond.wait();//并放到while循环中 //唤醒后可能仍不满足条件所以用while } assert(!queue.empty()); int top=queue.front(); queue.pop_front(); return top; } void enqueue(int x) { MutexLockGuard lock(mutex);//修改表达式时通常要mutex保护 queue.push_back(x); //此处已经将bool表达式修改完毕,可以发送signal cond.notify();//发送signal,理论上此处调用时不需要mutex上锁,可以移出临界区之外 //可能broadcast指的是notifyAll ?(不确定) //notify只通知一个,且随机决定;notifyAll通知所有。 }
condition variable 是十分底层的同步原语,可以用它实现高层的一些同步措施。
如倒计时(CountDownLatch), 线程池(ThreadPool),队列等。
mutex和condition variable不能互相替代,它们共同构成了多线程编程的全部必备同步原语。
读写锁、信号量
不是必备的,而且开销方面一般更大,更容易用错,所以先不要用就行了。[捂脸]
muduo的 MutexLock MutexLockGuard Condition的封装
将POSIX的接口封装为muduo的类。
有一些代码上的技巧和设计思路比如:
- 禁止拷贝构造函数和赋值。
- 定义一个宏防止用户遗漏变量名
- 不需要随意增加灵活性,仅提供最常用最基本的功能
- 封装pthread库可以使一些工具能正常得到happeds-before关系,分析程序有无data race
#define MutexLockGuard(x) static_assert(false,"missing mutex guard var name.") void func() { MutexLockGuard(mutex);//本来会产生一个临时对象,然后马上销毁;在定义宏之后这句话会直接报错。 }
由于mutex和condition variable很基础,可能一些使用情景不能直接去使用它们,而是使用高层的措施。
singleton的线程安全
Singleton线程安全在Java里还行,c++里好像曾经很混乱。
直接使用pthread_once就可以保证了。
c++11可能对应的是 call_once 和 once_flag
这里从书上抄一个单例模式的示例:
template class Singleton { public: static T& instance() { pthread_once(&ponce_, &Singleton::init); return *value_; } Singleton(const Singleton&)=delete; void operator=(Singleton const&) = delete; private: Singleton(); ~Singleton(); static void init() { value = new T(); } static pthread_once_t ponce_; static T* value_; }; template pthread_once_t Singleton::ponce_ = PTHREAD_ONCE_INIT; template T* Singleton::value_ = nullptr; ... //使用 Foo& foo = Singleton::instance();
这里的实现没有考虑自动销毁对象的问题,一般单例模式最对只会产生一个对象,并会一直存在直到程序退出,由系统进行资源回收。
如果一定要销毁对象,可以使用atexit(my_function)手动指定销毁方式。
sleep(3)不能保证同步
一般用于构建一些测试情况时使用,sleep本身只负责延时,一般只能出现在测试代码中。否则程序的设计一般是有问题的。
总结
线程同步的四项原则:
- 最低限度共享对象,减少需要同步的场合。(类似Demeter法则?)
- 使用高级的并发编程构件。
- 若不得已使用底层同步原语,只是用 non-recursive mutex 和 condition variable,其他如读写锁、信号量最好不要用。使用时利用RAII和Scoped Locking保证结构清晰。
- 除了使用atomic整数意外不去自己编写lock-free代码,也不要用内核级同步原语。
一般依照这些原则就可以应付多线程开发的各种场合了。
"让一个正确的程序变快"远比"让一个快的程序变正确"容易得多。所以首先保证程序的清晰、简单和正确性,再去考虑性能优化。
在这一章内,主要提倡正确加锁,而不是效率问题。妄谈哪种做法效率更高是靠不住的。
利用 shared_ptr 实现 copy-on-write
MutexLock mutex; shared_ptr> g_foos; void write() { MutexLockGuard lock(mutex); if(!g_foos.unique()) { //当前g_foos观察者多于1位,说明正在读取。 g_foos.reset(new vector(*g_foos)); //此时写入到一个新的内存,并将shared_ptr指向新的位置。旧的内存地址仍存在于read端一个shared_ptr内,所以在旧地址读取结束后会自动释放内存。 } //不管如何,当前的g_foos一定是可以进行写入的。 g_foos->push_back(f); } void read() { shared_ptr> foos; { MutexLockGuard lock(mutex); foos=g_foos; //此处使g_foos引用计数+1,此后可以解锁mutex,因为write段会知道正在读取。 } //此时进行访问,其他位置只需要保证不修改foos当前指向的内存即可,所以此处可以不加锁。 for(auto&m : foos) m->print(); }
前面提到的"Inventory和Request"例子中,Request对象析构的race condition没有解决。可以用
enable_shared_from_this解决,同样也是利用了shared_ptr。