vlambda博客
学习文章列表

专栏:Redis源码剖析-事件驱动+Reactor

导读

读者建议先有一些同步,异步,阻塞,非阻塞,BIO,NIO,AIO概念的知识。

事件概述

Redis采用Reactor模式来处理事件,简单点来说,Redis会将所有待处理的事件放入事件池,然后挨个取出其中的事件,调用该事件的回调函数对其进行处理,这样的模式就称为Reactor模式,其处理事件的态度就是:「不要来找我,安静的取号排队,我会依次叫号的」。

Redis的事件分为文件事件和时间事件两种。

其中,文件事件采用I/O多路复用技术,监听感兴趣的I/O事件,例如读、写事件等,当事件的描述符变为可读或可写时,就将该事件放入待处理事件池,Redis在事件循环的时候会对其进行一一处理。

而时间事件则是维护一个定时器,每当满足预设的时间要求,就将该时间事件标记为待处理,然后在Redis的事件循环中进行处理。Redis对于这两种事件的处理优先级是:文件事件优先于时间事件。

事件数据结构

在上述的概述中,我们可以获得几个关键池:文件事件,时间事件,待处理事件池和事件循环,Redis为这几个关键词都定义了各自的结构体。下面,依次来看一看:

/* 文件事件结构体 */
typedef struct aeFileEvent {
int mask; // 事件标记,读or写
aeFileProc *rfileProc; // 读事件处理函数
aeFileProc *wfileProc; // 写事件处理函数
void *clientData; // 事件中包含的待处理数据
} aeFileEvent;

很明显,文件事件在处理过程中,需要根据mask来判断调用读或者写函数来处理该事件,这个很好理解。

/* 时间事件结构体 */
typedef struct aeTimeEvent {
long long id; // 时间事件标识符,用于唯一标记该时间事件
long when_sec; // 时间事件触发时间 秒
long when_ms; // 时间事件触发时间 微秒
aeTimeProc *timeProc; // 该事件对应的处理函数
aeEventFinalizerProc *finalizerProc; // 时间事件的最后一次处理程序,若已设置,则删除该事件的时候会调用
void *clientData; // 数据
struct aeTimeEvent *next; // 下一个时间事件
} aeTimeEvent;

在上述结构体中,该时间事件的标识符包括,时间事件id和触发时间when,每次时间循环的时候判断是否到了触发时间,如到了,则调用对应的处理函数进行处理,该时间事件中还定义了该时间事件最后一次的处理函数。另外,next指针指向下一个时间事件,有些时间事件比如隔多少秒执行一次的,这时候next指针就派上用场了。

/* 触发时间结构体 */
typedef struct aeFiredEvent {
int fd; // 文件事件描述符
int mask; // 读写标记
} aeFiredEvent;

每次调用I/O复用程序之后,会返回已经准备好的文件事件描述符,这时候就会以该结构体的形式存放下来。Redis在事件循环的时候,会对这些已准备好待处理的事件一一进行处理,也就是上面说的待处理事件池。

/* 事件循环结构体 */
typedef struct aeEventLoop {
int maxfd; // 当前注册的最大描述符
int setsize; // 需要监听的描述符个数
long long timeEventNextId; // 下一个时间事件的id,用于生成时间事件的唯一标识
time_t lastTime; // 上一次事件循环的时间,用于检测系统时间是否变更
aeFileEvent *events; // 注册要使用的文件事件
aeFiredEvent *fired; // 已准备好,待处理的时间
aeTimeEvent *timeEventHead; // 时间事件头,因为事件时间其实是一个链表
int stop; // 停止标识,1表示停止
void *apidata; // 用于处理底层特定的API数据,对于epoll来说,其包括epoll_fd和epoll_event
aeBeforeSleepProc *beforesleep; // 没有待处理事件时调用
} aeEventLoop;


时间循坏结构中,存放着所有文件事件和时间事件,以及已准备好的事件,还有以下标识参数等。这些使得事件循环能够安全,高效的运行。

事件创建与删除

我们知道,Redis的事件分为文件事件和时间事件,上述也给出了其分别对应的结构体。首先,我们去看看如何创建和删除事件。

/* 创建文件事件 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
/* 删除文件事件 */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
/* 根据文件描述符获取文件事件 */
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
/* 创建时间事件 */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
/* 删除时间事件 */
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);

这些函数都非常简单,就是对其结构体进行一些初始化操作,我们主要来了解一下在什么时候创建事件,继续跟踪代码的话,这些就一目了然。

关于文件事件,有两个地方需要创建它:

  • 初始化服务器的时候,需要监听新的客户连接

/* 截取至server.c/initServer函数 */
/* 服务器可能有多个文件描述符,根据每个文件描述符来创建监听事件,监听新客户的连接 */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
  • 在客户连接服务器之后,需要监听该客户的读写事件

/* 代码截取至netWorking.c/createClient函数 */
/* 新用户连接的时候需要创建client结构体,此时就需要创建文件事件,用来监听其读写操作 */
if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}

关于时间事件,Redis在初始化服务器的时候就会创建,前面我们提到的AOF和RDB持久化以及过期键的处理等操作中,都设计到定时操作,时间事件就是为了这些定时操作而设定的,在特定的时间触发时间事件,并进行相应的处理。

/* 截取至server.c/initServer函数 */
/* 创建时间事件,用于处理定时任务 */
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create the serverCron time event.");
exit(1);
}

I/O多路复用

在进行文件事件处理前,需要调用I/O多路复用来获得已经准备就绪的文件描述符,关于I/O多路复用,有以下四种函数可以使用。

  • select

  • evport(libevent)

  • epoll

  • kqueue

其中,不同的系统支持不同的多路复用函数,linux下可以使用select和epoll,macos下可以使用select和kqueue,如果安装有libevent,则可以使用evport。对于I/O多路复用,不懂的可以参考我的这篇学习笔记:UNIX网络编程之五:select和poll函数


Redis为了适用于不同的操作系统,同时也要达到最高的效率,定义了如下库选择宏,用于根据当前操作系统的环境来选择最高效的I/O多路复用函数。

#ifdef HAVE_EVPORT
#include "ae_evport.c"#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c" #else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c" #else
#include "ae_select.c" #endif
#endif
#endif

另外,为了统一的使用这些I/O复用函数,Redis对其进行了特定的封装,使得对外的接口得到了统一,操作起来也很方便。

/* 添加需要监听的事件 */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
/* 初始化I/O多路复用库所需的参数 */
static int aeApiCreate(aeEventLoop *eventLoop)
/* 删除不需要监听的事件 */
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)
/* 清空 */
static void aeApiFree(aeEventLoop *eventLoop)
/* 返回当前使用的库的名字 */
static int aeApiName(void)
/* 取出已准备好的文件描述符 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)

我们以select为例,其可以监听读或写类型的文件描述符,所以,需要定义一个结构体来存放需要监听的读写类型的文件描述符集合。

typedef struct aeApiState {
fd_set rfds, wfds; // 读写文件描述符集合
/* We need to have a copy of the fd sets as it's not safe to reuse
* FD sets after select(). */

fd_set _rfds, _wfds;
} aeApiState;

在eventloop结构体中apidata字段保存的数据类型就是aeApiState增加,删除,清空等操作都是对这两个文件描述符集进行操作,没什么好讲的,主要是aeApiPoll函数,其调用底层的I/O复用函数,获取已经准备好的描述符集。

/* 调用select函数,获取已经准备好的描述符集 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;

memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
// 调用select函数
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
// 找到该描述符对应的事件
aeFileEvent *fe = &eventLoop->events[j];
// 根据mask判断读或写事件,并在需要监听的描述符中确定存在该描述符
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
// 加入到已经准备好的事件中,等待处理
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
// 返回已经准备好的事件个数
return numevents;
}

简单点来说,I/O多路复用函数的作用就是,将给定的需要监听的描述符集作为输入,该函数轮询这些描述符集,然后找出其中已经准备好的文件描述符集。

事件循环

上面调用I/路复用程序O多已经获取了准备好的事件,接下来就需要对这些事件进行处理。在server.c文件里,Redis的入口函数main函数中,就调用了aeMain()函数,用于执行事件循环。

/* 事件循环主函数 */
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0; // 开启事件循环
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS); // 事件处理函数
}
}

我们可以看到,基本上事件循环自服务器运行开始,就一直不停的执行,不停地处理各类事件。关于事件处理的具体函数如下:

/* 事件处理函数 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

// 没有需要处理的事件就直接返回
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

// 即使没有要处理的文件事件,只要我们想处理时间事件就需要调用select()函数
// 这是为了睡眠直到下一个时间事件准备好。
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 获取最近的时间事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 运行到这里说明时间事件存在,则根据最近可执行时间事件和现在的时间的时间差
// 来决定文件事件的阻塞事件
long now_sec, now_ms;

aeGetTime(&now_sec, &now_ms);
tvp = &tv;

// 计算下一次时间事件准备好的时间
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
// 时间差小于0,代表可以处理了
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
// 执行到这里,说明没有待处理的时间事件
// 此时根据AE_DONT_WAIT参数来决定是否设置阻塞和阻塞的时间
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
// 调用I/O复用函数获取已准备好的事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 从已就绪事件中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;

// 如果为读事件则调用读事件处理函数
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 如果为写事件则调用写事件处理函数
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
// 处理时间事件,记住,此处说明Redis的文件事件优先于时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
// 返回处理的事件个数
return processed;
}

简而言之,事件的处理步骤就是:

  • 查找最早的时间事件,判断是否需要执行,如需要,就标记下来,等待处理

  • 获取已准备好的文件事件描述符集

  • 优先处理读事件

  • 处理写事件

  • 如有时间事件,就处理时间事件

事件处理函数

前面几小节中,分析了创建事件,事件循环,那么对于待处理的事件,Redis对这些事件都进行哪些处理呢?

时间事件处理

在创建时间事件的函数中,传入了时间事件的真正处理函数serverCron该函数在之前的源码分析中已多次提到,由于源码过长,这里只简要总结一下该函数中执行哪些操作。

  • 更新服务器的各类统计信息,比如时间、内存占用、数据库占用等

  • 清理数据库中的过期键值对

  • 关闭和清理连接失效的客户端

  • 尝试进行AOF和RDB持久化操作

  • 如果是主服务器,就对从服务器进行定期同步

  • 如果是集群模式,对集群进行定期同步和连接测试

文件事件处理

在注册文件事件的时候,给定的文件处理函数是acceptTcpHandlerreadQueryFromClient我们先来看看前者。

很显然,前者对应的事件是为了监听新客户的连接,其事件处理应该是:处理客户的连接,并创建新客户结构体,初始化等

/* 监听新客户连接事件的处理函数*/
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
// TCP连接,接受客户的连接
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// 连接出错
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
// 记录日志
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
// 初始化客户操作
acceptCommonHandler(cfd,0,cip);
}
}

上述函数只是接受了该客户的连接,真正创建客户结构体的操作由acceptCommonHandler函数完成。该函数的源码就不列出来了,这里主要说一下它的主要工作:

  • 创建redisClient结构体,并建立和保存服务器和客户端的连接信息

  •     为该客户端的套接字注册读事件,相应的回调函数

    readQueryFromClient这个在下面介绍)

后者则是创建了客户端结构体之后,需要监听客户端的读操作的事件处理函数,用于读取客户端穿来的命令,并执行相关操作,再回复给客户端。

所以,针对文件事件的处理就是:监听新客户的连接,如连接就创建新客户,然后继续监听该客户的读事件,如客户发送命令,就解析命令,执行相应的处理,然后回复客户端。

小结

本篇博客分析了事件的创建,循环和处理过程,到此,结合以前的分析,我们基本上可以理解客户端传来命令之后,服务器进行相应处理,然后回复客户端,这整个大循环过程了。对于时间事件和文件事件,想用下面的比特图来总结一下。

time ------------------------------------------------------------------------------->
|<------10ms------->|<------10ms------->|<------10ms------->|<------10ms------->|
| fileEvent1 | fileEvent2 | block | fileEvent3 | fileEvent4 | block |
| | timeEvent1| | timeEvent2|

这张图告诉我们,在时间事件执行的时候,文件事件是处于阻塞状态的,所以,再一次印证了Redis是单线程的,整个事件循环是串行的。

还可以看