《...多线程...使用muduo C++网络库》第二章笔记

《Linux多线程服务端编程:使用muduo+C++网络库》第二章笔记

标题有字数限制,所以没有写完整书名。

mutex(互斥器)

mutex是最常用的同步原语。mutex可以保护一个临界区,使得任何时刻最多仅有一个线程在临界区内活动。

mutex根据POSIX中的实现可以分为递归(recursive)和非递归(non-recursive)两种,主要区别为同一个线程可以重复对recursive mutex加锁,而对non-recursive mutex则不可以。

书中提到使用上的原则大概是:

  • RAII封装mutex的生存及其相关操作。
  • 使用Guard管理mutex锁,并利用生存期控制Guard,基于此结构(Scoped Locking)可以使debug很方便。
  • 构造Guard时,防止加锁顺序不同导致死锁。
  • 使用非递归的mutex。
  • 跨进程时使用tcp sockets。
  • 尽量使用高层同步方法。

递归互斥器的一些问题

显然recursive mutex可以避免一个线程将自己锁死,但如果使用不当会带来较大的问题。

MutexLock mutex;
std::vector foos;
// write
void post(const Foo& f){
    MutexLockGuard lock(mutex);
    foos.push_back(f);
}
// read
void traverse()
{
    MutexLockGuard lock(mutex);
    for(auto& m: foos)
        m.doit();
}

上述两个函数中,post()加锁并修改了foos内容;traverse()加锁并便利了foos内容。他们在单独执行时都是没问题的。
而如果Foo::doit()需要调用post(),就会出现以下问题:

  1. recursive mutex,post()不会被阻塞,由此导致foos内容改变,可能会出现空间分配导致的迭代器失效,使得程序偶尔崩溃;
  2. non-recursive mutex,post()运行时发现mutex已经被锁住,于是等待,导致死锁。

在调试一个多线程bug时,分析一个问题出现的原因是很麻烦的,而如果是依照使用原则而出现了死锁,直接查看back trace就可以很快定位问题。

thread apply all bt

上述例子实际上是设计上不够合适导致的,要使其正常工作需要调整工作方式,例如copy-on-write等。

两个线程死锁的例子

上一节提到了一个线程将自己锁死的例子,下面给出一个race condition导致死锁的例子。

class Inventory
{
    public:
     void add(Request* req)
     {
         muduo::MutexLockGuard lock(mutex_);
         requests_.insert(req);
     }
     void remove(Request* req)
     {
         muduo::MutexLockGuard lock(mutex_);
         requests_.erase(req);
     }
     void printAll() const
     {
         muduo::MutexLockGuard lock(mutex_);
         sleep(1);
         for(auto&m:requests_)
            m->print();
         cout<<"Inventory::printAll() unlocked"<<endl;
     }
    private:
     mutable muduo::MutexLock mutex_;
     set requests_;
};

Inventory 是一个管理Request的类,提供"add"和"remove"两个线程安全的方法。

class Request
{
    public:
     void process(Inventory& inventory)
     {
         muduo::MutexLockGuard lock(mutex_);
         recent_inventory = &invertory;
         recent_inventory->add(this);
     }
     ~Request()
     {
         muduo::MutexLockGuard lock(mutex_);
         sleep(1);
         recent_inventory->remove(this);
     }
     void print() const
     {
         muduo::MutexLockGuard lock(mutex_);
         // todo: 打印请求内容
     }
    private:
     Inventory* recent_inventory;
     mutable muduo::MutexLock mutex_;
};

Request是一个请求相关的类。

void threadFunc(Inventory& inventory)
{
    Request* req = new Request;
    req->process(inventory);
    delete req;
}
int main()
{
    Inventory inventory;
    muduo::Thread t(threadFunc, std::ref(inventory));
    t.start();
    usleep(500 * 1000);
    inventory.printAll();
    t.join();
    return 0;
}

visual studio中可以在运行后使用"全部中断"按钮来打断,于是可以看到两个线程死锁的情况

vs按钮

线程死锁情况

借用书中的图,时序上大概是这样一个情况:

condition variable(条件变量)

mutex虽然可以加锁进行同步,但不能用来等待。

如果需要等待某个条件成立才继续执行,需要使用condition variable。学名叫管程(monitor)。

条件变量只有一种正确使用方式。

wait 端:

  • 与mutex一起使用
  • 在mutex上锁时才能调用
  • 把判断bool条件放到while循环中(避免虚假唤醒(spurious wakeup))

signal/broadcast端:

  • 不一定要在mutex已上锁时调用signal
  • 在signal之前一般要修改bool表达式
  • 修改bool表达式时通常要mutex保护
  • 注意区分signal与broadcast
// Wait 端一个代码示例
muduo::MutexLock mutex;
muduo::Condition cond(mutex);
std::deque queue;
int deque()
{
    //与mutex一起使用,上锁时才能调用
    MutexLockGuard lock(mutex);
    // "queue.empty()" 是这里的bool表达式
    while(queue.empty())
    {
        cond.wait();//并放到while循环中
        //唤醒后可能仍不满足条件所以用while
    }
    assert(!queue.empty());
    int top=queue.front();
    queue.pop_front();
    return top;
}
void enqueue(int x)
{
    MutexLockGuard lock(mutex);//修改表达式时通常要mutex保护
    queue.push_back(x);
    //此处已经将bool表达式修改完毕,可以发送signal
    cond.notify();//发送signal,理论上此处调用时不需要mutex上锁,可以移出临界区之外
    //可能broadcast指的是notifyAll ?(不确定)
    //notify只通知一个,且随机决定;notifyAll通知所有。
}

condition variable 是十分底层的同步原语,可以用它实现高层的一些同步措施。
如倒计时(CountDownLatch), 线程池(ThreadPool),队列等。

mutex和condition variable不能互相替代,它们共同构成了多线程编程的全部必备同步原语。

读写锁、信号量

不是必备的,而且开销方面一般更大,更容易用错,所以先不要用就行了。[捂脸]

muduo的 MutexLock MutexLockGuard Condition的封装

将POSIX的接口封装为muduo的类。
有一些代码上的技巧和设计思路比如:

  • 禁止拷贝构造函数和赋值。
  • 定义一个宏防止用户遗漏变量名
  • 不需要随意增加灵活性,仅提供最常用最基本的功能
  • 封装pthread库可以使一些工具能正常得到happeds-before关系,分析程序有无data race
#define MutexLockGuard(x) static_assert(false,"missing mutex guard var name.")
void func()
{
    MutexLockGuard(mutex);//本来会产生一个临时对象,然后马上销毁;在定义宏之后这句话会直接报错。
}

由于mutex和condition variable很基础,可能一些使用情景不能直接去使用它们,而是使用高层的措施。

singleton的线程安全

Singleton线程安全在Java里还行,c++里好像曾经很混乱。

直接使用pthread_once就可以保证了。

c++11可能对应的是 call_once 和 once_flag

这里从书上抄一个单例模式的示例:

template
class Singleton
{
    public:
     static T& instance()
     {
         pthread_once(&ponce_, &Singleton::init);
         return *value_;
     }
     Singleton(const Singleton&)=delete;
     void operator=(Singleton const&) = delete;
    private:
     Singleton();
     ~Singleton();
     static void init()
     {
         value = new T();
     }
     static pthread_once_t ponce_;
     static T* value_;
};
template 
pthread_once_t Singleton::ponce_ = PTHREAD_ONCE_INIT;
template 
T* Singleton::value_ = nullptr;
...
//使用
Foo& foo = Singleton::instance();

这里的实现没有考虑自动销毁对象的问题,一般单例模式最对只会产生一个对象,并会一直存在直到程序退出,由系统进行资源回收。

如果一定要销毁对象,可以使用atexit(my_function)手动指定销毁方式。

sleep(3)不能保证同步

一般用于构建一些测试情况时使用,sleep本身只负责延时,一般只能出现在测试代码中。否则程序的设计一般是有问题的。

总结

线程同步的四项原则:

  • 最低限度共享对象,减少需要同步的场合。(类似Demeter法则?)
  • 使用高级的并发编程构件。
  • 若不得已使用底层同步原语,只是用 non-recursive mutex 和 condition variable,其他如读写锁、信号量最好不要用。使用时利用RAII和Scoped Locking保证结构清晰。
  • 除了使用atomic整数意外不去自己编写lock-free代码,也不要用内核级同步原语。

一般依照这些原则就可以应付多线程开发的各种场合了。

"让一个正确的程序变快"远比"让一个快的程序变正确"容易得多。所以首先保证程序的清晰、简单和正确性,再去考虑性能优化。

在这一章内,主要提倡正确加锁,而不是效率问题。妄谈哪种做法效率更高是靠不住的。

利用 shared_ptr 实现 copy-on-write

MutexLock mutex;
shared_ptr> g_foos;
void write()
{
    MutexLockGuard lock(mutex);
    if(!g_foos.unique())
    {
        //当前g_foos观察者多于1位,说明正在读取。
        g_foos.reset(new vector(*g_foos));
        //此时写入到一个新的内存,并将shared_ptr指向新的位置。旧的内存地址仍存在于read端一个shared_ptr内,所以在旧地址读取结束后会自动释放内存。
    }
    //不管如何,当前的g_foos一定是可以进行写入的。
    g_foos->push_back(f);
}
void read()
{
    shared_ptr> foos;
    {
        MutexLockGuard lock(mutex);
        foos=g_foos;
        //此处使g_foos引用计数+1,此后可以解锁mutex,因为write段会知道正在读取。
    } 
    //此时进行访问,其他位置只需要保证不修改foos当前指向的内存即可,所以此处可以不加锁。
    for(auto&m : foos)
        m->print();
}

前面提到的"Inventory和Request"例子中,Request对象析构的race condition没有解决。可以用
enable_shared_from_this解决,同样也是利用了shared_ptr。

参考资料:

#C/C++##读书笔记##Linux##笔记#
全部评论

相关推荐

不愿透露姓名的神秘牛友
11-21 17:16
科大讯飞 算法工程师 28.0k*14.0, 百分之三十是绩效,惯例只发0.9
点赞 评论 收藏
分享
3 17 评论
分享
牛客网
牛客企业服务