redis7.x源码分析:(4) ae事件处理器(一)
ae模块是redis实现的Reactor模型的封装。它的主要代码实现集中在 ae.c 中,另外还提供了平台相关的io多路复用的封装,它们都实现了一套相同的poll接口,就类似于C++中提供了一个接口基类,由针对不同平台的派生类去实现。
// 创建平台相关的io模型实例 static int aeApiCreate(aeEventLoop *eventLoop) // 修改可侦听的fd数量 static int aeApiResize(aeEventLoop *eventLoop, int setsize) // 释放实例 static void aeApiFree(aeEventLoop *eventLoop) // 添加或修改fd的侦听事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) // 删除或修改fd的侦听事件 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) // 侦听所有fd的事件 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) // 返回使用的io模型名称 static char *aeApiName(void)
ae.c 会在文件头部根据编译时定义的宏,直接 #include 对应平台的 io 多路复用实现文件,从而决定实际使用的 io 模型。
/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ // 根据不同操作系统下不同的宏选择应该使用的io复用模型 #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL // 对应linux #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE // 对应BSD(FreeBSD、MacOS等) #include "ae_kqueue.c" #else // 对应Solaris #include "ae_select.c" #endif #endif #endif
以linux为例先来看一下ae_epoll.c的实现,它的代码也比较简单:
/* Allocate the epoll backend state for `eventLoop` and store it in
 * eventLoop->apidata.
 *
 * Returns 0 on success, -1 if an allocation or epoll_create() fails (in
 * which case everything allocated here is released again). */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *api = zmalloc(sizeof(*api));
    if (api == NULL) return -1;

    /* One epoll_event slot per trackable fd. */
    api->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (api->events == NULL) goto fail_state;

    /* 1024 is just a hint for the kernel (per epoll_create(2) the size
     * argument is ignored since Linux 2.6.8). */
    api->epfd = epoll_create(1024);
    if (api->epfd == -1) goto fail_events;

    anetCloexec(api->epfd);
    eventLoop->apidata = api;
    return 0;

fail_events:
    zfree(api->events);
fail_state:
    zfree(api);
    return -1;
}

/* ...... (the article omits the remaining backend functions here) ...... */

/* Start monitoring `mask` events on `fd`, in addition to whatever is
 * already registered for it in eventLoop->events[fd].mask.
 *
 * Returns 0 on success, -1 if epoll_ctl() fails. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *api = eventLoop->apidata;
    int registered = eventLoop->events[fd].mask;

    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op;
    if (registered == AE_NONE)
        op = EPOLL_CTL_ADD;
    else
        op = EPOLL_CTL_MOD;

    /* Merge old and new events and translate them to epoll flags. */
    int wanted = mask | registered;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    ee.events = ((wanted & AE_READABLE) ? EPOLLIN : 0) |
                ((wanted & AE_WRITABLE) ? EPOLLOUT : 0);
    ee.data.fd = fd;

    return (epoll_ctl(api->epfd,op,fd,&ee) == -1) ? -1 : 0;
}

/* Stop monitoring the `delmask` events on `fd`. If other events remain
 * registered the epoll entry is modified, otherwise it is removed.
 * The result of epoll_ctl() is not checked. */
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *api = eventLoop->apidata;
    int remaining = eventLoop->events[fd].mask & (~delmask);

    struct epoll_event ee = {0}; /* avoid valgrind warning */
    ee.events = ((remaining & AE_READABLE) ? EPOLLIN : 0) |
                ((remaining & AE_WRITABLE) ? EPOLLOUT : 0);
    ee.data.fd = fd;

    /* Note, Kernel < 2.6.9 requires a non null event pointer even for
     * EPOLL_CTL_DEL, so `ee` is passed in both cases. */
    int op = (remaining == AE_NONE) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
    epoll_ctl(api->epfd,op,fd,&ee);
}

/* Wait for events on the monitored fds and record them in
 * eventLoop->fired[0..n-1].
 *
 * `tvp` is the maximum time to wait; NULL means wait indefinitely.
 * Returns the number of fired events (0 on timeout or EINTR). Any other
 * epoll_wait() error is reported through panic(). */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *api = eventLoop->apidata;

    /* Convert to milliseconds, rounding the microsecond part up: without
     * the "+ 999" any tv_usec below 1000 would become 0 ms and epoll_wait
     * would not wait at all. This way a non-zero tv_usec waits >= 1 ms. */
    int timeout_ms = tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1;

    int nready = epoll_wait(api->epfd,api->events,eventLoop->setsize,timeout_ms);
    if (nready == -1 && errno != EINTR) {
        panic("aeApiPoll: epoll_wait, %s", strerror(errno));
    }
    if (nready <= 0) return 0;

    for (int i = 0; i < nready; i++) {
        struct epoll_event *e = &api->events[i];
        int mask = 0;

        /* Errors and hang-ups are reported as both readable and writable. */
        if (e->events & (EPOLLIN|EPOLLERR|EPOLLHUP)) mask |= AE_READABLE;
        if (e->events & (EPOLLOUT|EPOLLERR|EPOLLHUP)) mask |= AE_WRITABLE;

        /* Record the fired event. */
        eventLoop->fired[i].fd = e->data.fd;
        eventLoop->fired[i].mask = mask;
    }
    return nready;
}
接下来再看一下ae相关接口,先看下fd相关实现。
主要结构体定义如下:
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ // registered event types
    aeFileProc *rfileProc; // callback for readable events
    aeFileProc *wfileProc; // callback for writable events
    void *clientData; // opaque context pointer handed to the callbacks
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    monotime when; // time at which the timer fires
    aeTimeProc *timeProc; // callback invoked when the timer fires
    aeEventFinalizerProc *finalizerProc; // callback invoked when the timer is destroyed
    void *clientData; // opaque context pointer
    struct aeTimeEvent *prev; // previous time event in the list
    struct aeTimeEvent *next; // next time event in the list
    int refcount; /* refcount to prevent timer events from being
                   * freed in recursive time event calls. */
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd; // file descriptor on which the event fired
    int mask; // event types that fired
} aeFiredEvent;

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */ // fds >= setsize are rejected by aeCreateFileEvent
    long long timeEventNextId; // incremented to generate time event ids
    aeFileEvent *events; /* Registered events */ // indexed by fd
    aeFiredEvent *fired; /* Fired events */ // filled from index 0 by aeApiPoll
    aeTimeEvent *timeEventHead; // head of the time event list
    int stop; // set non-zero to make aeMain leave its loop
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep; // called before polling
    aeBeforeSleepProc *aftersleep; // called after polling
    int flags; // loop-level flags; aeProcessEvents tests AE_DONT_WAIT here
} aeEventLoop;
aeEventLoop中的events是用于保存fd注册事件的数组,它是以fd值作为索引来存取事件的。
创建eventloop事件管理器:
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; // 初始化一下单调时钟使用哪种方式实现 monotonicInit(); /* just in case the calling app didn't initialize */ if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; // events、fired数组以fd值为下标,直接存取对应fd的相关数据 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; eventLoop->flags = 0; // 根据平台创建io模型实例 if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }
添加和删除事件:
/* Register `proc` as the handler for the `mask` events on `fd`.
 *
 * eventLoop  - the loop to register with.
 * fd         - file descriptor; must satisfy 0 <= fd < eventLoop->setsize.
 * mask       - AE_READABLE and/or AE_WRITABLE (merged with any events
 *              already registered for the fd).
 * proc       - callback stored as the read and/or write handler,
 *              depending on `mask`.
 * clientData - opaque pointer stored with the fd and handed to callbacks.
 *
 * Returns AE_OK on success. Returns AE_ERR with errno = ERANGE when `fd`
 * is outside the events array, or AE_ERR when the polling backend refuses
 * the registration. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    /* events[] is indexed by fd: reject anything outside [0, setsize),
     * including negative descriptors, before touching the array. */
    if (fd < 0 || fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    /* Add or modify the fd's read/write events in the backend. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

/* Unregister the `mask` events for `fd`.
 *
 * Does nothing when `fd` is outside [0, eventLoop->setsize) or has no
 * registered events. Removing AE_WRITABLE also removes AE_BARRIER. When
 * the fd that loses its last event is the current maxfd, maxfd is lowered
 * to the highest fd that still has events (or -1 if there is none). */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    /* Same bounds rule as registration: never index events[] with a
     * negative or too-large fd. */
    if (fd < 0 || fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;

    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
     * is removed. */
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;

    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd: scan downwards for the first fd that still
         * has events; j ends at -1 when none is left. */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}
事件处理函数,处理一轮事件:
/* Process one round of events.
 *
 * `flags` selects what is done:
 *   AE_FILE_EVENTS / AE_TIME_EVENTS - if neither is set the function
 *       returns 0 immediately; time events are processed only when
 *       AE_TIME_EVENTS is set.
 *   AE_DONT_WAIT - poll with a zero timeout instead of sleeping.
 *   AE_CALL_BEFORE_SLEEP / AE_CALL_AFTER_SLEEP - invoke the loop's
 *       beforesleep / aftersleep hooks around the poll, when set.
 *
 * Returns the number of fired fds handled plus the value returned by
 * processTimeEvents(). */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want to call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp;
        int64_t usUntilTimer = -1;

        /* Ask how long until the earliest timer fires. The result is
         * split into seconds/microseconds below; a negative value means
         * there is no timeout to derive from timers. */
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            usUntilTimer = usUntilEarliestTimer(eventLoop);

        if (usUntilTimer >= 0) {
            tv.tv_sec = usUntilTimer / 1000000;
            tv.tv_usec = usUntilTimer % 1000000;
            tvp = &tv;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* The loop-level flag (as opposed to the per-call `flags`) can
         * also force a zero timeout. */
        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        /* Run the before-sleep hook, if installed and requested. */
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        /* Dispatch the read/write events reported by the poll. */
        for (j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsyncing a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                /* The handler may have resized the events array, so the
                 * entry's address has to be looked up again. */
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                /* When the read and write handlers are the same function
                 * it is called only once per fd in this round. */
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                /* Same "call an identical handler only once" rule as above. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
执行主事件循环:
/* Run the main event loop: clear the stop flag, then keep processing
 * rounds of file and time events (with the before/after-sleep hooks
 * enabled) until eventLoop->stop becomes non-zero. */
void aeMain(aeEventLoop *eventLoop) {
    const int roundFlags = AE_ALL_EVENTS|
                           AE_CALL_BEFORE_SLEEP|
                           AE_CALL_AFTER_SLEEP;

    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        aeProcessEvents(eventLoop, roundFlags);
    }
}
#redis#