信号量、锁模块
1 引入
加锁的目的是保证共享资源在任意时间里,只有一个线程访问,这样就可以避免多线程导致共享数据错乱的问题。
1.1 Linux的4种锁机制:
互斥锁:mutex,用于保证在任何时刻,都只能有一个线程访问该对象。当获取锁操作失败时,线程会进入睡眠,等待锁释放时被唤醒
读写锁:rwlock,分为读锁和写锁。处于读操作时,可以允许多个线程同时获得读操作。但是同一时刻只能有一个线程可以获得写锁。其它获取写锁失败的线程都会进入睡眠状态,直到写锁释放时被唤醒。 注意:写锁会阻塞其它读写锁。当有一个线程获得写锁在写时,读锁也不能被其它线程获取;写者优先于读者(一旦有写者,则后续读者必须等待,唤醒时优先考虑写者)。适用于读取数据的频率远远大于写数据的频率的场合。
自旋锁:spinlock,在任何时刻同样只能有一个线程访问对象。但是当获取锁操作失败时,不会进入睡眠,而是会在原地自旋,直到锁被释放。这样节省了线程从睡眠状态到被唤醒期间的消耗,在加锁时间短暂的环境下会极大的提高效率。但如果加锁时间过长,则会非常浪费CPU资源。
RCU:即read-copy-update,在修改数据时,首先需要读取数据,然后生成一个副本,对副本进行修改。修改完成后,再将老数据update成新的数据。使用RCU时,读者几乎不需要同步开销,既不需要获得锁,也不使用原子指令,不会导致锁竞争,因此就不用考虑死锁问题了。而对于写者的同步开销较大,它需要复制被修改的数据,还必须使用锁机制同步并行其它写者的修改操作。在有大量读操作,少量写操作的情况下效率非常高。
互斥锁和读写锁的区别:
1)读写锁区分读者和写者,而互斥锁不区分
2)互斥锁同一时间只允许一个线程访问该对象,无论读写;读写锁同一时间内只允许一个写者,但是允许多个读者同时读对象。
1.2 区域锁思想
区域锁(Scoped Locking)并非一种锁的种类,而是使用锁的一种模式。这种概念是C++中RAII(Resource Acquisition Is Initialization)模式具体体现之一,"资源需要即初始化"。基本思想:C++中的资源应该交由对象管理,资源在对象的构造函数中完成初始化,在对象的析构函数中被释放。
本项目采用的就是区域锁的思想,需要加锁时,初始化一个锁的对象来保护数据,结束后析构掉。在构造函数中完成对锁的初始化,在析构函数中完成对锁的资源释放。
1.3 实现思路
通过局域锁的思想实现加锁解锁。设计各种锁的类LockType,抽象如下面的代码所示,包含
- 一个LockType类型的成员POSIX信号量
- 构造析构函数 -- 构造函数初始化信号量,析构函数析构信号量
- 加锁解锁函数 -- 实现信号量的加锁解锁
- 一个类型定义 -- 利用类模板初始化一个T为LockType的成员,构造析构实现加锁解锁
class LockType : Noncopyable { public: typedef ScopedLockImpl<Mutex> Lock; //局部锁 Mutex() { pthread_LockType_init(&m_mutex, nullptr); } ~Mutex() { pthread_LockType_destroy(&m_mutex); } void lock() { pthread_LockType_lock(&m_mutex); } void unlock() { pthread_LockType_unlock(&m_mutex); } private: pthread_LockType_t m_mutex; };
//互斥量一般都是在一个局部范围
//为了防止漏掉解锁,一般写一个类,通过构造函数加锁,析构函数解锁
2 信号量
信号量是一种特殊的变量,对其操作访问都是源原子操作。且只允许等待wait和发送post操作。
2.1 POSIX信号量API
POSIX 信号量,定义在头文件<semaphore.h>, 底层是一个非负整数,通过原子操作对其加减,控制线程(进程)对共享资源的访问
int sem_init(sem_t *sem, int pshared, unsigned int value); 初始化-----构造 int sem_wait(sem_t *sem); 减1操作-------wait int sem_post(sem_t *sem); 加1操作-------notify int sem_destroy(sem_t *sem); 释放-------析构
2.2 信号量
class Semaphore : Noncopyable { public: Semaphore(uint32_t count = 0); //构造函数 count 信号量值的大小 ~Semaphore(); void wait(); //获取信号量,数-1 void notify(); //释放信号量,数+1 private: sem_t m_semaphore; }; Semaphore::Semaphore(uint32_t count) { if(sem_init(&m_semaphore, 0, count)) { throw std::logic_error("sem_init error"); } } Semaphore::~Semaphore() { sem_destroy(&m_semaphore); } void Semaphore::wait() { if(sem_wait(&m_semaphore)) { throw std::logic_error("sem_wait error"); } } void Semaphore::notify() { if(sem_post(&m_semaphore)) { throw std::logic_error("sem_post error"); } }
2.3 协程信号量
class Scheduler; class FiberSemaphore : Noncopyable { public: typedef Spinlock MutexType; FiberSemaphore(size_t initial_concurrency = 0); ~FiberSemaphore(); bool tryWait(); void wait(); void notify(); size_t getConcurrency() const { return m_concurrency;} void reset() { m_concurrency = 0;} private: MutexType m_mutex; std::list<std::pair<Scheduler*, Fiber::ptr> > m_waiters; size_t m_concurrency; };
3 互斥锁
//局部区域互斥锁、自旋锁、原子锁模板类 //T:锁的类型 template<class T> struct ScopedLockImpl { public: ScopedLockImpl(T& mutex) :m_mutex(mutex) { m_mutex.lock(); //构造时加锁 m_locked = true; } ~ScopedLockImpl() { unlock(); //自动释放锁 } void lock() { if(!m_locked) { m_mutex.lock(); m_locked = true; } } void unlock() { if(m_locked) { m_mutex.unlock(); m_locked = false; } } private: T& m_mutex; //mutex bool m_locked; //是否已上锁 }; //互斥锁类 class Mutex : Noncopyable { public: typedef ScopedLockImpl<Mutex> Lock; //局部锁 Mutex() { pthread_mutex_init(&m_mutex, nullptr); } ~Mutex() { pthread_mutex_destroy(&m_mutex); } void lock() { pthread_mutex_lock(&m_mutex); } void unlock() { pthread_mutex_unlock(&m_mutex); } private: pthread_mutex_t m_mutex; };
4 读写锁
//局部读锁模板实现 template<class T> struct ReadScopedLockImpl { public: ReadScopedLockImpl(T& mutex) :m_mutex(mutex) { m_mutex.rdlock(); m_locked = true; } ~ReadScopedLockImpl() { unlock(); //自动释放锁 } void lock() { if(!m_locked) { m_mutex.rdlock(); m_locked = true; } } void unlock() { if(m_locked) { m_mutex.unlock(); m_locked = false; } } private: T& m_mutex; //mutex bool m_locked; //是否已上锁 }; //局部写锁模板实现 template<class T> struct WriteScopedLockImpl { public: WriteScopedLockImpl(T& mutex) :m_mutex(mutex) { m_mutex.wrlock(); m_locked = true; } ~WriteScopedLockImpl() { unlock(); } void lock() { if(!m_locked) { m_mutex.wrlock(); m_locked = true; } } void unlock() { if(m_locked) { m_mutex.unlock(); m_locked = false; } } private: T& m_mutex; //Mutex bool m_locked; //是否已上锁 }; //读写互斥量 class RWMutex : Noncopyable{ public: typedef ReadScopedLockImpl<RWMutex> ReadLock; //局部读锁 typedef WriteScopedLockImpl<RWMutex> WriteLock; //局部写锁 RWMutex() { pthread_rwlock_init(&m_lock, nullptr); } ~RWMutex() { pthread_rwlock_destroy(&m_lock); } void rdlock() { pthread_rwlock_rdlock(&m_lock); } void wrlock() { pthread_rwlock_wrlock(&m_lock); } void unlock() { pthread_rwlock_unlock(&m_lock); } private: pthread_rwlock_t m_lock; //读写锁 };
5 自旋锁
class Spinlock : Noncopyable { public: typedef ScopedLockImpl<Spinlock> Lock; //局部锁 Spinlock() { pthread_spin_init(&m_mutex, 0); } ~Spinlock() { pthread_spin_destroy(&m_mutex); } void lock() { pthread_spin_lock(&m_mutex); } void unlock() { pthread_spin_unlock(&m_mutex); } private: //自旋锁 pthread_spinlock_t m_mutex; };
6 原子锁
class CASLock : Noncopyable { public: typedef ScopedLockImpl<CASLock> Lock; //局部锁 CASLock() { m_mutex.clear(); } ~CASLock() { } void lock() { //执行本次原子操作之前 所有 读 原子操作必须全部完成 while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire)); } void unlock() { //执行本次原子操作之前 所有 写 原子操作必须全部完成 std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release); } private: //原子状态 volatile std::atomic_flag m_mutex; };