RTOS中断内核对象(一)
在上一篇文章https://www.nowcoder.com/discuss/479716637294235648中,概述了RTOS的调度模型,谈到了task中如何主动发起调度让渡CPU。但实际使用中,task通常不会显式主动去调用yield让渡CPU,而是通过内核对象,当获取内核对象造成阻塞,由内核对象去调用yield,即调度对于task而言是封闭的。从task的角度看,内核对象是进行了自旋还是调度没有区别。这里内核对象包括了Message Queue,Event Group,Semaphore,Mutex等。本文主要会谈到Message Queue,因为实际上Semaphore和Mutex都可以通过Message Queue实现。
Message Queue概述
什么是Message Queue
Message Queue,即消息队列,本质上来说只是一段内存,和裸机开发中全局变量的作用相同。只不过在裸机开发中,代码是线性流程,编程者需要负责所有程序流程的设计,相当于站在上帝视角上;而在RTOS语境下,任务被拆分成了不同的task,每个task都focus在自己的任务上,此时对于全局变量的访问就需要注意,是不是只有我一个线程回去访问这个全局变量,如果不是的话,就需要解决多task同时访问的问题。另一方面,如果我希望使用这段内存来做不同task之间的消息传递,就需要机制保持访问次序,即生产者应该早于消费者。如果消费者访问早于生成者,那它就应该等待,直到生产者完成访问。消息队列便是RTOS提供给所有task的,能够解决上述问题的内核对象。
RTOS下的实现
在多task的情况下,对于共有内存的访问,我们希望实现的是,当task1正在访问时,task2如果也要访问,那么它应当等待task1访问完毕后再访问,在等待的期间,为了效率最大化,可以主动调度出去,待task1访问结束后再调度回来。顺着这个思路,可以做如下实现:
struct XLIST_ITEM { struct XLIST_ITEM *pxPrevious; struct XLIST_ITEM *pxNext; uint32 value; }; typedef struct XLIST_ITEM xListItem_t; typedef struct { uint32 itemNum; //链表元素个数 xListItem* pxFirstItem; //链表首个元素 xListItem* pxLastItem; //链表末尾元素 }xList_t; typedef struct { xListItem_t list_item //用于将task结构体挂在不同的链表上 uint32 priority; //task优先级 uint32 r13; //用于调度时保存栈顶指针 uint32 stack_base; //指示当前task栈的基地址 uint32 stack_size; //指示当前task栈的大小; task_func func; //当前task的回调; }xTaskHandler_t; typedef struct { uint32* sharemem; //共享内存 xList_t receiveWaitList; //接收等待链表 xList_t sendWaitList; //发送等待链表 bool haveValidValue; //当前队列中是否有有效数据 }xMsgQueue_t; xTaskHandler_t* pxCurrentTask = NULL; //当前正在执行的task结构体指针 static xList_t xReadyList[MAX_PRIORITY]; //可以被调度的task的链表,即ready链表 //将item插入链表list尾部 extern void vInsertToEnd(xList_t *list, xListItem_t* item); //将指定item并从链表list中去掉 extern void vRemoveItem(xList_t *list, xListItem_t* item); //判断列表是否为空 extern bool isListEmpty(xList_t *list); //进入临界区,在临界区内不会产生调度 extern void enterCritical(); //退出临界区,在临界区内不会产生调度 extern void exitCritical(); //下面的接口实现通过task结构体的list_item得到task结构体的指针的功能 extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item); void vMsgQueueReceive(xMsgQueue_t* queue, uint32* data) { xListItem* temp; while(1) { enterCritical(); //有有效数据 if (haveValidValue == 1) { haveValidValue = 0; //消息已被读走 memcpy(data, queue->sharemem); //获取sharemem中的内容 if (!isListEmpty(queue->sendWaitList)) { //目前消息队列以空,将发送等待链表中第一个task重新加入ready链表参与调度(如果有的话) temp = queue->sendWaitList->pxFirstItem; if (temp != NULL) { vRemoveItem(queue->sendWaitList, temp); vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp); } exitCritical(); break; } } else { //没有有效数据,挂起自己等待其他task写入 //将当前task从ready链表去掉 vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item)); //将当前task加到queue的接收等待链表 vInsertToEnd(queue->receiveWaitList, &(pxCurrentTask->list_item)); exitCritical(); //触发调度 yield(); } } //如果有task挂在发送等待链表,则调度一次给予该task运行机会 if (temp != NULL) yield(); } void vMsgQueueSend(xMsgQueue_t* queue, uint32* data) { xListItem* temp; while(1) { enterCritical(); //无有效数据 if (queue->haveValidValue == 0)) { queue->haveValidValue = 1; //消息已写入 memcpy(data, queue->sharemem); //获取sharemem中的内容 if (!isListEmpty(queue->receiveWaitList)) { //目前消息队列已写入数据,将接收等待链表中第一个task重新加入ready链表参与调度 temp = queue->receiveWaitList->pxFirstItem; if (temp != NULL) { vRemoveItem(queue->receiveWaitList, temp); vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp); } exitCritical(); break; } } else { //有有效数据,挂起自己等待数据被读走 //将当前task从ready链表去掉 vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item)); //将当前task加到queue的发送等待链表 vInsertToEnd(queue->sendWaitList, &(pxCurrentTask->list_item)); exitCritical(); //触发调度 yield(); } } if (temp != NULL) yield(); }
注意,再对内核对象或全局变量操作时,需要防止产生调度,即进入临界区。当task调用vMsgQueueSend
时,如果此时队列已满/为空,则会产生阻塞,task被从ready链表中拿去,内核对象中主动调度;当其他task将队列中消息读走,队列中可以写入新消息时,则将task重新加入ready链表,内核对象中主动调度,原先的task继续执行,对于原先的task而言,它对于中间的调度过程并无感知。反过来task调用vMsgQueueReceive
时也是同理。
如何扩展
上面的实现其实不能称为队列,因为实际上只能存储一个消息。我们可以将其扩展为多个,只需要将sharemem改为数组,对应其他的修改如下:
struct XLIST_ITEM { struct XLIST_ITEM *pxPrevious; struct XLIST_ITEM *pxNext; uint32 value; }; typedef struct XLIST_ITEM xListItem_t; typedef struct { uint32 itemNum; //链表元素个数 xListItem* pxFirstItem; //链表首个元素 xListItem* pxLastItem; //链表末尾元素 }xList_t; typedef struct { xListItem_t list_item //用于将task结构体挂在不同的链表上 uint32 priority; //task优先级 uint32 r13; //用于调度时保存栈顶指针 uint32 stack_base; //指示当前task栈的基地址 uint32 stack_size; //指示当前task栈的大小; task_func func; //当前task的回调; }xTaskHandler_t; #define MSG_NUM 16 typedef struct { uint32* msg[MSG_NUM]; //消息队列 uint32 writePtr; //消息队列写指针位置 uint32 readPtr; //消息队列读指针位置 xList_t receiveWaitList; //接收等待链表 xList_t sendWaitList; //发送等待链表 uint32 msgNum; //当前队列中有效消息个数 }xMsgQueue_t; xTaskHandler_t* pxCurrentTask = NULL; //当前正在执行的task结构体指针 static xList_t xReadyList[MAX_PRIORITY]; //可以被调度的task的链表,即ready链表 //将item插入链表list尾部 extern void vInsertToEnd(xList_t *list, xListItem_t* item); //将指定item并从链表list中去掉 extern void vRemoveItem(xList_t *list, xListItem_t* item); //判断列表是否为空 extern bool isListEmpty(xList_t *list); //进入临界区,在临界区内不会产生调度 extern void enterCritical(); //退出临界区,在临界区内不会产生调度 extern void exitCritical(); //下面的接口实现通过task结构体的list_item得到task结构体的指针的功能 extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item); void vReadFromQueue(xMsgQueue_t* queue, uint32** data) { *data = queue->msg[queue->readPtr]; queue->readPtr++; //读指针加1 queue->readPtr %= MSG_NUM; //越界从头开始 queue->msgNum--; //消息数减1 } void vWriteToQueue(xMsgQueue_t* queue, uint32* data) { queue->msg[queue->writePtr] = data; queue->writePtr++; //写指针加1 queue->writePtr %= MSG_NUM; //越界从头开始 queue->msgNum++; //消息数减1 } void vMsgQueueReceive(xMsgQueue_t* queue, uint32** data) { xListItem* temp; while(1) { enterCritical(); //队列非空 if (queue->msgNum > 0) { vReadFromQueue(queue, data); //获取消息 if (!isListEmpty(queue->sendWaitList)) { //目前消息队列未满,将发送等待链表中第一个task重新加入ready链表参与调度(如果有的话) temp = queue->sendWaitList->pxFirstItem; if (temp != NULL) { vRemoveItem(queue->sendWaitList, temp); vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp); } exitCritical(); break; } } else { //队列为空,挂起自己等待其他task写入 //将当前task从ready链表去掉 vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item)); //将当前task加到queue的接收等待链表 vInsertToEnd(queue->receiveWaitList, &(pxCurrentTask->list_item)); exitCritical(); //触发调度 yield(); } } //如果有task挂在发送等待链表,则调度一次给予该task运行机会 if (temp != NULL) yield(); } void vMsgQueueSend(xMsgQueue_t* queue, uint32* data) { xListItem* temp; while(1) { enterCritical(); //队列未满 if (queue->msgNum < MSG_NUM)) { vWriteToQueue(queue, &data); //获取sharemem中的内容 if (!isListEmpty(queue->receiveWaitList)) { //目前消息队列已写入消息,将接收等待链表中第一个task重新加入ready链表参与调度 temp = queue->receiveWaitList->pxFirstItem; if (temp != NULL) { vRemoveItem(queue->receiveWaitList, temp); vInsertToEnd(&(xReadyList[getTaskFromListItem(temp)->priority]), temp); } exitCritical(); break; } } else { //队列已满,挂起自己等待消息被读走 //将当前task从ready链表去掉 vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item)); //将当前task加到queue的发送等待链表 vInsertToEnd(queue->sendWaitList, &(pxCurrentTask->list_item)); exitCritical(); //触发调度 yield(); } } if (temp != NULL) yield(); }
此外,我们也可以为消息队列添加一个超时机制,此时如果task由于发送/接收消息队列时产生了阻塞,那么将该task从ready链表中拿掉的后,同时加入dealy链表和消息队列的发送/接收等待链表,当超时或队列运行发送/接收两条件任意一个得到满足,将task从这两个链表中同时去掉,并加入ready链表。
未完待续
#嵌入式##嵌入式工程师#21届985微电子硕士,校招SSP进入紫光展锐任嵌入式工程师,8个月后跳槽至ASR工作至今。本专栏记录上学/工作时学到的一些知识,debug的一些工具使用等,希望有所帮助。