RTOS中断内核对象(一)

在上一篇文章https://www.nowcoder.com/discuss/479716637294235648中,概述了RTOS的调度模型,谈到了task中如何主动发起调度让渡CPU。但实际使用中,task通常不会显式主动去调用yield让渡CPU,而是通过内核对象,当获取内核对象造成阻塞,由内核对象去调用yield,即调度对于task而言是封闭的。从task的角度看,内核对象是进行了自旋还是调度没有区别。这里内核对象包括了Message Queue,Event Group,Semaphore,Mutex等。本文主要会谈到Message Queue,因为实际上Semaphore和Mutex都可以通过Message Queue实现。

Message Queue概述

什么是Message Queue

Message Queue,即消息队列,本质上来说只是一段内存,和裸机开发中全局变量的作用相同。只不过在裸机开发中,代码是线性流程,编程者需要负责所有程序流程的设计,相当于站在上帝视角上;而在RTOS语境下,任务被拆分成了不同的task,每个task都focus在自己的任务上,此时对于全局变量的访问就需要注意,是不是只有我一个线程回去访问这个全局变量,如果不是的话,就需要解决多task同时访问的问题。另一方面,如果我希望使用这段内存来做不同task之间的消息传递,就需要机制保持访问次序,即生产者应该早于消费者。如果消费者访问早于生成者,那它就应该等待,直到生产者完成访问。消息队列便是RTOS提供给所有task的,能够解决上述问题的内核对象。

RTOS下的实现

在多task的情况下,对于共有内存的访问,我们希望实现的是,当task1正在访问时,task2如果也要访问,那么它应当等待task1访问完毕后再访问,在等待的期间,为了效率最大化,可以主动调度出去,待task1访问结束后再调度回来。顺着这个思路,可以做如下实现:

struct XLIST_ITEM {
  struct XLIST_ITEM *pxPrevious;
  struct XLIST_ITEM *pxNext;
  uint32 value;
};
typedef struct XLIST_ITEM xListItem_t;
typedef struct {
  uint32 itemNum;			//链表元素个数
  xListItem* pxFirstItem;	//链表首个元素
  xListItem* pxLastItem;	//链表末尾元素
}xList_t;
typedef struct
{
  xListItem_t list_item	//用于将task结构体挂在不同的链表上
  uint32 priority;		//task优先级
  uint32 r13;			//用于调度时保存栈顶指针
  uint32 stack_base;	//指示当前task栈的基地址
  uint32 stack_size;	//指示当前task栈的大小;
  task_func func;		//当前task的回调;
}xTaskHandler_t;

typedef struct {
  uint32* sharemem;			//共享内存
  xList_t receiveWaitList;			//接收等待链表
  xList_t sendWaitList;			//发送等待链表
  bool	haveValidValue;			//当前队列中是否有有效数据
}xMsgQueue_t;

xTaskHandler_t* pxCurrentTask = NULL;	//当前正在执行的task结构体指针
static xList_t xReadyList[MAX_PRIORITY];	//可以被调度的task的链表,即ready链表

//将item插入链表list尾部
extern void vInsertToEnd(xList_t *list, xListItem_t* item);
//将指定item并从链表list中去掉
extern void vRemoveItem(xList_t *list, xListItem_t* item);
//判断列表是否为空
extern bool isListEmpty(xList_t *list);
//进入临界区,在临界区内不会产生调度
extern void enterCritical();
//退出临界区,在临界区内不会产生调度
extern void exitCritical();
//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能
extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item);

void vMsgQueueReceive(xMsgQueue_t* queue, uint32* data) {
  xListItem* temp;
  while(1) {
	enterCritical();
	//有有效数据
	if (haveValidValue == 1) {
	  haveValidValue = 0;	//消息已被读走
	  memcpy(data, queue->sharemem);	//获取sharemem中的内容
	  if (!isListEmpty(queue->sendWaitList)) {
		//目前消息队列以空,将发送等待链表中第一个task重新加入ready链表参与调度(如果有的话)
		temp = queue->sendWaitList->pxFirstItem;
		if (temp != NULL) {
			vRemoveItem(queue->sendWaitList, temp);
			vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp);
		}
		exitCritical();
		break;
	  }
	} else {	//没有有效数据,挂起自己等待其他task写入
	  //将当前task从ready链表去掉
	  vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item)); 
	  //将当前task加到queue的接收等待链表
	  vInsertToEnd(queue->receiveWaitList, &(pxCurrentTask->list_item));
	  exitCritical();
	  //触发调度
	  yield();
	}
  }
  //如果有task挂在发送等待链表,则调度一次给予该task运行机会
  if (temp != NULL)
	yield();
}

void vMsgQueueSend(xMsgQueue_t* queue, uint32* data) {
  xListItem* temp;
  while(1) {
	enterCritical();
	//无有效数据
	if (queue->haveValidValue == 0)) {
	  queue->haveValidValue = 1;	//消息已写入
	  memcpy(data, queue->sharemem);	//获取sharemem中的内容
	  if (!isListEmpty(queue->receiveWaitList)) {
		//目前消息队列已写入数据,将接收等待链表中第一个task重新加入ready链表参与调度
		temp = queue->receiveWaitList->pxFirstItem;
		if (temp != NULL) {
			vRemoveItem(queue->receiveWaitList, temp);
			vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp);
		}
		exitCritical();
		break;
	  }
	} else {					//有有效数据,挂起自己等待数据被读走
	  //将当前task从ready链表去掉
	  vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item)); 
	  //将当前task加到queue的发送等待链表
	  vInsertToEnd(queue->sendWaitList, &(pxCurrentTask->list_item));
	  exitCritical();
	  //触发调度
	  yield();
	}
  }
  if (temp != NULL)
	yield();
}

注意,再对内核对象或全局变量操作时,需要防止产生调度,即进入临界区。当task调用vMsgQueueSend时,如果此时队列已满/为空,则会产生阻塞,task被从ready链表中拿去,内核对象中主动调度;当其他task将队列中消息读走,队列中可以写入新消息时,则将task重新加入ready链表,内核对象中主动调度,原先的task继续执行,对于原先的task而言,它对于中间的调度过程并无感知。反过来task调用vMsgQueueReceive时也是同理。

如何扩展

上面的实现其实不能称为队列,因为实际上只能存储一个消息。我们可以将其扩展为多个,只需要将sharemem改为数组,对应其他的修改如下:

struct XLIST_ITEM {
  struct XLIST_ITEM *pxPrevious;
  struct XLIST_ITEM *pxNext;
  uint32 value;
};
typedef struct XLIST_ITEM xListItem_t;
typedef struct {
  uint32 itemNum;			//链表元素个数
  xListItem* pxFirstItem;	//链表首个元素
  xListItem* pxLastItem;	//链表末尾元素
}xList_t;
typedef struct
{
  xListItem_t list_item	//用于将task结构体挂在不同的链表上
  uint32 priority;		//task优先级
  uint32 r13;			//用于调度时保存栈顶指针
  uint32 stack_base;	//指示当前task栈的基地址
  uint32 stack_size;	//指示当前task栈的大小;
  task_func func;		//当前task的回调;
}xTaskHandler_t;

#define MSG_NUM		16
typedef struct {
  uint32* msg[MSG_NUM];			//消息队列
  uint32 writePtr;			//消息队列写指针位置
  uint32 readPtr;			//消息队列读指针位置
  xList_t receiveWaitList;			//接收等待链表
  xList_t sendWaitList;			//发送等待链表
  uint32 msgNum;			//当前队列中有效消息个数
}xMsgQueue_t;

xTaskHandler_t* pxCurrentTask = NULL;	//当前正在执行的task结构体指针
static xList_t xReadyList[MAX_PRIORITY];	//可以被调度的task的链表,即ready链表

//将item插入链表list尾部
extern void vInsertToEnd(xList_t *list, xListItem_t* item);
//将指定item并从链表list中去掉
extern void vRemoveItem(xList_t *list, xListItem_t* item);
//判断列表是否为空
extern bool isListEmpty(xList_t *list);
//进入临界区,在临界区内不会产生调度
extern void enterCritical();
//退出临界区,在临界区内不会产生调度
extern void exitCritical();
//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能
extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item);

void vReadFromQueue(xMsgQueue_t* queue, uint32** data) {
  *data = queue->msg[queue->readPtr];
  queue->readPtr++;				//读指针加1
  queue->readPtr %= MSG_NUM;	//越界从头开始
  queue->msgNum--;				//消息数减1
}
void vWriteToQueue(xMsgQueue_t* queue, uint32* data) {
  queue->msg[queue->writePtr] = data;
  queue->writePtr++;			//写指针加1
  queue->writePtr %= MSG_NUM;	//越界从头开始
  queue->msgNum++;				//消息数减1
}

void vMsgQueueReceive(xMsgQueue_t* queue, uint32** data) {
  xListItem* temp;
  while(1) {
	enterCritical();
	//队列非空
	if (queue->msgNum > 0) {
	  vReadFromQueue(queue, data);	//获取消息
	  if (!isListEmpty(queue->sendWaitList)) {
		//目前消息队列未满,将发送等待链表中第一个task重新加入ready链表参与调度(如果有的话)
		temp = queue->sendWaitList->pxFirstItem;
		if (temp != NULL) {
			vRemoveItem(queue->sendWaitList, temp);
			vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp);
		}
		exitCritical();
		break;
	  }
	} else {	//队列为空,挂起自己等待其他task写入
	  //将当前task从ready链表去掉
	  vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item)); 
	  //将当前task加到queue的接收等待链表
	  vInsertToEnd(queue->receiveWaitList, &(pxCurrentTask->list_item));
	  exitCritical();
	  //触发调度
	  yield();
	}
  }
  //如果有task挂在发送等待链表,则调度一次给予该task运行机会
  if (temp != NULL)
	yield();
}

void vMsgQueueSend(xMsgQueue_t* queue, uint32* data) {
  xListItem* temp;
  while(1) {
	enterCritical();
	//队列未满
	if (queue->msgNum < MSG_NUM)) {
	  vWriteToQueue(queue, &data);	//获取sharemem中的内容
	  if (!isListEmpty(queue->receiveWaitList)) {
		//目前消息队列已写入消息,将接收等待链表中第一个task重新加入ready链表参与调度
		temp = queue->receiveWaitList->pxFirstItem;
		if (temp != NULL) {
			vRemoveItem(queue->receiveWaitList, temp);
			vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp);
		}
		exitCritical();
		break;
	  }
	} else {					//队列已满,挂起自己等待消息被读走
	  //将当前task从ready链表去掉
	  vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item)); 
	  //将当前task加到queue的发送等待链表
	  vInsertToEnd(queue->sendWaitList, &(pxCurrentTask->list_item));
	  exitCritical();
	  //触发调度
	  yield();
	}
  }
  if (temp != NULL)
	yield();
}

此外,我们也可以为消息队列添加一个超时机制,此时如果task由于发送/接收消息队列时产生了阻塞,那么将该task从ready链表中拿掉的后,同时加入dealy链表和消息队列的发送/接收等待链表,当超时或队列运行发送/接收两条件任意一个得到满足,将task从这两个链表中同时去掉,并加入ready链表。

未完待续

#嵌入式##嵌入式工程师#
嵌入式学习 文章被收录于专栏

21届985微电子硕士,校招SSP进入紫光展锐任嵌入式工程师,8个月后跳槽至ASR工作至今。本专栏记录上学/工作时学到的一些知识,debug的一些工具使用等,希望有所帮助。

全部评论

相关推荐

暮雨轻歌:看起来hr不能接受我菜查看图片
点赞 评论 收藏
分享
评论
2
2
分享

创作者周榜

更多
牛客网
牛客企业服务