ae.c

Redis的ae.c封装了多种事件驱动框架的使用。
如:evport、epoll、kqueue、select
ae.h中声明了需要用到的数据结构和一些接口方法。

目前Redis支持以下四种事件驱动框架,实现分别为:

  • ae_epoll.c
  • ae_evport.c
  • ae_kqueue.c
  • ae_select.c
//根据系统环境选择evport、epoll、kqueue、select
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

server.c中的main方法中有着这么一段代码。

    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);

而在ae.h中EventLoop的数据结构则声明如下:

/* State of an event based program */
//事件驱动模型的EventLoop轮询器
typedef struct aeEventLoop {
    //当前最大的fd,在ae_select.c中用到
    int maxfd;   /* highest file descriptor currently registered */
    //事件队列、已触发事件队列的大小
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    //注册的关注的事件
    aeFileEvent *events; /* Registered events */
    //触发的事件
    aeFiredEvent *fired; /* Fired events */
    //定时事件链表的头结点
    aeTimeEvent *timeEventHead;
    //事件轮询器的运行标记状态
    int stop;
    void *apidata; /* This is used for polling API specific data */
    //调用aeApiPoll之前需要处理的任务
    aeBeforeSleepProc *beforesleep;
    //调用完aeApiPoll之后需要处理的任务
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

时间事件的数据结构如下:

//定时时间事件
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;

文件事件的数据结构如下:

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

触发事件的数据结构如下:

//触发的事件
typedef struct aeFiredEvent {
    int fd; //返回的fd
    int mask; //关注的fd上发生的行为
} aeFiredEvent;

另外,在ae.c中有4个方法。

void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
    eventLoop->beforesleep = beforesleep;
}

void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
    eventLoop->aftersleep = aftersleep;
}

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);
    zfree(eventLoop);
}

分别对应着server.c中那4行代码的主要实现。

也就是说,当server.c运行的时候,只要!eventLoop->stop标记返回true,就会轮询,调用eventLoop->beforesleep,然后去调用aeProcessEvents来监听事件,而aeEventLoop承载了所需要的参数和数据等,在aeProcessEvents中去进行pollaftersleep的调用。

接下来,看看aeProcessEvents的实现。

//事件处理
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    //判断当前flags是否为 时间事件或者文件事件,如果不是,立即返回。
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        //如果是时间事件且不需要立刻返回
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            //查找最近到达的时间事件
            shortest = aeSearchNearestTimer(eventLoop);
        //如果不为空
        if (shortest) {
            long now_sec, now_ms;
            //获取当前时间
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            //触发时间-当前时间,如果小于0,则立即触发。
            //轮询时间检测基本都是采用这种方式计算的。
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            //ASAP = As Soon As Possible 尽快
            //如果设置了AE_DONT_WAIT,那么应该立即返回,而不应该等待,且设置timeout为0.
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        //调用多路复用API,如:epoll,返回timeout或者被触发的事件
        numevents = aeApiPoll(eventLoop, tvp);

        //aeApiPoll一返回,就立刻调用各种回调
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        //处理文件事件
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            // AE_BARRIER 在异步同步AOF到磁盘的场景中使用以及集群发送消息
            int invert = fe->mask & AE_BARRIER;

            //先执行读事件,通过invert来修改先读后写的顺序
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            //再执行写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            //通过invert来修改先读后写的顺序
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    //处理时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

大致如下:
aeTimeEvent -> aeApiPoll -> aftersleep -> aeFileEvent -> processTimeEvents
aeApiPoll则由evport、epoll、kqueue、select各自实现。

ae_epoll.caeApiPoll的实现

调用epoll_wait来等待关注事件的到达。

//epoll事件触发
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    //epoll_wait会阻塞调用线程
    //等待epoll事件触发,可设置timeout,如果事件超过timeout没有响应,就超时。
    //返回就绪list中的数据,epoll采用链表,fd大小理论上是没有限制的。
    //不需要拷贝fd_set,在内核空间存在一个红黑树存储epoll_ctl传来的socket
    retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
                        tvp ? (tvp->tv_sec * 1000 + tvp->tv_usec / 1000) : -1);
    if (retval > 0) {
        int j;
        numevents = retval;
        //返回的事件都是可以被处理的
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events + j;
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            //已完成的事件
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    //返回事件数,在aeProcessEvents中再for循环处理
    return numevents;
}

ae_select.caeApiPoll的实现

epoll的一个区别则是,epoll调用epoll_wait返回的事件都可以直接返回,而select需要判断每一个事件是否可以被返回。
当每一个连接都很活跃的时候,select的效率可能就比epoll高了。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;
    
    //将fd_set从用户空间拷贝到内核空间
    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    //调用select,有fd大小限制,一般是1024
    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        //循环遍历每一个fd,再进行判断处理,得到可以被返回的事件
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];

            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}

ae_kqueue.caeApiPoll的实现

kqueueepoll有点相似,API方面只用kevent就包含了epoll具备的功能。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    if (tvp != NULL) {
        struct timespec timeout;
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        //带有超时时间的kevent
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        &timeout);
    } else {
        //kevent功能类似epoll_wait + epoll_ctl 
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
                        NULL);
    }

    if (retval > 0) {
        int j;
        numevents = retval;
        //kqueue本质上类似epoll,直接返回可处理的事件
        for(j = 0; j < numevents; j++) {
            int mask = 0;
            struct kevent *e = state->events+j;
            if (e->filter == EVFILT_READ) mask |= AE_READABLE;
            if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->ident;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

ae_evport.caeApiPoll的实现

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    struct timespec timeout, *tsp;
    int mask, i;
    uint_t nevents;
    port_event_t event[MAX_EVENT_BATCHSZ];

    /*
     * If we've returned fd events before, we must re-associate them with the
     * port now, before calling port_get().  See the block comment at the top of
     * this file for an explanation of why.
     */
    for (i = 0; i < state->npending; i++) {
        if (state->pending_fds[i] == -1)
            /* This fd has since been deleted. */
            continue;

        if (aeApiAssociate("aeApiPoll", state->portfd,
            state->pending_fds[i], state->pending_masks[i]) != 0) {
            /* See aeApiDelEvent for why this case is fatal. */
            abort();
        }

        state->pending_masks[i] = AE_NONE;
        state->pending_fds[i] = -1;
    }

    state->npending = 0;

    if (tvp != NULL) {
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        tsp = &timeout;
    } else {
        tsp = NULL;
    }

    /*
     * port_getn can return with errno == ETIME having returned some events (!).
     * So if we get ETIME, we check nevents, too.
     */
    nevents = 1;
    
    //通过port_getn获取事件数
    if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
        tsp) == -1 && (errno != ETIME || nevents == 0)) {
        if (errno == ETIME || errno == EINTR)
            return 0;

        /* Any other error indicates a bug. */
        perror("aeApiPoll: port_get");
        abort();
    }

    state->npending = nevents;

    for (i = 0; i < nevents; i++) {
            mask = 0;
            if (event[i].portev_events & POLLIN)
                mask |= AE_READABLE;
            if (event[i].portev_events & POLLOUT)
                mask |= AE_WRITABLE;

            eventLoop->fired[i].fd = event[i].portev_object;
            eventLoop->fired[i].mask = mask;

            if (evport_debug)
                fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
                    (int)event[i].portev_object, mask);

            state->pending_fds[i] = event[i].portev_object;
            state->pending_masks[i] = (uintptr_t)event[i].portev_user;
    }

    return nevents;
}
全部评论

相关推荐

不愿透露姓名的神秘牛友
11-26 15:46
已编辑
字节国际 电商后端 24k-35k
点赞 评论 收藏
分享
牛客279957775号:铁暗恋
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务