redis7.x源码分析:(5) ae事件处理器(二)

​ 接上篇,接下来看定时器事件的处理。

创建定时器事件:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    // 通过累加timeEventNextId生成新id
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    // 设置触发时间为:当前单调时钟 + 需要定时时长,单位是微妙
    te->when = getMonotonicUs() + milliseconds * 1000;
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    // 新加入的时间直接添加到链表头
    eventLoop->timeEventHead = te;
    return id;
}

Redis的定时器实现比较简单,它并未使用最小堆或者时间轮之类的方式来管理定时事件,而是直接使用一个双向链表把所有的定时器事件串起来,然后通过遍历去实现一些功能。这么做主要是因为Redis中定时器使用量非常少,不需要管理大量的定时事件,所以就不需要排序;另外没有频繁注册删除的操作,这样也就不需要快速查找。

删除定时器事件:

int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    aeTimeEvent *te = eventLoop->timeEventHead;
    while(te) {
        if (te->id == id) {
            // 对于需要删除的定时器只把它的id置为无效,等待下一轮消息循环 processTimeEvents 中再做处理
            te->id = AE_DELETED_EVENT_ID;
            return AE_OK;
        }
        te = te->next;
    }
    return AE_ERR; /* NO event with the specified ID found */
}

对于定时器的删除操作不会立即执行,而是将定时器修改为无效,在下一轮的 processTimeEvents 中处理。

获取最早超时时间:

static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
    aeTimeEvent *te = eventLoop->timeEventHead;
    if (te == NULL) return -1;

    aeTimeEvent *earliest = NULL;
    // 遍历链表找到最早触发的定时器事件
    while (te) {
        if (!earliest || te->when < earliest->when)
            earliest = te;
        te = te->next;
    }

    monotime now = getMonotonicUs();
    // 返回最早触发的定时器距当前时间的剩余时长
    return (now >= earliest->when) ? 0 : earliest->when - now;
}

从代码中可以看出,最早超时时间就是通过遍历链表找出超时时间when最小的事件来实现的。aeProcessEvents中会调用它获取下个期望的超时时间,然后传递给 aeApiPoll 进行超时等待。

处理定时器事件:

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;

    te = eventLoop->timeEventHead;
    // 实际使用的最大id是 timeEventNextId-1
    maxId = eventLoop->timeEventNextId-1;
    monotime now = getMonotonicUs();
    while(te) {
        long long id;

        /* Remove events scheduled for deletion. */
        if (te->id == AE_DELETED_EVENT_ID) {
            // 清理失效的定时器
            aeTimeEvent *next = te->next;
            /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
            // 判断下引用计数,还在被引用的直接跳过
            if (te->refcount) {
                te = next;
                continue;
            }
            // 从链表中删除定时器节点
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc) {
                te->finalizerProc(eventLoop, te->clientData);
                // 提高定时器执行精度,执行回调后更新now
                now = getMonotonicUs();
            }
            zfree(te);
            te = next;
            continue;
        }

        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }

        // 超时了
        if (te->when <= now) {
            int retval;

            id = te->id;
            te->refcount++;
            // 执行注册的定时器回调,并返回新的定时时间
            retval = te->timeProc(eventLoop, id, te->clientData);
            te->refcount--;
            processed++;
            // 提高定时器执行精度,执行回调后更新now
            now = getMonotonicUs();
            // 回调执行完后,循环定时器就更新时间点,否则就标记成删除等待清理
            if (retval != AE_NOMORE) {
                te->when = now + retval * 1000;
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

它是在 aeProcessEvents 的函数末尾调用的,通过遍历链表确认是否有事件超时,然后执行相应的超时回调。

#redis#
全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务