使用同步信号量和互斥信号量解决生产者和消费者问题
生产者和消费者问题
生产者和消费者问题是一个经典的进程同步问题。在这个问题中,生产者不断地向缓冲区中写入数据,而消费者则从缓冲区中读取数据。生产者进程和消费者进程对缓冲区的操作是互斥的,即任意时刻只能有一个进程对这个缓冲区进行操作。同时,生产者进程在进入缓冲区之前,先要检查缓冲区是否已满,如果缓冲区已满,则必须等待消费者进程将数据取出才能写入数据。同样地,消费者进程在从缓冲区读取数据之前,也要判断缓冲区是否为空,如果为空,则必须等待生产者进程写入数据才能读取数据。因此,生产者进程和消费者进程之间存在进程同步现象。
什么是同步信号量和互斥信号量
在多线程编程中,同步信号量(synchronization semaphore)和互斥信号量(mutex semaphore)是两种常用的同步机制。它们都是通过操作信号量来实现线程之间的同步和互斥访问共享资源。
- 同步信号量(Synchronization Semaphore): 同步信号量用于实现线程之间的同步,确保线程按照特定的顺序执行。它是一种计数信号量,初始值通常为0,当线程需要等待某个条件满足时,会调用等待(wait)操作,如果条件不满足,线程将被阻塞,直到其他线程通过发信号(signal)操作将信号量的计数值增加,使得条件满足,阻塞的线程被唤醒并继续执行。同步信号量常用于解决生产者-消费者问题、线程的顺序执行等场景。
- 互斥信号量(Mutex Semaphore): 互斥信号量用于实现线程之间的互斥,确保同一时间只有一个线程能够访问共享资源。它是一种二进制信号量,初始值通常为1,当线程需要访问共享资源时,会调用加锁(lock)操作,如果互斥信号量的计数值为1,则线程可以继续执行临界区代码,并将互斥信号量的计数值减1,表示锁定资源。如果互斥信号量的计数值为0,表示资源已被其他线程锁定,当前线程将被阻塞,直到资源解锁,即其他线程释放锁,互斥信号量的计数值变为1。互斥信号量常用于解决竞争条件和避免多线程访问共享资源的冲突。
实现思路
在开始实验之前,我们需要分析计算机系统中对资源的分配与释放过程。在计算机系统中,每个进程都可以消费或生产某类资源。当系统中的某个进程使用某一资源时,可以视为消耗,将该进程称为消费者;而当某个进程释放资源时,则它就相当于一个生产者。
基于以上思路,我们将进行以下步骤:
- 定义生产者与消费者问题中的各种数据结构,并初始化信号量。
- 创建生产者和消费者进程,利用信号量实现生产者与消费者之间的同步与互斥。
实现步骤
1. 定义数据结构和信号量
首先,我们需要定义生产者与消费者问题中的数据结构和信号量。在本例中,我们使用数组作为缓冲区,并定义一个信号量用于控制缓冲区的访问。
#define BUFFER_SIZE 5 // 缓冲区大小
int buffer[BUFFER_SIZE]; // 缓冲区数组
HANDLE semaphore; // 信号量句柄
2. 初始化信号量
接下来,我们需要初始化信号量。在本例中,我们将信号量的初始值设置为缓冲区的大小,表示缓冲区为空。
semaphore = CreateSemaphore(NULL, BUFFER_SIZE, BUFFER_SIZE, NULL);
3. 创建生产者和消费者进程
我们使用CreateThread()
函数创建生产者和消费者进程,并指定相应的线程函数。
HANDLE producerThread = CreateThread(NULL, 0, producerFunction, NULL, 0, NULL);
HANDLE consumerThread = CreateThread(NULL, 0, consumerFunction, NULL, 0, NULL);
4. 实现生产者函数
生产者函数用于模拟生产者不断向缓冲区中写入数据的过程。在写入数据之前,生产者需要检查缓冲区是否已满,如果已满,则需要等待消费者将数据取出。
DWORD WINAPI producerFunction(LPVOID lpParam) {
int item = 0; // 要生产的数据项
while (true) {
// 检查缓冲区是否已满
WaitForSingleObject(semaphore, INFINITE);
// 将数据写入缓冲区
buffer[in] = item;
printf("Produced item: %d\n", item);
in = (in + 1) % BUFFER_SIZE;
item++;
// 释放信号量,表示生产者已完成写入
ReleaseSemaphore(semaphore, 1, NULL);
}
return 0;
}
5. 实现消费者函数
消费者函数用于模拟消费者从缓冲区中读取数据的过程。在读取数据之前,消费者需要检查缓冲区是否为空,如果为空,则需要等待生产者将数据写入。
DWORD WINAPI consumerFunction(LPVOID lpParam) {
int item;
while (true) {
// 检查缓冲区是否为空
WaitForSingleObject(semaphore, INFINITE);
// 从缓冲区中读取数据
item = buffer[out];
printf("Consumed item: %d\n", item);
out = (out + 1) % BUFFER_SIZE;
// 释放信号量,表示消费者已完成读取
ReleaseSemaphore(semaphore, 1, NULL);
}
return 0;
}
6. 等待线程结束
最后,我们使用WaitForSingleObject()
函数等待生产者和消费者线程结束,并释放相关资源。
WaitForSingleObject(producerThread, INFINITE);
WaitForSingleObject(consumerThread, INFINITE);
CloseHandle(producerThread);
CloseHandle(consumerThread);
CloseHandle(semaphore);
这样,我们就完成了使用信号量实现生产者与消费者问题的代码实现。在该实现中,生产者和消费者通过信号量进行同步和互斥,确保了生产者不会向满的缓冲区写入数据,消费者不会从空的缓冲区读取数据。
完整代码
#include<windows.h>
#include<stdio.h>
const int SIZE_OF_BUFFER = 5;
int ProductID = 0;
int ConsumeID = 0;
int in = 0;
int out = 0;
int g_buffer[SIZE_OF_BUFFER];
bool g_continue = true;
HANDLE g_hMutex;
HANDLE g_hFullSemaphore;
HANDLE g_hEmptySemaphore;
void Produce(){
printf("生产一个产品:%d号产品\n",++ProductID);
g_buffer[in] = ProductID;
in = (in+1)%SIZE_OF_BUFFER;
for (int i=0;i<SIZE_OF_BUFFER;++i)
{
printf("%d号缓冲区:产品%d\n",i,g_buffer[i]);
if(i==in) printf(" <-- 生产\n");
if(i==out) printf(" <-- 消费\n");
}
printf("\n");
}
void Consume(){
printf("消耗了一个产品:%d号产品\n",g_buffer[out]);
g_buffer[out]=0;
out = (out+1)%SIZE_OF_BUFFER;
for (int i=0;i<SIZE_OF_BUFFER;++i)
{
printf("%d号缓冲区:产品%d\n",i,g_buffer[i]);
if (i==in) printf(" <-- 生产\n");
if (i==out) printf(" <-- 消费\n");
}
printf("\n");
}
DWORD WINAPI Producer(LPVOID lpPara)
{
while(g_continue)
{
WaitForSingleObject(g_hEmptySemaphore,INFINITE);
WaitForSingleObject(g_hMutex,INFINITE);
Produce();
Sleep(2000);
ReleaseMutex(g_hMutex);
ReleaseSemaphore(g_hFullSemaphore,1,NULL);
}
return 0;
}
DWORD WINAPI Consumer(LPVOID lpPara)
{
while(g_continue)
{
WaitForSingleObject(g_hFullSemaphore,INFINITE);
WaitForSingleObject(g_hMutex,INFINITE);
Consume();
Sleep(2000);
ReleaseMutex(g_hMutex);
ReleaseSemaphore(g_hEmptySemaphore,1,NULL);
}
return 0;
}
int main()
{
// 创建互斥信号量 mutex
g_hMutex = CreateMutex(NULL, FALSE, NULL);
// 创建同步信号量 full
g_hFullSemaphore = CreateSemaphore(NULL, 0, SIZE_OF_BUFFER, NULL);
// 创建同步信号量 empty
g_hEmptySemaphore = CreateSemaphore(NULL, SIZE_OF_BUFFER, SIZE_OF_BUFFER, NULL);
// 调整生产者线程和消费者线程的个数,看看结果有何不同。
const int PRODUCERS_COUNT = 6; // 生产者的个数
const int CONSUMERS_COUNT = 2; // 消费者的个数
const int THREADS_COUNT = PRODUCERS_COUNT + CONSUMERS_COUNT; // 计算总的线程数
HANDLE hThreads[THREADS_COUNT]; // 各线程的 handle
DWORD producerID[PRODUCERS_COUNT]; // 生产者线程的标识符
DWORD consumerID[CONSUMERS_COUNT]; // 消费者线程的标识符
// 创建生产者线程
for (int i = 0; i < PRODUCERS_COUNT; ++i)
{
hThreads[i] = CreateThread(NULL, 0, Producer, NULL, 0, &producerID[i]);
if (hThreads[i] == NULL)
{
return -1;
}
}
// 创建消费者线程
for (int j = 0; j < CONSUMERS_COUNT; ++j)
{
hThreads[PRODUCERS_COUNT + j] = CreateThread(NULL, 0, Consumer, NULL, 0, &consumerID[j]);
if (hThreads[j] == NULL)
{
return -1;
}
}
// 控制线程终止
while (g_continue)
{
if (getchar() == '\n') // 按回车后终止程序运行
{
g_continue = false;
}
}
return 0;
}