阅读redis(1)|逐步分析Redis建立监听服务的过程
导读: 这一期来讲解下redis建立监听服务的过程。
在网络编程的前两期,讲解了一个服务器的基本设计:
redis的服务器部分自然也符合上述两期的设计,并且以redis为例进行了部分源码分析。从本期开始,将开始全面分析redis源码之路。
- 欢迎点击并关注原文:阅读redis源码(1) | 逐步分析Redis建立监听服务的过程
- 点击并关注 look_code_art,更多硬核等你发现
从走近reactor设计模式,剖析高性能服务器框架核心一文中我们已经知道,高性能服务的核心在于怎么设计好事件循环eventloop结构。下面,就先来看看redis中表征eventloop的结构体 struct aeEventLoop。
struct aeEventLoop
struct aeEventLoop结构体如下。
typedef struct aeEventLoop {
int maxfd; /* 当前注册的fd最大数 */
int setsize; /* 能注册的最大fd数 */
long long timeEventNextId; /* 当前定时器事件最大id */
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent* events; /* 数组,已经注册的事件, events 建立了一个映射关系:fd <--> event*/
aeFiredEvent* fired; /* 数组,已经触发的事件,记录的是触发的fd,及其事件类型 */
aeTimeEvent* timeEventHead; /* 定时器链表的头节点 */
int stop; /* EventLoop 是否停止 */
void* apidata; /* 这个用于存放和 epollfd 相关的数据 */
aeBeforeSleepProc *beforesleep; /* 在 epoll_wait 阻塞之前调用的函数 */
aeBeforeSleepProc *aftersleep; /* 在 epoll_wait 唤醒之后调用的函数 */
int flags; /* 设置的标志位 */
} aeEventLoop; 下面解释下其中几个字段。
events & fired
events 和 fired 字段,实际上都是数组,events 数组的下标表达的是fd,而对应的元素是该fd 关注的事件, fired 的下标仅仅是索引 index,该 index位置上的元素是记录了触发的fd 及其对应的事件类型。
数组,可以视为一种简单的map,不过数组是在下标索引和该位置的元素之间建立了映射关系。因此,events、fired字段,就是在fd/index和对应的事件之间建立映射关系。
其中,events中的元素类型是struct aeFileEvent,fired中元素的类型是struct aeFiredEvent,下面是完整的字段及其解释。
typedef struct aeFileEvent {
int mask; // 关注的事件:AE_NONE、AE_READABLE 、AE_WRITABLE、AE_BARRIER
aeFileProc *rfileProc; // 可读事件的回调函数
aeFileProc *wfileProc; // 可写事件的回调函数
void *clientData;
} aeFileEvent;
typedef struct aeFiredEvent {
int fd; // 有事件触发的文件描述符
int mask; // 这个fd触发具体的事件类型
} aeFiredEvent; timeEventHead
在redis中,是使用链表存储定时器任务,而timeEventHead字段,指向的是定时任务链表的头部节点。
由于每次定时任务都是在链表头部插入,因此 timeEventHead 记录的是最晚插入的节点(由于是直接插入,没有对定时器任务链表排序,因此头节点不一定是最早超时的任务)。
关于定时任务的设计,可以参考剖析服务器の定时器设计。
typedef struct aeTimeEvent {
long long id; /* 由 timeEventNextId++ 赋值,自增主键*/
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc* timeProc; // 时间回调函数
aeEventFinalizerProc* finalizerProc;
void* clientData;
struct aeTimeEvent* prev;
struct aeTimeEvent* next;
int refcount;
} aeTimeEvent; aeCreateEventLoop
aeCreateEventLoop函数,创建aeEventLoop对象:
- 分配内存:先给
aeEventLoop对象eventloop分配内存,再给events、fired字段分配内存,且设置events[i].mask不关注任何事件; - 使用
aeApiCreate函数(这个函数在下面讲解),创建epollfd,为后文使用epoll_xxx函数准备; - 其他默认初始化
在aeCreateEventLoop函数里,或者说是在redis里,大量使用了 goto 语句,这与很多书中所述的禁止使用goto相悖?
- 由于c语言本身的限制,不得不使用
goto,或者说,不使用goto会造成代码冗余; - 禁止
goto使用在业务中,在c语言实现的底层框架中,还是会存在大量使用goto的情况。
因此,看到goto不要心生鄙夷,亦或胆怯。
下面,是完整的实现。
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
// 1 分配内存
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL)
goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL)
goto err;
// 2 初始化
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
// 3 创建 epollfd
if (aeApiCreate(eventLoop) == -1) goto err;
// 初始化时,什么事件也没关注
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
} 一个eventloop创建完成之后,下面就是要向这个eventloop中注册事件,然后等待事件的发生,最后去处理事件。下面就来介绍redis中是如何封装epoll_xxx系列函数,完成注册事件、等待事件发生。
epollfd
epoll_xxx系列函数,都是围绕epollfd进行的。在redis中,epoll_xxx系列函数被封装后以 aeApi_ 为前缀。
aeApiCreate:基于epoll_create函数封装,创建epollfdaeApiAddEvent:基于epoll_ctl函数封装,向epllfd中注册感兴趣的事件aeApiDelEvent:基于epoll_ctl函数封装,从epollfd中删除感兴趣的事件aeApiPoll:基于epoll_wait函数封装,阻塞等待事件发生
下面,依次介绍 aeApi_xxx 系列函数。
aeApiCreate
aeApiCreate函数,基于epoll_create函数创建epollfd,其中 epoll_create 函数的入口参数size没有任何含义,但必须是个大于0的正数,也可以直接使用 epoll_create1 函数来创建epollfd,不过要传入一个标志位。
struct aeApiState
epollfd的相关状态在redis中使用struct aeApiState结构体保存,而struct aeApiState对象最终存储在eventloop->apidata中,struct aeApiState字段完整如下。
typedef struct aeApiState {
int epfd; // epollfd
struct epoll_event* events;
} aeApiState;
struct epoll_event {
uint32_t events; // epoll_wait中触发的事件
epoll_data_t data; // 存放触发事件对应的fd
} __EPOLL_PACKED; 其中:
epfd:保存的是aeApiCreate函数创建的epollfd;events:是个数组类型,元素类型struct epoll_event对象中保存epoll_wait监听到的活跃事件及其对应的fd。events大小最大是eventLoop中的setsize;
好嘞,介绍完相关的数据结构,下面来看看aeApiCreate函数,它的整个逻辑也很简单:
- 分配相关数据结构的内存;
- 使用
epoll_create函数来创建efd,并使用struct aeApiState对象state来保存efd及其事件信息; - 将
state存储在aeEventLoop::apidata字段处。
完整实现如下。
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState* state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
} aeApiAddEvent
aeApiAddEvent函数,内部调用 epoll_ctl 函数, 将 fd 注册到epollfd的事件空间中。
这个行为对epollfd上已监听的事件不会产生任何影响,只是为fd多关注一个事件,下面会即将讲解 的listenfd 也是使用这个函数来注册感兴趣事情。
这个函数实现比较简单,如下。
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /*avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; // 与操作,是为了不影响fd上之前关注的事件
if (mask & AE_READABLE) ee.events |= EPOLLIN; // 关注可读事件
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; // 关注可写事件
ee.data.fd = fd;
// 将 {fd, ee} 注册到 efd 的事件空间上去
if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
return 0;
} aeApiDelEvent
aeApiDelEvent函数,内部也是基于epoll_ctl函数,使文件描述符fd不再关注delmask事件。
- 当
fd仅不关注delmask事件后,如果fd仍关注其他事件,epoll_ctl函数的第二个参数则使用EPOLL_CTL_MOD; - 否则,就需要将
fd从efd的事件空间中删除,此时第二个参数使用EPOLL_CTL_DEL标志位。
完整实现如下。
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int mask = eventLoop->events[fd].mask & (~delmask); // 取消对于 delmask 的关注
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
// 如果取消关注事件 delmask 后还由其他事件,那么就使用 EPOLL_CTL_MOD 标志位
// 否则就将fd从state->efd的事件空间中删除,此时使用 EPOLL_CTL_DEL 标志位
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
} aeApiPoll
aeApiPoll函数,内部是基于epoll_wait函数实现,用于阻塞等待epollfd事件空间中已注册的事件发生。
如果epoll_wait函数监听到事件发生,redis就会将这些事件及其对应的fd存储到eventloop.fired字段中,在上面解释过,这个字段是个数组。
完成的实现如下。
static int aeApiPoll(aeEventLoop* eventLoop, struct timeval* tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd, // epollfd
state->events, // 监听的的事件类型及其fd
eventLoop->setsize, // 最大可监听事件大小
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // 最长阻塞时间
if (retval > 0) {
int j;
// 将所有发生的事件及其对应的fd 添加到 eventloop.fired数组中
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event* e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; // 错误是即可读又是可写
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; // 对端关闭
eventLoop->fired[j].fd = e->data.fd; // 记录触发事件的 fd
eventLoop->fired[j].mask = mask; // 记录触发的事件类型
}
}
return numevents;
} 上面介绍了struct aeEventLoop结构体,以及epoll_xxx系列封装后的aeApi_xxx系列函数。下面,就来看看redis是怎么利用这些函数一步步创建监听服务。
_anetTcpServer
_anetTcpServer函数,是redis内部函数,可以用于创建ipv-4、ipv-6监听服务器。因此,经过封装变成两个对外开发函数,分别用来创建ipv4、ipv6服务。
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog); // ipv4
}
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
{
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog); // ipv6
}
// in server.c
int listenToPort(int port, int *fds, int *count) {
int j;
if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
if (server.bindaddr[j] == NULL) {
//...
} else if (strchr(server.bindaddr[j],':')) {
/* Bind IPv6 address. */
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
} else {
/* Bind IPv4 address. */
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
}
//...
}
return C_OK;
} 下面来看看_anetTcpServer函数,是怎么创建监听状态的服务器,步骤如下:
getaddrinfo函数,获取本机上的所有ip地址及对应的TCP服务。此函数参数:SOCK_STREAM:指定了服务类型,是属于tcpAI_PASSIVE:如果传入的地址bindaddr不是NULL,那么这个设置无效。如果bindaddr==NNULL,则getaddrinfo返回的ip地址就是通配地址。
关于
getaddrinfo的使用,可以参考《UNIX网络编程》。 这个函数,为创建的listenfd任选一个由getaddrinfo返回的本地IP地址来绑定。设置了
SO_REUSEADDR参数,更多socket参数选项,可以参考《UNIX网络编程》。调用
anetListen函数,调用了绑定到固定端口、并开启监听
_anetTcpServer 函数,可以为本地的ipv-4、ipv-6各自创建一个监听文件描述符并保存在 server.ipfd中,个数由server.ipfd_count记录,监听的端口port默认是6379。
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
int s = -1, rv;
char _port[6]; /* strlen("65535") */
struct addrinfo hints, *servinfo, *p;
snprintf(_port,6,"%d",port);
memset(&hints,0,sizeof(hints));
hints.ai_family = af; /* ipv4 or ipv6*/
hints.ai_socktype = SOCK_STREAM; /* TCP 数据类型 */
hints.ai_flags = AI_PASSIVE; /* 如果 bindarry==NULL,返回的就是通配地址 */
if ((rv = getaddrinfo(bindaddr, _port, &hints, &servinfo)) != 0) {
anetSetError(err, "%s", gai_strerror(rv));
return ANET_ERR;
}
// 绑定成功一个就会跳出for循环
for (p = servinfo; p != NULL; p = p->ai_next) {
if ((s = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) // 创建TCP类型的 listenfd
continue;
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR)
goto error;
if (anetSetReuseAddr(err, s) == ANET_ERR) goto error; // 设置地址复用
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) // 绑定地址并开启监听
s = ANET_ERR;
goto end;
}
if (p == NULL) {
anetSetError(err, "unable to bind socket, errno: %d", errno);
goto error;
}
error:
if (s != -1) close(s);
s = ANET_ERR;
end:
freeaddrinfo(servinfo);
return s;
} anetListen
anetListen函数,是redis用来建立监听服务的,内部涉及两个步骤:
- 先调用
bind函数,将监听文件描述符s绑定到地址sd上, - 调用
listen函数,开启监听服务
完成的实现如下。
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
if (bind(s,sa,len) == -1) {
anetSetError(err, "bind: %s", strerror(errno));
close(s);
return ANET_ERR;
}
if (listen(s, backlog) == -1) {
anetSetError(err, "listen: %s", strerror(errno));
close(s);
return ANET_ERR;
}
return ANET_OK;
} 稍微总结下:
_anetTcpServer函数,创建的是一个阻塞的listenfd;- 在
listenToPort函数中将listenfd设置为非阻塞IO,listenToPort函数调用结束,创建非阻塞的监听服务器才算完成。
下一步,就要将listenfd注册到epollfd中,并且关注可读事件,这样服务器就能监听客户端的连接请求了(触发可读事件),这一过程由aeCreateFileEvent函数完成。
aeCreateFileEvent
aeCreateFileEvent函数,将文件描述符fd注册到 epollfd 的事件空间中,注册感兴趣的事件mask,以及感兴趣的事件发生时要执行的回答函数。
比如,在 initServer 函数中 aeCreateFileEvent 函数将 listenfd 挂在 epollfd 上并注册可读事件,目的是监听客户端的连接请求,客户端的连接请求处理函数是 acceptTcpHandler(这个函数在下一期会讲解)。
//in server.c
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
} 由于 server.ipfd 中保存的是根据本地的ipv4、ipv6地址,因此最终会创建的两个 listenfd。而aeCreateFileEvent函数作用是将每个监听listenfd绑定到所属的eventloop->apidata->efd中去。
因此,上面这个for循环运行结束,会有两个监听服务,一个是监听ipv4,一个监听ipv6。
截图!!!
aeCreateFileEvent函数在此处的主要作用:
- 将
listenfd注册到所属的eventLoop的epfd中去,并关注可读事件; - 设置可读事件的回调函数
acceptTcpHandler,这个也是新的客户端连接到来时候的处理函数; - 更新最大注册的
fd,即maxfd。
完整的实现如下。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent* fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
} 至此,建立监听TCP服务器的流程基本完成:aeCreateEventLoop --> anetTcpServer--> aeCreateFileEvent:
- 建立了非阻塞的监听端口
listenfd; - 将该
listenfd注册到epollfd中。
下面,就是要将服务器运行起来,让listenfd接受客户端的连接请求,这一过程由aeMain函数完成。
aeMain
aeMain函数,内部的核心函数是 aeProcessEvents,使得aeProcessEvents函数处理每次迭代中各个请求。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
} aeProcessEvents
在走近reactor设计模式,剖析高性能服务器框架核心一节中讲解过它的前半部分,即如何设置epoll_Wait的阻塞时间。下面来全面地看看它的功能。这个函数,整个流程可以分为5个部分:
在
aeApiPoll函数之前就做了一件事:计算epoll_wait需要阻塞的时间- 如果设置了
AE_DONT_WAIT,那么就是不阻塞,epoll_wait的超时时间为0 - 如果没有其他任务,只有定时器任务,那么
epoll_wait阻塞时间即最早超时时间,防止定时器任务等待过久 - 如果也没有定时器任务,那么就永远等待,直到有事件到来。
这一部分,在走近reactor设计模式,剖析高性能服务器框架核心已经详细说过。
- 如果设置了
beforesleep:主要是在epoll_wait阻塞前处理一些任务,防止因阻塞长时间无法执行,或者是一些准备工作;epoll_wait等待事件发生;aftersleep:可以用于完成epoll_wait唤醒之后的一些校验工作;处理事件:根据是否设置了
AE_BARRIER,来决定同一个fd上是先处理可读事件还是先处理可写事件;处理完活跃的事件,最早的定时器可能已经超时了,是时候去执行了。
完成的实现如下。
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop); // 如果有时间事件存在,则获取最近的定时器超时时间
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
// 最近超时的时间转为毫秒单位
long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* 要么是超时时间为0,即使设置了AE_DONT_WAIT,不要阻塞
* 要么是没有定时时间,则可以阻塞 */
// 不要阻塞,则设置超时时间为0
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// 没有可触发事件,则等待
t***ULL; /* wait forever */
}
}
// 再次确认下
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop); // 在休眠之前执行
numevents = aeApiPoll(eventLoop, tvp); // epoll_wait
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop); // 在休眠之后执行
// 逐个处理触发的事件
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
int invert = fe->mask & AE_BARRIER;
// fe->mask 是之前关注的事件,mask是产生的事件,如果都有 AE_READABLE
// 则触发可读事件
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
// 可以去处理时间事件了
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
} 编译运行
最后,可能再讲下怎么编译redis。在学习的时候,为了便于调试,因此会加上调试信息,并且去优化:
$ pwd /home/codespace/redis-6.0.5 $ make FLAGS="-g -O0" -j 4 ///!!! 编译 $ cd src $ ./redis-server /// !!! 运行redis服务器
我们再使用lsof命令来看看redis-server打开的文件描述符,从下面也可以看出:
fd=5是使用epoll_create函数创建的epollfd;- ·
fd=6、fd=7是redis开启的两个listenfd,分别用于监听ipv4、ipv6请求。
codespace@ubuntu:~/redis-6.0.5$ lsof -c redis-server COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME redis-ser 3556 codespace cwd DIR 8,5 12288 434667 /home/codespace/redis-6.0.5/src redis-ser 3556 codespace rtd DIR 8,5 4096 2 / redis-ser 3556 codespace txt REG 8,5 11793608 428898 /home/codespace/redis-6.0.5/src/redis-server redis-ser 3556 codespace mem REG 8,5 16287648 534002 /usr/lib/locale/locale-archive redis-ser 3556 codespace mem REG 8,5 2029224 538922 /usr/lib/x86_64-linux-gnu/libc-2.31.so redis-ser 3556 codespace mem REG 8,5 157224 540595 /usr/lib/x86_64-linux-gnu/libpthread-2.31.so redis-ser 3556 codespace mem REG 8,5 18816 539148 /usr/lib/x86_64-linux-gnu/libdl-2.31.so redis-ser 3556 codespace mem REG 8,5 1369352 539990 /usr/lib/x86_64-linux-gnu/libm-2.31.so redis-ser 3556 codespace mem REG 8,5 191472 538314 /usr/lib/x86_64-linux-gnu/ld-2.31.so redis-ser 3556 codespace 0u CHR 136,1 0t0 4 /dev/pts/1 redis-ser 3556 codespace 1u CHR 136,1 0t0 4 /dev/pts/1 redis-ser 3556 codespace 2u CHR 136,1 0t0 4 /dev/pts/1 redis-ser 3556 codespace 3r FIFO 0,12 0t0 50175 pipe redis-ser 3556 codespace 4w FIFO 0,12 0t0 50175 pipe redis-ser 3556 codespace 5u a_inode 0,13 0 16306 [eventpoll] //!!! epollfd redis-ser 3556 codespace 6u IPv6 50176 0t0 TCP *:6379 (LISTEN) //!!! ipv6 监听,端口6379 redis-ser 3556 codespace 7u IPv4 77825 0t0 TCP *:6379 (LISTEN) //!!! ipv4 监听,端口6379 redis-ser 3556 codespace 99w REG 8,5 0 428519 /home/codespace/.vscode-server/bin/cfa2e218100323074ac1948c885448fdf4de2a7f/vscode-remote-lock.codespace.cfa2e218100323074ac1948c885448fdf4de2a7f
到此,就大致讲解完毕redis建立监听服务的过程了。不出意外,会为这一期准备个直播,敬请期待。
#C/C++##学习路径#
查看5道真题和解析

