初探RTOS

本文将对RTOS的实现做一个概述,背景基于arm处理器,默认对arm处理器有一定认知,基础部分不在赘述,有兴趣可以参考我之前的文章:https://www.nowcoder.com/discuss/477145437758148608

为什么需要OS

裸机中的代码逻辑

在裸机开发中,程序通常的结构通常如下:

void main() {
  init();
  while(1) {
	func1();
	func2();
	func3();
	...
	}
}

即在初始化操作完成后,在while(1)循环中重复执行若干函数。但假如其中某个或某些函数存在阻塞操作,如它需要等待某个条件得到满足(例如延时),一种简单的做法是:

void func1() {
  do_something();
  while(delay_count)
	delay_count--;
  do_something();
  ...
}

显然,这种方式效率很低,因为后续的执行都被前面的函数阻塞了。而CPU有中断的功能,因此这个流程可以做如下优化:

void func1() {
  do_something();
  while(!delay_count)
	do_something();
  ...
}

void timerIRQ() {
  delay_count--;
}

这样CPU就不会空等,而是会在等待中执行其他操作。不过这样做显然会给代码的设计增添不小的难度,因为需要考虑的东西更多了,且最终软件架构也很凌乱。

如何优化

那么怎么去优化呢,我们可以借鉴中断的逻辑结构,即保存现场-->转去执行其他的程序-->完毕后恢复现场。对于CPU而言,它的现场其实就两个:通用寄存器和栈。通用寄存器(R0-R15,CPSR等)决定了CPU当前的执行状态,而栈反映了程序流的调用关系。这其中,栈实际上只是内存中的一片区域,在规则确定(例如是递增栈还是递减栈)的情况下,只需要知道栈顶地址即可,而栈顶地址又通过R13寄存器保存,因此,在栈不被破坏(如出现内存踩踏)的情况下,只要保存CPU的寄存器值,那么就能够实现现场恢复。更进一步,我们可以将寄存器值存在栈上,这样只需要通过R13寄存器获取栈顶地址,就可以恢复寄存器的状态,从而恢复现场。于是我们可以设计一个函数:

void yield {
  save_context();
  save_old_sp();
  set_new_sp();
  restore_context();
}

其中save_context()、restore_context()分别对应将寄存器压栈保存、退栈恢复动作;save_old_sp()、set_new_sp()分别对应保存当前R13寄存器值和设置R13寄存器为其他值。某种程度上来说,当函数调用yield函数时,类似于主动触发了一次中断,只不过由于在中断中修改了R13寄存器,从而导致退栈时使用了不同的栈空间,返回到了不同的地址。对于yield的调用者而言,yield函数只是push了一大堆寄存器又pop了出来,尽管事实上CPU在push和pop之间转去执行了其他操作

RTOS中的调度

调度的基本模型

其实前面已经给出了一个基本的模型,我们进一步补充一下:

1. 将裸机里面while(1)的不同函数,根据用途不同封装为不同的task;

2. 每个task都给予一片内存空间用作它专有的栈;

3. 每个task都会记录当前栈顶指针,即R13寄存器的值;

最基础的,我们可以这样实现:

typedef void (*task_func)(void);
typedef struct
{
  uint32 r13;			//用于调度时保存栈顶指针
  uint32 stack_base;	//指示当前task栈的基地址
  uint32 stack_size;	//指示当前task栈的大小;
  task_func func;		//当前task的回调;
  char	name[50];		//当前task名字;
  uint32	pid;		//task的ID
}xTaskHandler_t;

xTaskHandler_t *pxCurrentTask = NULL;

void yield {
  save_context();							//压栈保存寄存器
  pxCurrentTask->r13 = get_current_sp();		//将当前r13(sp)寄存器值保存;
  pxCurrentTask = find_next_task();			//找到需要调度的下一个task
  set_sp(pxCurrentTask->r13);				//修改r13寄存器的值
  /*从这里开始,进入到其他task的世界*/
  restore_context();						//退栈恢复现场
}

void task1_func {	//task1.func
  while(1)
 	yield();
}

void task2_func {	//task2.func
  while(1)
 	yield();
}
extern void vTaskInit(xTaskHandler_t* task, uint32 stack_size, task_func func);
void main() {
  xTaskHandler_t task1;
  xTaskHandler_t task2;
  vTaskInit(&task1, 0x400, task1_func);
  vTaskInit(&task2, 0x400, task2_func);		//初始化结构体
  start_schedule();
  while(1);
}

这样我们就实现了一个基本的RTOS,其中包含了两个task,当需要调度时,手动调用yield,保存当前寄存器到当前task的栈中,同时将当前栈指针寄存器R13更新至task结构体中,然后寻找下一个需要调度的task,将待调度的task结构体中的存的R13值写到R13寄存器中,执行退栈操作,CPU会从下一个task的yield的restore_context()开始执行。

更具体一些

1. vTaskInit()

实现task结构体的初始化。需要实现栈的分配,回调函数的注册等。还需要实现栈内容的初始化,主要是第一次调度寄存器的初始值。

extern void vInitStack(uint32 stack);
void vTaskInit(xTaskHandler_t* task, uint32 stack_size, task_func func) {
  task->stack_base = malloc(stack_size);
  task->stack_size = stack_size;
  task->func = func;
  vInitStack((uint32)(task->stack_base));
}

2. start_schedule()

以CR5为例,通常会触发一次软件中断,开启第一次调度:

void svcHandler(void) {
  pxCurrentTask = find_next_task();			//找到第一个task
  set_sp(pxCurrentTask->r13);				//修改r13寄存器的值
  restore_context();		//将寄存器设为前面vInitStack中配置的值,这样出中断后,会进入第一个task执行
}

void start_schedule(void) {
  __asm__("swi #0");			//会触发软件中断,执行svcHandler回调
}

如何拓展

1. 上面的模型中,没有展开说find_next_task()的实现,这个接口代表了调度的策略,即如何选择下一个应该执行的task。这里给一个最简单是实例,每次find_next_task都返回当前可调度的task中优先级最高的,这样task结构体中就需要添加一个新的变量来代表优先级。同样的我们也需要新的数据结构来记录当前可调度的task,比如链表,这样就需要在task结构体中加入一个链表指针:

struct XLIST_ITEM {
  struct XLIST_ITEM *pxPrevious;
  struct XLIST_ITEM *pxNext;
};
typedef struct XLIST_ITEM xListItem_t;

typedef void (*task_func)(void);
typedef struct
{
  xListItem_t list_item	//用于将task结构体挂在不同的链表上
  uint32 priority;		//task优先级
  uint32 r13;			//用于调度时保存栈顶指针
  uint32 stack_base;	//指示当前task栈的基地址
  uint32 stack_size;	//指示当前task栈的大小;
  task_func func;		//当前task的回调;
}xTaskHandler_t;

typedef struct {
  uint32 itemNum;			//链表元素个数
  xListItem* pxFirstItem;	//链表首个元素
  xListItem* pxLastItem;	//链表末尾元素
}xList_t;

static xList_t xReadyList;
xTaskHandler_t *pxCurrentTask = NULL;

extern void vInsertToEnd(xList_t *list, xListItem_t* item);
extern void vInitList(xList_t *list);
extern void vTaskInit(xTaskHandler_t* task, uint32 stack_size, task_func func);
void main() {
  xTaskHandler_t task1;
  xTaskHandler_t task2;
  vTaskInit(&task1, 0x400, task1_func);
  vTaskInit(&task2, 0x400, task2_func);		//初始化结构体
  vInitList(&xReadyList);					//初始化链表
  vInsertToEnd(&xReadyList, &(task1.list_item));
  vInsertToEnd(&xReadyList, &(task2.list_item));	//添加task到Ready链表
  start_schedule();
  while(1);
}

这样,find_next_task实现为通过遍历ready链表,找到优先级最高的task返回:

//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能
extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item);

xTaskHandler_t* find_next_task(void) {
  xListItem* pIterator = xReadyList.pxFirstItem;
  xTaskHandler_t* result = getTaskFromListItem(xReadyList.pxFirstItem);
  while(pIterator != xReadyList.pxLastItem) {
	if (getTaskFromListItem(pIterator)->priority < result->priority) //假设priority值小的优先级高
	  result = getTaskFromListItem(pIterator);
  }
  return result;
}

你可能觉得这样效率不够高,因为每次都需要遍历一遍整个链表。针对这个问题,还可以做出如下改进:

//将ready链表改成链表数组,priority为1则加入xReadyList[1], priority为2则加入xReadyList[2], 以此类推
static xList_t xReadyList[MAX_PRIORITY];
static uint32 uxCurrentMaxPriority;
xTaskHandler_t *pxCurrentTask = NULL;

extern void vInsertToEnd(xList_t *list, xListItem_t* item);
extern void vInitList(xList_t *list);
extern void vTaskInit(xTaskHandler_t* task, uint32 stack_size, task_func func);
void main() {
  xTaskHandler_t task1;
  xTaskHandler_t task2;
  vTaskInit(&task1, 0x400, task1_func);
  vTaskInit(&task2, 0x400, task2_func);		//初始化结构体
  for (uint32 i=0; i<MAX_PRIORITY; i++)
  	vInitList(&(xReadyList[i]));					//初始化链表
  vInsertToEnd(&(xReadyList[task1.priority]), &(task1.list_item));
  vInsertToEnd(&(xReadyList[task2.priority]), &(task2.list_item));	//添加task到Ready链表
  start_schedule();
  while(1);
}

将ready链表改为数组,通过一个变量uxCurrentMaxPriority记录当前存在的可调度的优先级最高的task,它的优先级是多少,即priority值最小是多少。这样find_next_task只需要从链表xReadyList[uxCurrentMaxPriority]中取一个即可:

//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能
extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item);
//下面的接口从list中获取一个item
extern xListItem_t* getItem(xList_t* list);

xTaskHandler_t* find_next_task(void) {
  xList_t* pTempList = &(xReadyList[uxCurrentMaxPriority]);
  return getTaskFromListItem(getItem(pTempList));
}

2. 上面的模型还存在一个问题,即只有当前正在运行的task主动要求调度时,才会进行调度。但实际使用中我们常常会在中断退出时触发一次调度,这样做的好处是防止高优先级的task长时间被低优先的task阻塞。注意,CR5中中断使用的是R13_irq, R14_irq,与正常运行时的R13_svc,R14_svc不同,因此切换时需要改变的是svc模式下的R13:

void irqHandler {
  save_context();							//压栈保存寄存器
  irqISR();									//中断的回调
  pxCurrentTask->r13 = get_svc_sp();		//将当前svc_r13(sp)寄存器值保存
  pxCurrentTask = find_next_task();			//找到需要调度的下一个task
  set_svc_sp(pxCurrentTask->r13);				//修改svc_r13寄存器的值
  /*从这里开始,进入到其他task的世界*/
  restore_context();						//退栈恢复现场
}

在中断退出时调度还有一个好处,即可以通过向target core发送一个中断(即核间通信中断,在SOC中十分场景)强迫其执行一次调度。

此外,在smp系统中,可以通过向其他core发送一个软件中断,使其他core进行调度。可以设想这样一个场景,core0正在执行一个优先级较低的task,core1则没有task执行,处于idle状态,此时core0收到外部中断,这个中断中唤起了一个优先级更高的task,出中断后core0转而执行优先级更高的task,那么它就可以给core1发送一个软中断,这样core1就会从idle状态唤醒转而执行原先在core0上执行的优先级较低的task。

延时的实现

延时函数

在前面的基础上,我们再来看看如何在RTOS的语境下实现延时。现在我们有一个timer,它会定时报中断给CPU,那么我们就可以维护一个全局变量,它记录了当前系统触发的定时中断的数量,每进一次定时中断都将这个变量加1,我们将这个变量称为系统节拍数。当一个task需要执行延时操作时,就可以使用当前系统节拍数+延时节拍数,得到一个唤醒节拍数。当系统节拍数达到唤醒节拍数时,出定时中断触发调度,就可能回到该task执行。注意,这里说可能,是因为触发调度后,有可能会切到其他优先级更高的task去,可以理解为切到了调用延时的task,紧接着被更高优先级的task抢占;另一方面,延时的精度还会受到定时中断周期限制。这两点结合可以看出,涉及到调度的延时并非绝对意义的准确。

参照前面ready链表的策略,我们也可以实现一个delay链表,所有执行了延时操作的task,我们都将它从ready链表中去掉并加入dealy链表,当延时到了后,再从delay链表中去掉该task加入ready链表。同时为了提升效率,修改一下xListItem_t这个结构体,给他添加一个变量value用于记录唤醒时间,这样在插入delay链表时可以按照唤醒先后的顺序,即按照value升序,使delay链表为一个有序链表。这样当延时到了要从delay链表中去掉task时,无需再遍历整条链表:

struct XLIST_ITEM {
  struct XLIST_ITEM *pxPrevious;
  struct XLIST_ITEM *pxNext;
  uint32 value;
};
typedef struct XLIST_ITEM xListItem_t;
typedef struct {
  uint32 itemNum;			//链表元素个数
  xListItem* pxFirstItem;	//链表首个元素
  xListItem* pxLastItem;	//链表末尾元素
}xList_t;

static xList_t xDelayList;		//delay链表
static xList_t xReadyList[MAX_PRIORITY];	//ready链表
xTaskHandler_t *pxCurrentTask = NULL;

xList_t* pxDelayList = &xDelayList;
static uint32 systemCount;		//系统节拍数

//按item->value升序将item插入链表
extern void vInsertInOrder(xList_t *list, xListItem_t* item);
//获取list中首个item并从链表中去掉
extern xListItem_t* vRemoveFirstItem(xList_t *list);
//将指定item并从链表中去掉
extern void vRemoveItem(xList_t *list, xListItem_t* item);
//设置item->value
extern xListItem_t* vSetItemValue(xListItem_t *item, uint32 value);
//将item加入list尾
extern void vInsertToEnd(xList_t *list, xListItem_t* item);
//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能
extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item);

void delay(uint32 count) {
  uint32 timeToWake = systemCount + count;					//唤醒时间
  vSetItemValue(&(pxCurrentTask->list_item), timeToWake);	//将唤醒时间写到list_item->value
  vRemoveItem(&(xReadyList[pxCurrentTask->priority]), &(pxCurrentTask->list_item));	//从ready队列中去掉
  vInsertInOrder(pxDelayList, &(pxCurrentTask->list_item));	//加入到delay链表
  yield();
}

void timerIRQ(void) {
  save_context();				//压栈保存寄存器
  systemCount++;				//系统节拍数+1
  if (systemCount == pxDelayList->pxFirstItem->value) {	//delay链表中首个元素为最先超时的
	xListItem_t* temp = vRemoveFirstItem(pxDelayList);	//从delay链表中去掉
	xList_t* pxReadyList = &(xReadyList[getTaskFromListItem(temp)->priority]);
	vInsertToEnd(pxReadyList, temp);	//加入ready链表
  }
  pxCurrentTask->r13 = get_svc_sp();		//将当前svc_r13(sp)寄存器值保存
  pxCurrentTask = find_next_task();			//找到需要调度的下一个task
  set_svc_sp(pxCurrentTask->r13);				//修改svc_r13寄存器的值
  /*从这里开始,进入到其他task的世界*/
  restore_context();						//退栈恢复现场
}

你可能注意到了,这样做有个问题,就是systemCount可能会溢出。为了解决这个问题,我们可以维护两条delay链表,具体实现不展开说了,有兴趣可以自己考虑一下。

时间片

如果上面的基础上,对find_next_task进行一定修改,可以实现时间片的功能。

//下面的接口实现通过task结构体的list_item得到task结构体的指针的功能
extern xTaskHandler_t* getTaskFromListItem(xListItem_t* item);
//下面的接口从list中获取一个不同于参数item的item
extern xListItem_t* getNewItem(xList_t* list, xListItem_t* item);

xTaskHandler_t* find_next_task(void) {
  xList_t* pTempList = &(xReadyList[uxCurrentMaxPriority]);
  return getTaskFromListItem(getNewItem(pTempList));
}

时间片可以用在同优先级的task之间,让同一优先级的task都有机会获得调度,但是有时间片的情况下,会导致SOC频繁被中断打断,系统没法长时间处于low power mode,功耗方面可能有些问题。

结语

本章主要概述了RTOS的基本框架,关于内核对象,如消息队列,互斥锁,信号量等后续会单独讨论。

#嵌入式工程师##嵌入式#
嵌入式学习 文章被收录于专栏

21届985微电子硕士,校招SSP进入紫光展锐任嵌入式工程师,8个月后跳槽至ASR工作至今。本专栏记录上学/工作时学到的一些知识,debug的一些工具使用等,希望有所帮助。

全部评论
马克
点赞 回复 分享
发布于 2023-06-05 22:58 四川
牛批
点赞 回复 分享
发布于 2023-04-23 23:32 广东

相关推荐

评论
3
28
分享

创作者周榜

更多
牛客网
牛客企业服务