ae.c
Redis 的 `ae.c` 封装了多种事件驱动框架(如 evport、epoll、kqueue、select)的使用。
`ae.h` 中声明了所需的数据结构和一些接口方法。目前 Redis 支持以下四种事件驱动框架,分别由下列文件实现:
- ae_epoll.c
- ae_evport.c
- ae_kqueue.c
- ae_select.c
/* Pick the I/O multiplexing backend for this platform.
 * The first one available wins, checked in this order:
 * evport, epoll, kqueue; select is the portable fallback. */
#if defined(HAVE_EVPORT)
    #include "ae_evport.c"
#elif defined(HAVE_EPOLL)
    #include "ae_epoll.c"
#elif defined(HAVE_KQUEUE)
    #include "ae_kqueue.c"
#else
    #include "ae_select.c"
#endif
`server.c` 的 `main` 函数中有这样一段代码:
/* Fragment of main() in server.c: install the hooks that run around each
 * poll, run the event loop until it is stopped, then release it.
 * Order matters: aeDeleteEventLoop frees server.el, so it may only run
 * after aeMain has returned. */
/* Called by aeMain before every aeProcessEvents() call. */
aeSetBeforeSleepProc(server.el,beforeSleep);
/* Called by aeProcessEvents right after aeApiPoll() returns. */
aeSetAfterSleepProc(server.el,afterSleep);
/* Loops here until eventLoop->stop becomes non-zero. */
aeMain(server.el);
aeDeleteEventLoop(server.el);
`ae.h` 中 `aeEventLoop` 的数据结构声明如下:
/* State of an event based program */
/* Event loop state: registered file events, fired events, the time event
 * list, and the hooks run around each poll. */
typedef struct aeEventLoop {
/* Highest registered fd; -1 means none. aeProcessEvents checks it against -1,
 * and the select backend scans fds 0..maxfd. */
int maxfd; /* highest file descriptor currently registered */
/* Size of the events[]/fired[] tables; the epoll and kqueue backends also
 * pass it as the maximum number of events to return. */
int setsize; /* max number of file descriptors tracked */
/* NOTE(review): presumably the id given to the next created time event;
 * the creating code is not shown here. */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
/* Registrations indexed by fd (aeProcessEvents reads events[fd]). */
aeFileEvent *events; /* Registered events */
/* Ready events written by aeApiPoll and consumed by aeProcessEvents. */
aeFiredEvent *fired; /* Fired events */
/* Head of the time event linked list. */
aeTimeEvent *timeEventHead;
/* aeMain keeps looping while this is 0. */
int stop;
void *apidata; /* This is used for polling API specific data */
/* Hook called by aeMain before each aeProcessEvents call (may be NULL). */
aeBeforeSleepProc *beforesleep;
/* Hook called right after aeApiPoll returns, when aeProcessEvents is given
 * AE_CALL_AFTER_SLEEP (may be NULL). It reuses the aeBeforeSleepProc type. */
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
时间事件的数据结构如下:
/* A timer (time event). Nodes form a doubly linked list whose head is
 * aeEventLoop.timeEventHead. */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
/* when_sec/when_ms form an absolute expiry time: aeProcessEvents subtracts
 * the current time from aeGetTime() to compute the poll timeout. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
/* NOTE(review): presumably invoked when the timer fires (processTimeEvents,
 * not shown here). */
aeTimeProc *timeProc;
/* NOTE(review): presumably invoked when the timer is removed; confirm. */
aeEventFinalizerProc *finalizerProc;
/* Opaque user pointer, presumably handed to the callbacks above. */
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
文件事件的数据结构如下:
/* File event structure */
/* Per-fd registration, stored at aeEventLoop.events[fd]. */
typedef struct aeFileEvent {
int mask; /* bitwise OR of AE_(READABLE|WRITABLE|BARRIER) */
/* Called as rfileProc(eventLoop, fd, clientData, mask) when readable. */
aeFileProc *rfileProc;
/* Called as wfileProc(eventLoop, fd, clientData, mask) when writable. */
aeFileProc *wfileProc;
/* Opaque pointer passed to both handlers. */
void *clientData;
} aeFileEvent;
触发事件的数据结构如下:
/* A ready event reported by aeApiPoll. */
typedef struct aeFiredEvent {
int fd; /* the fd that became ready */
int mask; /* AE_READABLE / AE_WRITABLE bits that are ready on fd */
} aeFiredEvent;
另外,`ae.c` 中有以下 4 个函数:
/* Install the hook that aeMain() runs before every aeProcessEvents() call.
 * Passing NULL disables it (aeMain skips a NULL hook). */
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
    (*eventLoop).beforesleep = beforesleep;
}
/* Install the hook that aeProcessEvents() runs right after the multiplexing
 * call returns (only when given AE_CALL_AFTER_SLEEP). NULL disables it. */
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
    (*eventLoop).aftersleep = aftersleep;
}
/* Run the event loop until eventLoop->stop becomes non-zero.
 * Each iteration runs the optional beforesleep hook, then processes file
 * and time events with the aftersleep hook enabled. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        aeBeforeSleepProc *hook = eventLoop->beforesleep;

        if (hook) {
            hook(eventLoop);
        }
        aeProcessEvents(eventLoop, AE_ALL_EVENTS | AE_CALL_AFTER_SLEEP);
    }
}
/* Release an event loop and everything it owns:
 * - the backend state (aeApiFree);
 * - the events[] and fired[] tables;
 * - every node of the time event list;
 * - the loop struct itself.
 * Time event callbacks (timeProc/finalizerProc) are NOT invoked, so any
 * clientData they reference remains the caller's responsibility.
 * eventLoop must not be used after this call. */
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);

    /* Timer nodes hang off timeEventHead (presumably zmalloc'ed when the
     * timer was created); free them too, otherwise they leak. */
    aeTimeEvent *te = eventLoop->timeEventHead;
    while (te != NULL) {
        aeTimeEvent *next = te->next;
        zfree(te);
        te = next;
    }

    zfree(eventLoop);
}
它们分别是 `server.c` 中那 4 行代码所调用函数的实现。也就是说,`server.c` 运行时,只要 `eventLoop->stop` 为 0(即 `!eventLoop->stop` 为真),`aeMain` 就会不断循环:先调用 `eventLoop->beforesleep`,再调用 `aeProcessEvents` 监听并处理事件。`aeEventLoop` 承载了所需的参数和数据;poll(`aeApiPoll`)和 `aftersleep` 则在 `aeProcessEvents` 内部调用。
接下来看看 `aeProcessEvents` 的实现。
/* Process one round of events.
 *
 * flags selects the work to do:
 *   AE_TIME_EVENTS      run time events that are due (processTimeEvents);
 *   AE_FILE_EVENTS      file events are wanted;
 *   AE_DONT_WAIT        never block: poll with a zero timeout;
 *   AE_CALL_AFTER_SLEEP call eventLoop->aftersleep right after aeApiPoll.
 * If neither AE_TIME_EVENTS nor AE_FILE_EVENTS is set, returns 0 at once.
 * NOTE(review): once the poll runs, fired file events are dispatched even if
 * AE_FILE_EVENTS is not set; this matches the original behavior.
 *
 * Returns the number of fired file events handled plus the value returned
 * by processTimeEvents. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Poll even with no fd registered (maxfd == -1) when we must wait for a
     * time event: the poll timeout then acts as a sleep until that timer is
     * due. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        /* Only look up the nearest timer when we are allowed to block. */
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);

        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* Milliseconds until the nearest timer fires. Widen to long long
             * before multiplying so the product cannot overflow a 32-bit
             * long. A timer that is already due yields a zero timeout. */
            long long ms =
                ((long long)shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else if (flags & AE_DONT_WAIT) {
            /* We must check for events but return As Soon As Possible:
             * use a zero timeout. */
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        } else {
            /* No timer bounds the wait: block until a file event arrives. */
            tvp = NULL;
        }

        /* Ask the backend (epoll/kqueue/evport/select) for ready fds. */
        numevents = aeApiPoll(eventLoop, tvp);

        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            int mask = eventLoop->fired[j].mask;
            aeFileEvent *fe = &eventLoop->events[fd];
            int fired = 0; /* Number of events fired for current fd. */

            /* The readable handler normally runs before the writable one, so
             * a reply can be sent in the same iteration the query was read.
             * With AE_BARRIER the order is inverted: writable first, then
             * readable. */
            int invert = fe->mask & AE_BARRIER;

            /* Each check uses "fe->mask & mask & ...": an earlier handler may
             * have removed this registration, so it is re-validated before
             * every call. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                /* Handlers receive eventLoop and may reallocate
                 * eventLoop->events (e.g. a set-size resize), which would
                 * leave fe dangling: re-derive it. */
                fe = &eventLoop->events[fd];
            }

            if (fe->mask & mask & AE_WRITABLE) {
                /* Skip if the same function already ran as reader. */
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc)) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }

    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
其处理流程大致如下:
计算最近的时间事件超时(aeSearchNearestTimer) -> aeApiPoll -> aftersleep -> 处理文件事件(aeFileEvent 的读/写回调) -> processTimeEvents
`aeApiPoll` 则由 evport、epoll、kqueue、select 各自实现。
ae_epoll.c 对 `aeApiPoll` 的实现
调用 `epoll_wait` 来等待所关注的事件就绪。
/* epoll backend for aeApiPoll: wait until registered fds are ready and
 * record them in eventLoop->fired[0..n-1].
 *
 * tvp: NULL blocks indefinitely; otherwise the maximum time to wait.
 * Returns the number of entries written to fired[]. Returns 0 on timeout
 * or when epoll_wait fails (for example when interrupted by a signal). */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    /* epoll_wait takes a timeout in milliseconds (-1 = wait forever).
     * Round partial milliseconds up so a small non-zero timeout is not
     * truncated to 0, which would make the call return immediately and
     * the loop spin until the timer is due. */
    retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
                        tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        /* epoll_wait returns only ready fds, so every entry is recorded. */
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events + j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            /* Report errors and hang-ups as both readable and writable so
             * whichever handler is registered observes them.
             * aeProcessEvents only runs a handler whose bit is in both
             * fe->mask and this mask. Reporting AE_WRITABLE alone therefore
             * left read-only fds with the condition unhandled. */
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
ae_select.c 对 `aeApiPoll` 的实现
它和 epoll 的一个区别是:`epoll_wait` 只返回已就绪的事件,可以直接使用;而 `select` 只返回就绪的数量,需要遍历 0 到 maxfd 的每一个 fd,用 `FD_ISSET` 逐个判断是否就绪。
一种常见的说法是:当几乎所有连接都很活跃时,select 与 epoll 的差距会缩小,select 甚至可能更快;但 select 受 FD_SETSIZE(通常为 1024)的限制。
/* select backend for aeApiPoll: wait until some registered fd is ready and
 * record only the ready ones in eventLoop->fired[].
 *
 * tvp: NULL blocks indefinitely; otherwise the maximum time to wait.
 * Returns the number of entries written to fired[]. Returns 0 on timeout
 * or when select fails. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;

    /* select() overwrites the sets it is given. Work on scratch copies
     * (_rfds/_wfds) so the master sets (rfds/wfds) stay intact. */
    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));

    /* fds must stay below FD_SETSIZE (commonly 1024) to fit in an fd_set. */
    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        /* select() only says how many fds are ready, so scan 0..maxfd
         * with FD_ISSET to find which ones. */
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[j];

            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;
            /* Skip registered-but-idle fds. Reporting them with an empty
             * mask fired no handler but inflated the event count that
             * aeProcessEvents returns. */
            if (mask == 0) continue;
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}
ae_kqueue.c 对 `aeApiPoll` 的实现
kqueue 和 epoll 有些相似,但在 API 上只用一个 `kevent` 调用,就同时具备了 `epoll_ctl`(注册事件)和 `epoll_wait`(等待事件)的功能。
/* kqueue backend for aeApiPoll: wait for kernel events and record them in
 * eventLoop->fired[0..n-1].
 *
 * tvp: NULL blocks indefinitely; otherwise the maximum time to wait.
 * Returns the number of entries written to fired[]. Returns 0 on timeout
 * or error.
 * NOTE(review): each kevent carries a single filter, so an fd that is both
 * readable and writable may appear as two separate fired[] entries. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    struct timespec timeout;
    struct timespec *tsp = NULL;

    /* kevent expects a timespec (nanoseconds) rather than a timeval;
     * a NULL timespec means wait forever. */
    if (tvp != NULL) {
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        tsp = &timeout;
    }

    /* Empty changelist: this call only waits, it registers nothing. */
    int retval = kevent(state->kqfd, NULL, 0, state->events,
                        eventLoop->setsize, tsp);
    if (retval <= 0) {
        return 0;
    }

    /* Like epoll, kevent returns only events that actually happened. */
    for (int j = 0; j < retval; j++) {
        struct kevent *ev = &state->events[j];
        int mask = 0;

        switch (ev->filter) {
        case EVFILT_READ:
            mask = AE_READABLE;
            break;
        case EVFILT_WRITE:
            mask = AE_WRITABLE;
            break;
        default:
            break;
        }
        eventLoop->fired[j].fd = ev->ident;
        eventLoop->fired[j].mask = mask;
    }
    return retval;
}
ae_evport.c 对 `aeApiPoll` 的实现
需要注意,event ports 中 fd 与端口的关联是一次性的:事件返回之后,必须在下一次调用 `port_getn` 之前重新关联(见代码中对 `pending_fds` 的处理)。
/* Event ports backend for aeApiPoll.
 * 1. Re-associate with the port every fd whose events were returned by the
 *    previous call (remembered in pending_fds/pending_masks). A failure
 *    aborts the process.
 * 2. Wait for up to MAX_EVENT_BATCHSZ events. tvp == NULL blocks
 *    indefinitely; otherwise it bounds the wait.
 * 3. Record the events in eventLoop->fired[0..n-1], and remember each fd and
 *    its mask as pending so step 1 of the next call re-associates it.
 * Returns the number of fired entries. Returns 0 on ETIME with no events,
 * or on EINTR. Any other port_getn failure aborts the process. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
struct timespec timeout, *tsp;
int mask, i;
uint_t nevents;
port_event_t event[MAX_EVENT_BATCHSZ];
/*
* If we've returned fd events before, we must re-associate them with the
* port now, before calling port_get(). See the block comment at the top of
* this file for an explanation of why.
*/
for (i = 0; i < state->npending; i++) {
if (state->pending_fds[i] == -1)
/* This fd has since been deleted. */
continue;
if (aeApiAssociate("aeApiPoll", state->portfd,
state->pending_fds[i], state->pending_masks[i]) != 0) {
/* See aeApiDelEvent for why this case is fatal. */
abort();
}
state->pending_masks[i] = AE_NONE;
state->pending_fds[i] = -1;
}
state->npending = 0;
/* Convert the timeval to the timespec port_getn expects; NULL = no timeout. */
if (tvp != NULL) {
timeout.tv_sec = tvp->tv_sec;
timeout.tv_nsec = tvp->tv_usec * 1000;
tsp = &timeout;
} else {
tsp = NULL;
}
/*
* port_getn can return with errno == ETIME having returned some events (!).
* So if we get ETIME, we check nevents, too.
*/
nevents = 1;
/* Fetch up to MAX_EVENT_BATCHSZ events. nevents is in/out: on return it
 * holds how many events were retrieved (used below). On input it is
 * presumably the minimum number to wait for; see port_getn(3C). */
if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
tsp) == -1 && (errno != ETIME || nevents == 0)) {
if (errno == ETIME || errno == EINTR)
return 0;
/* Any other error indicates a bug. */
perror("aeApiPoll: port_get");
abort();
}
/* Every returned fd must be re-associated on the next call. */
state->npending = nevents;
for (i = 0; i < nevents; i++) {
mask = 0;
if (event[i].portev_events & POLLIN)
mask |= AE_READABLE;
if (event[i].portev_events & POLLOUT)
mask |= AE_WRITABLE;
eventLoop->fired[i].fd = event[i].portev_object;
eventLoop->fired[i].mask = mask;
if (evport_debug)
fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
(int)event[i].portev_object, mask);
state->pending_fds[i] = event[i].portev_object;
/* NOTE(review): portev_user presumably carries the registered AE mask
 * supplied at association time; confirm in aeApiAssociate. */
state->pending_masks[i] = (uintptr_t)event[i].portev_user;
}
return nevents;
}