【Linux】线程安全
线程安全
**线程之间对临界资源的安全访问**
因为在cpu足够的情况下,多个线程的运行可能是并行的,一次对邻接资源的访问,就可能
造成争抢操作,会造成数据的二义性问题;
因此线程安全就是讨论如何保证线程对临界资源的安全访问;
使用同步与互斥解决!!
同步: 对临界资源的访问的可控性;
互斥: 对临界资源同时间的唯一访问
互斥 :互斥锁 ----必须是原子操作不可被打断
一个只有0/1的计数器
1-有资源能操作
0-没资源则(阻塞/报错返回)
加锁时,计数器不为0 则 减1;为0则阻塞/报错返回
解锁时,计数置1
通过互斥锁来保证临界资源的安全访问;
互斥锁也是一个临界资源,因此互斥锁本身就必须是原子操作。
死锁:线程/进程因为一直获取不到锁资源,导致阻塞卡死,一般讨论同时出现多个锁造成死锁的情况。
死锁产生的四个必要条件:
1.互斥条件 ---唯一时间只能一人操作
2.不可剥脱条件 ---谁的锁只能解
3.请求与保持 ---拿第一个取第二个,拿不到第二个,但也不放手第一个
4.环路等待条件
避免死锁:破坏必要条件
避免死锁:银行家算法,死锁检测算法
[银行家算法](https://www.cnblogs.com/chuxiuhong/p/6103928.html)
(加载-》处理-》换回)
同步 :条件变量
pthread_cond_wait
集合解锁,休眠,加锁的原子操作
因为加锁是非阻塞的,所有条件判断应该是循环的
不同角色的线程应该等待在不同的条件变量上,防止错误唤醒导致卡死
多个商人和多个消费者的时候---消费者消费结束应该唤醒商人,但是实际情况唤醒的不一定是卖面的,因为此时会出问题,所以需要分开等待分开唤醒
生产者与消费者模型:data:image/s3,"s3://crabby-images/2ca92/2ca92e31779b627b1aa77b1d624f0c2bed4393ff" alt=""
生产者抓取数据,将数据放到缓冲区,消费者从缓冲区拿到数据并处理
支持忙闲不均; 解耦和; 支持并发;
缓冲区需要考虑线程安全问题!
一个场所,两类角色,三种关系
生产者与生产者:互斥
消费者与消费者:互斥
生产者与消费者:同步+互斥
c++中 std::queue 不是线程安全的(解决线程安全问题需要耗费更多时间)
信号量:POSIX标准信号量
具有等待队列的计数器(统计资源数,判断现在是否有资源)
实现进程/线程间的同步与互斥(不同于互斥锁的只要01两种计数)
互斥锁的计数 + 条件变量里的等待和唤醒 ~= 信号量
POSIX标准信号量:
计数器是一个资源计数
获取资源
计数大于0,表示有资源可供操作,计数-1
计数不大于0,表示没有资源可供操作,陷入休眠
释放资源
计数+1,并且唤醒等待在队列上的pcb
sem_init
sem_destroy
sem_wait : 计数不大于0,表示没有资源可供操作,陷入休眠
sem_timewait: 限定时间内计数不大于0,没有资源可供操作,陷入休眠
sem_post: 唤醒等待队列上的pcb
使用c++中的vector实现线程安全的环形队列
sem.cpp
/* 基于C++中的vector容器实现线程安全的环形队列
* vector---一个线性表(数组)
* */
#include <iostream>
#include <vector>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
class RingQueue
{
private:
std::vector<int> _list;
int _cap;
pthread_mutex_t _mutex;
sem_t _space;
sem_t _data;
int _pro_step;
int _con_step;
public:
RingQueue(int cap = 10):_cap(cap),_list(cap),_pro_step(0),_con_step(0)
{
sem_init(&_space,0,cap);
sem_init(&_data,0,0);
pthread_mutex_init(&_mutex,NULL);
}
~RingQueue()
{
sem_destroy(&_space);
sem_destroy(&_data);
}
bool QueuePush(int data)
{
sem_wait(&_space);
pthread_mutex_lock(&_mutex);
_list[_pro_step] = data;
_pro_step++;
_pro_step %= _cap;
pthread_mutex_unlock(&_mutex);
sem_post(&_data);
return true;
}
bool QueuePop(int *data)
{
sem_wait(&_data);
pthread_mutex_lock(&_mutex);
*data = _list[_con_step];
_con_step++;
_con_step %= _cap;
pthread_mutex_unlock(&_mutex);
sem_post(&_space);
return true;
}
};
void* thr_con(void *arg)
{
RingQueue *q = (RingQueue*)arg;
while(1)
{
usleep(1000);
int data;
q->QueuePop(&data);
printf("consumer get data:%d\n",data);
}
}
void* thr_pro(void *arg)
{
RingQueue *q = (RingQueue*)arg;
int i = 0;
while(1)
{
printf("productor put data:%d\n",i);
q->QueuePush(i++);
}
}
int main()
{
pthread_t ctid[4],ptid[4];
int i , ret;
RingQueue q;
for(i = 0; i < 4; i++)
{
ret = pthread_create(&ctid[i],NULL,thr_con,(void*)&q);
if(ret != 0)
{
printf("create thread error\n");
return -1;
}
ret = pthread_create(&ptid[i],NULL,thr_pro,(void*)&q);
if(ret != 0)
{
printf("create thread error\n");
return -1;
}
}
for(i = 0; i < 4; i++)
{
pthread _join(ctid[i],NULL);
pthread_join(ptid[i],NULL);
}
}
读写锁:
写互斥,读共享 write 写的时候判断写计数和读计数,若都为0则可以写操作,否则 阻塞 read 读的时候只需要判断写计数,若为0,则可以读,否则 阻塞 读写锁使用自旋锁实现的? 自旋锁循环对资源进行判断cpu消耗大---自旋锁的使用场景 场景: 操作时间短,并且读操作多,写操作少 读写锁默认是读优先的,因此当读者多写者少时,设置默认为写优先
线程池 一个或多个线程 + 任务队列
启动线程任务请求,
1.若同一时间因为大量请求而创建大量线程,有可能导致资源耗尽,程序崩溃(需要限制上限)
2.t1(创建线程) + t2(处理任务) + t3(销毁任务) = t
若t1 + t3 / t 比例较大,这意味着创建/销毁线程占据了大量时间,对于程序效率来说,是不可取的。
功能:
1.避免峰值压力,导致资源耗尽
2.避免大量的线程创建/销毁成本
实现:
#include<iostream>
#include<queue>
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<pthread.h>
#include<time.h>
class MyTask
{
private:
int _data;
public:
MyTask(int data):_data(data){}
void SetData(int data){_data = data;}
void Run()
{
srand(time(NULL));
int s = rand() % 5;
printf("thread %p get data %d sleep -- %d\n",pthread_self(),_data,s);
sleep(s);
}
};
class ThreadPool
{
private:
std::queue<MyTask *> _list;
int _cap;
pthread_mutex_t _mutex;
pthread_cond_t _full;
pthread_cond_t _empty;
int _max_thr;
bool QueueIsEmpty()
{
return (_list.size() == 0);
}
bool QueueIsFull()
{
return (_list.size() == _cap);
}
bool QueuePush(MyTask *t)
{
_list.push(t);
}
void QueuePop(MyTask **t)
{
*t = _list.front();
_list.pop();
}
static void *thr_start(void *arg)
{
while(1)
{
ThreadPool *p = (ThreadPool*)arg;
pthread_mutex_lock(&p->_mutex);
MyTask *task;
while(p->QueueIsEmpty())
{
//没有任务,则工作线程陷入等待
pthread_cond_wait(&p->_empty,&p->_mutex);
}
p->QueuePop(&task);
pthread_mutex_unlock(&p->_mutex);
pthread_cond_signal(&p->_full);
task->Run();
delete task;
}
return NULL;
}
public:
ThreadPool(int thr = 5,int q = 10)
:_max_thr(thr), _cap(q)
{
pthread_mutex_init(&_mutex,NULL);
pthread_cond_init(&_full,NULL);
pthread_cond_init(&_empty,NULL);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
bool Init()
{
int ret,i;
pthread_t tid;
for(i = 0; i < _max_thr; i++)
{
ret = pthread_create(&tid,NULL,thr_start,(void*)this);
if(ret != 0)
{
printf("create thread error\n");
return false;
}
}
pthread_detach(tid);
return true;
}
bool AddTask(MyTask *task)
{
pthread_mutex_lock(&_mutex);
while(QueueIsFull())
{
pthread_cond_wait(&_full,&_mutex);
}
QueuePush(task);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_empty);
return true;
}
};
int main()
{
ThreadPool p;
p.Init();
MyTask *t;
int i = 0;
while(1)
{
printf("add task data:%d\n",i);
t = new MyTask(i++);
p.AddTask(t);
}
return 0;
}
线程安全的单例模式:
单例模式:典型设计模式的一种,指一个对象只能被初始化一次
单例模式: 饿汉/懒汉模式
饿汉:启动阶段一次性初始化完毕,用户体验不好,因为程序初始化时间过长,但是后续运行流畅
懒汉:将初始化分摊到各个阶段,使用的时候再初始化,启动节点用户体验比较好,但是第一次运行到某个模块的时候,流畅度不太好。
class obj
{
static T* data; //饿汉模式
obj() //懒汉模式
{
mutex.lock();
if(data == NULL)
{
data = new T();
}
mutex.unlock();
}
}