redis7.x源码分析:(5) ae事件处理器(二)
接上篇,接下来看定时器事件的处理。
创建定时器事件:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { // 通过累加timeEventNextId生成新id long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; // 设置触发时间为:当前单调时钟 + 需要定时时长,单位是微妙 te->when = getMonotonicUs() + milliseconds * 1000; te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL; te->next = eventLoop->timeEventHead; te->refcount = 0; if (te->next) te->next->prev = te; // 新加入的时间直接添加到链表头 eventLoop->timeEventHead = te; return id; }
Redis的定时器实现比较简单,它并未使用最小堆或者时间轮之类的方式来管理定时事件,而是直接使用一个双向链表把所有的定时器事件串起来,然后通过遍历去实现一些功能。这么做主要是因为Redis中定时器使用量非常少,不需要管理大量的定时事件,所以就不需要排序;另外没有频繁注册删除的操作,这样也就不需要快速查找。
删除定时器事件:
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) { aeTimeEvent *te = eventLoop->timeEventHead; while(te) { if (te->id == id) { // 对于需要删除的定时器只把它的id置为无效,等待下一轮消息循环 processTimeEvents 中再做处理 te->id = AE_DELETED_EVENT_ID; return AE_OK; } te = te->next; } return AE_ERR; /* NO event with the specified ID found */ }
对于定时器的删除操作不会立即执行,而是将定时器修改为无效,在下一轮的 processTimeEvents 中处理。
获取最早超时时间:
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) { aeTimeEvent *te = eventLoop->timeEventHead; if (te == NULL) return -1; aeTimeEvent *earliest = NULL; // 遍历链表找到最早触发的定时器事件 while (te) { if (!earliest || te->when < earliest->when) earliest = te; te = te->next; } monotime now = getMonotonicUs(); // 返回最早触发的定时器距当前时间的剩余时长 return (now >= earliest->when) ? 0 : earliest->when - now; }
从代码中可以看出,最早超时时间就是通过遍历链表找出超时时间when最小的事件来实现的。aeProcessEvents中会调用它获取下个期望的超时时间,然后传递给 aeApiPoll 进行超时等待。
处理定时器事件:
/* Process time events */ static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; te = eventLoop->timeEventHead; // 实际使用的最大id是 timeEventNextId-1 maxId = eventLoop->timeEventNextId-1; monotime now = getMonotonicUs(); while(te) { long long id; /* Remove events scheduled for deletion. */ if (te->id == AE_DELETED_EVENT_ID) { // 清理失效的定时器 aeTimeEvent *next = te->next; /* If a reference exists for this timer event, * don't free it. This is currently incremented * for recursive timerProc calls */ // 判断下引用计数,还在被引用的直接跳过 if (te->refcount) { te = next; continue; } // 从链表中删除定时器节点 if (te->prev) te->prev->next = te->next; else eventLoop->timeEventHead = te->next; if (te->next) te->next->prev = te->prev; if (te->finalizerProc) { te->finalizerProc(eventLoop, te->clientData); // 提高定时器执行精度,执行回调后更新now now = getMonotonicUs(); } zfree(te); te = next; continue; } /* Make sure we don't process time events created by time events in * this iteration. Note that this check is currently useless: we always * add new timers on the head, however if we change the implementation * detail, this check may be useful again: we keep it here for future * defense. */ if (te->id > maxId) { te = te->next; continue; } // 超时了 if (te->when <= now) { int retval; id = te->id; te->refcount++; // 执行注册的定时器回调,并返回新的定时时间 retval = te->timeProc(eventLoop, id, te->clientData); te->refcount--; processed++; // 提高定时器执行精度,执行回调后更新now now = getMonotonicUs(); // 回调执行完后,循环定时器就更新时间点,否则就标记成删除等待清理 if (retval != AE_NOMORE) { te->when = now + retval * 1000; } else { te->id = AE_DELETED_EVENT_ID; } } te = te->next; } return processed; }
它是在 aeProcessEvents 的函数末尾调用的,通过遍历链表确认是否有事件超时,然后执行相应的超时回调。
#redis#