初探RTOS
本文将对RTOS的实现做一个概述,背景基于arm处理器,默认对arm处理器有一定认知,基础部分不在赘述,有兴趣可以参考我之前的文章:https://www.nowcoder.com/discuss/477145437758148608。
为什么需要OS
裸机中的代码逻辑
在裸机开发中,程序通常的结构通常如下:
void main() { init(); while(1) { func1(); func2(); func3(); ... } }
即在初始化操作完成后,在while(1)循环中重复执行若干函数。但假如其中某个或某些函数存在阻塞操作,如它需要等待某个条件得到满足(例如延时),一种简单的做法是:
void func1() { do_something(); while(delay_count) delay_count--; do_something(); ... }
显然,这种方式效率很低,因为后续的执行都被前面的函数阻塞了。而CPU有中断的功能,因此这个流程可以做如下优化:
void func1() { do_something(); while(!delay_count) do_something(); ... } void timerIRQ() { delay_count--; }
这样CPU就不会空等,而是会在等待中执行其他操作。不过这样做显然会给代码的设计增添不小的难度,因为需要考虑的东西更多了,且最终软件架构也很凌乱。
如何优化
那么怎么去优化呢,我们可以借鉴中断的逻辑结构,即保存现场-->转去执行其他的程序-->完毕后恢复现场。对于CPU而言,它的现场其实就两个:通用寄存器和栈。通用寄存器(R0-R15,CPSR等)决定了CPU当前的执行状态,而栈反映了程序流的调用关系。这其中,栈实际上只是内存中的一片区域,在规则确定(例如是递增栈还是递减栈)的情况下,只需要知道栈顶地址即可,而栈顶地址又通过R13寄存器保存,因此,在栈不被破坏(如出现内存踩踏)的情况下,只要保存CPU的寄存器值,那么就能够实现现场恢复。更进一步,我们可以将寄存器值存在栈上,这样只需要通过R13寄存器获取栈顶地址,就可以恢复寄存器的状态,从而恢复现场。于是我们可以设计一个函数:
void yield { save_context(); save_old_sp(); set_new_sp(); restore_context(); }
其中save_context()、restore_context()分别对应将寄存器压栈保存、退栈恢复动作;save_old_sp()、set_new_sp()分别对应保存当前R13寄存器值和设置R13寄存器为其他值。某种程度上来说,当函数调用yield函数时,类似于主动触发了一次中断,只不过由于在中断中修改了R13寄存器,从而导致退栈时使用了不同的栈空间,返回到了不同的地址。对于yield的调用者而言,yield函数只是push了一大堆寄存器又pop了出来,尽管事实上CPU在push和pop之间转去执行了其他操作。
RTOS中的调度
调度的基本模型
其实前面已经给出了一个基本的模型,我们进一步补充一下:
1. 将裸机里面while(1)的不同函数,根据用途不同封装为不同的task;
2. 每个task都给予一片内存空间用作它专有的栈;
3. 每个task都会记录当前栈顶指针,即R13寄存器的值;
最基础的,我们可以这样实现:
typedef void (*task_func)(void); typedef struct { uint32 r13; //用于调度时保存栈顶指针 uint32 stack_base; //指示当前task栈的基地址 uint32 stack_size; //指示当前task栈的大小; task_func func; //当前task的回调; char name[50]; //当前task名字; uint32 pid; //task的ID }xTaskHandler_t; xTaskHandler_t *pxCurrentTask = NULL; void yield { save_context(); //压栈保存寄存器 pxCurrentTask->r13 = get_current_sp(); //将当前r13(sp)寄存器值保存; pxCurrentTask = find_next_task(); //找到需要调度的下一个task set_sp(pxCurrentTask->r13); //修改r13寄存器的值 /*从这里开始,进入到其他task的世界*/ restore_context(); //退栈恢复现场 } void task1_func { //task1.func while(1) yield(); } void task2_func { //task2.func while(1) yield(); } extern void vTaskInit(xTaskHandler_t* task, uint32 stack_size, task_func func); void main() { xTaskHandler_t task1; xTaskHandler_t task2; vTaskInit(&task1, 0x400, task1_func); vTaskInit(&task2, 0x400, task2_func); //初始化结构体 start_schedule(); while(1); }
这样我们就实现了一个基本的RTOS,其中包含了两个task,当需要调度时,手动调用yield,保存当前寄存器到当前task的栈中,同时将当前栈指针寄存器R13更新至task结构体中,然后寻找下一个需要调度的task,将待调度的task结构体中的存的R13值写到R13寄存器中,执行退栈操作,CPU会从下一个task的yield的restore_context()开始执行。
更具体一些
1. vTaskInit()
实现task结构体的初始化。需要实现栈的分配,回调函数的注册等。还需要实现栈内容的初始化,主要是第一次调度寄存器的初始值。
extern void vInitStack(uint32 stack); void vTaskInit(xTaskHandler_t* task, uint32 stack_size, task_func func) { task->stack_base = malloc(stack_size); task->stack_size = stack_size; task->func = func; vInitStack((uint32)(task->stack_base)); }
2. start_schedule()
以CR5为例,通常会触发一次软件中断,开启第一次调度:
void svcHandler(void) { pxCurrentTask = find_next_task(); //找到第一个task set_sp(pxCurrentTask->r13); //修改r13寄存器的值 restore_context(); //将寄存器设为前面vInitStack中配置的值,这样出中断后,会进入第一个task执行 } void start_schedule(void) { __asm__("swi #0"); //会触发软件中断,执行svcHandler回调 }
如何拓展
1. 上面的模型中,没有展开说find_next_task()的实现,这个接口代表了调度的策略,即如何选择下一个应该执行的task。这里给一个最简单是实例,每次find_next_task都返回当前可调度的task中优先级最高的,这样task结构体中就需要添加一个新的变量来代表优先级。同样的我们也需要新的数据结构来记录当前可调度的task,比如链表,这样就需要在task结构体中加入一个链表指针:
struct XLIST_ITEM { struct XLIST_ITEM *pxPrevious; struct XLIST_ITEM *pxNext; }; typedef struct XLIST_ITEM xListItem_t; typedef void (*task_func)(void); typedef struct { xListItem_t list_item //用于将task结构体挂在不同的链表上 uint32 priority; //task优先级 uint32 r13; //用于调度时保存栈顶指针 uint32 stack_base; //指示当前task栈的基地址 uint32 stack_size; //指示当前task栈的大小; task_func func; //当前task的回调; }xTaskHandler_t; typedef struct { uint32 itemNum; //链表元素个数 xListItem* pxFirstItem; //链表首个元素 xListItem* pxLastItem; //链表末尾元素 }xList_t; static xList_t xReadyList; xTaskHandler_t *pxCurrentTask = NULL; extern void vInsertToEnd(xList_t *list, xListItem_t* item); extern void vInitList(xList_t *list); extern void vTaskInit(xTaskHandler_t* task, uint32 stack_size, task_func func); void main() { xTaskHandler_t task1; xTaskHandler_t task2; vTaskInit(&task1, 0x400, task1_func); vTaskInit(&task2, 0x400, task2_func); //初始化结构体 vInitList(&xReadyList); //初始化链表 vInsertToEnd(&xReadyList, &(task1.list_item)); vInsertToEnd(&xReadyList, &(task2.list_item)); //添加task到Ready链表 start_schedule(); while(1); }
这样,find_next_task实现为通过遍历ready链表,找到优先级最高的task返回:
//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能 extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item); xTaskHandler_t* find_next_task(void) { xListItem* pIterator = xReadyList.pxFirstItem; xTaskHandler_t* result = getTaskFromListItem(xReadyList.pxFirstItem); while(pIterator != xReadyList.pxLastItem) { if (getTaskFromListItem(pIterator)->priority < result->priority) //假设priority值小的优先级高 result = getTaskFromListItem(pIterator); } return result; }
你可能觉得这样效率不够高,因为每次都需要遍历一遍整个链表。针对这个问题,还可以做出如下改进:
//将ready链表改成链表数组,priority为1则加入xReadyList[1], priority为2则加入xReadyList[2], 以此类推 static xList_t xReadyList[MAX_PRIORITY]; static uint32 uxCurrentMaxPriority; xTaskHandler_t *pxCurrentTask = NULL; extern void vInsertToEnd(xList_t *list, xListItem_t* item); extern void vInitList(xList_t *list); extern void vTaskInit(xTaskHandler_t* task, uint32 stack_size, task_func func); void main() { xTaskHandler_t task1; xTaskHandler_t task2; vTaskInit(&task1, 0x400, task1_func); vTaskInit(&task2, 0x400, task2_func); //初始化结构体 for (uint32 i=0; i<MAX_PRIORITY; i++) vInitList(&(xReadyList[i])); //初始化链表 vInsertToEnd(&(xReadyList[task1.priority]), &(task1.list_item)); vInsertToEnd(&(xReadyList[task2.priority]), &(task2.list_item)); //添加task到Ready链表 start_schedule(); while(1); }
将ready链表改为数组,通过一个变量uxCurrentMaxPriority记录当前存在的可调度的优先级最高的task,它的优先级是多少,即priority值最小是多少。这样find_next_task只需要从链表xReadyList[uxCurrentMaxPriority]中取一个即可:
//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能 extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item); //下面的接口从list中获取一个item extern xListItem_t* getItem(xList_t* list); xTaskHandler_t* find_next_task(void) { xList_t* pTempList = &(xReadyList[uxCurrentMaxPriority]); return getTaskFromListItem(getItem(pTempList)); }
2. 上面的模型还存在一个问题,即只有当前正在运行的task主动要求调度时,才会进行调度。但实际使用中我们常常会在中断退出时触发一次调度,这样做的好处是防止高优先级的task长时间被低优先的task阻塞。注意,CR5中中断使用的是R13_irq, R14_irq,与正常运行时的R13_svc,R14_svc不同,因此切换时需要改变的是svc模式下的R13:
void irqHandler { save_context(); //压栈保存寄存器 irqISR(); //中断的回调 pxCurrentTask->r13 = get_svc_sp(); //将当前svc_r13(sp)寄存器值保存 pxCurrentTask = find_next_task(); //找到需要调度的下一个task set_svc_sp(pxCurrentTask->r13); //修改svc_r13寄存器的值 /*从这里开始,进入到其他task的世界*/ restore_context(); //退栈恢复现场 }
在中断退出时调度还有一个好处,即可以通过向target core发送一个中断(即核间通信中断,在SOC中十分场景)强迫其执行一次调度。
此外,在smp系统中,可以通过向其他core发送一个软件中断,使其他core进行调度。可以设想这样一个场景,core0正在执行一个优先级较低的task,core1则没有task执行,处于idle状态,此时core0收到外部中断,这个中断中唤起了一个优先级更高的task,出中断后core0转而执行优先级更高的task,那么它就可以给core1发送一个软中断,这样core1就会从idle状态唤醒转而执行原先在core0上执行的优先级较低的task。
延时的实现
延时函数
在前面的基础上,我们再来看看如何在RTOS的语境下实现延时。现在我们有一个timer,它会定时报中断给CPU,那么我们就可以维护一个全局变量,它记录了当前系统触发的定时中断的数量,每进一次定时中断都将这个变量加1,我们将这个变量称为系统节拍数。当一个task需要执行延时操作时,就可以使用当前系统节拍数+延时节拍数,得到一个唤醒节拍数。当系统节拍数达到唤醒节拍数时,出定时中断触发调度,就可能回到该task执行。注意,这里说可能,是因为触发调度后,有可能会切到其他优先级更高的task去,可以理解为切到了调用延时的task,紧接着被更高优先级的task抢占;另一方面,延时的精度还会受到定时中断周期限制。这两点结合可以看出,涉及到调度的延时并非绝对意义的准确。
参照前面ready链表的策略,我们也可以实现一个delay链表,所有执行了延时操作的task,我们都将它从ready链表中去掉并加入dealy链表,当延时到了后,再从delay链表中去掉该task加入ready链表。同时为了提升效率,修改一下xListItem_t
这个结构体,给他添加一个变量value
用于记录唤醒时间,这样在插入delay链表时可以按照唤醒先后的顺序,即按照value
升序,使delay链表为一个有序链表。这样当延时到了要从delay链表中去掉task时,无需再遍历整条链表:
struct XLIST_ITEM { struct XLIST_ITEM *pxPrevious; struct XLIST_ITEM *pxNext; uint32 value; }; typedef struct XLIST_ITEM xListItem_t; typedef struct { uint32 itemNum; //链表元素个数 xListItem* pxFirstItem; //链表首个元素 xListItem* pxLastItem; //链表末尾元素 }xList_t; static xList_t xDelayList; //delay链表 static xList_t xReadyList[MAX_PRIORITY]; //ready链表 xTaskHandler_t *pxCurrentTask = NULL; xList_t* pxDelayList = &xDelayList; static uint32 systemCount; //系统节拍数 //按item->value升序将item插入链表 extern void vInsertInOrder(xList_t *list, xListItem_t* item); //获取list中首个item并从链表中去掉 extern xListItem_t* vRemoveFirstItem(xList_t *list); //将指定item并从链表中去掉 extern void vRemoveItem(xList_t *list, xListItem_t* item); //设置item->value extern xListItem_t* vSetItemValue(xListItem_t *item, uint32 value); //将item加入list尾 extern void vInsertToEnd(xList_t *list, xListItem_t* item); //下面的接口实现通过task结构体的list_item得到task结构体的指针的功能 extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item); void delay(uint32 count) { uint32 timeToWake = systemCount + count; //唤醒时间 vSetItemValue(&(pxCurrentTask->list_item), timeToWake); //将唤醒时间写到list_item->value vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item)); //从ready队列中去掉 vInsertInOrder(pxDelayList, &(pxCurrentTask->list_item)); //加入到delay链表 yield(); } void timerIRQ(void) { save_context(); //压栈保存寄存器 systemCount++; //系统节拍数+1 if (systemCount == pxDelayList->pxFirstItem->value) { //delay链表中首个元素为最先超时的 xListItem_t* temp = vRemoveFirstItem(pxDelayList); //从delay链表中去掉 xList_t* pxReadyList = &(xReadyList[getTaskFromListItem(temp)->priority]); vInsertToEnd(pxReadyList, temp); //加入ready链表 } pxCurrentTask->r13 = get_svc_sp(); //将当前svc_r13(sp)寄存器值保存 pxCurrentTask = find_next_task(); //找到需要调度的下一个task set_svc_sp(pxCurrentTask->r13); //修改svc_r13寄存器的值 /*从这里开始,进入到其他task的世界*/ restore_context(); //退栈恢复现场 }
你可能注意到了,这样做有个问题,就是systemCount
可能会溢出。为了解决这个问题,我们可以维护两条delay链表,具体实现不展开说了,有兴趣可以自己考虑一下。
时间片
如果上面的基础上,对find_next_task
进行一定修改,可以实现时间片的功能。
//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能 extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item); //下面的接口从list中获取一个不同于参数item的item extern xListItem_t* getNewItem(xList_t* list, xListItem_t* item); xTaskHandler_t* find_next_task(void) { xList_t* pTempList = &(xReadyList[uxCurrentMaxPriority]); return getTaskFromListItem(getNewItem(pTempList)); }
时间片可以用在同优先级的task之间,让同一优先级的task都有机会获得调度,但是有时间片的情况下,会导致SOC频繁被中断打断,系统没法长时间处于low power mode,功耗方面可能有些问题。
结语
本章主要概述了RTOS的基本框架,关于内核对象,如消息队列,互斥锁,信号量等后续会单独讨论。
#嵌入式工程师##嵌入式#21届985微电子硕士,校招SSP进入紫光展锐任嵌入式工程师,8个月后跳槽至ASR工作至今。本专栏记录上学/工作时学到的一些知识,debug的一些工具使用等,希望有所帮助。