Redis源码解析3 – 事件循环

Redis实现了异步事件循环,主要代码在ae.h,ae.c,ae_*.c几个文件中,支持epoll,evport,kqueue,select四种异步模型。

头文件 ae.h

宏定义

/* ae.h */
#define AE_OK 0
#define AE_ERR -1

#define AE_NONE 0       // 0b00000000 没有事件
#define AE_READABLE 1   // 0b00000001 可读触发
#define AE_WRITABLE 2   // 0b00000010 可写触发
#define AE_BARRIER 4    // 0b00000100 设置时事件循环先处理write再处理read

#define AE_FILE_EVENTS 1       // 0b00000001 文件事件
#define AE_TIME_EVENTS 2       // 0b00000010 定时事件
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)  // 0b00000011 所有事件
#define AE_DONT_WAIT 4         // 0b00000100 aeApiPoll不等待
#define AE_CALL_AFTER_SLEEP 8   // 0b00001000 调用aftersleep回调

#define AE_NOMORE -1           // 用于停止定时事件
#define AE_DELETED_EVENT_ID -1 // 用于惰性删除

#define AE_NOTUSED(V) ((void) V)

类型定义

/* ae.h */
// 文件事件类型
typedef struct aeFileEvent {
    int mask;             // AE_(READABLE|WRITABLE|BARRIER)
    aeFileProc *rfileProc; // 读回调
    aeFileProc *wfileProc; // 写回调
    void *clientData;
} aeFileEvent;

// 定时事件类型,用双向链表实现
typedef struct aeTimeEvent {
    long long id;
    long when_sec; // 触发时间,秒
    long when_ms; // 触发时间,毫秒
    aeTimeProc *timeProc; // 回调函数
    aeEventFinalizerProc *finalizerProc; // 终止函数
    void *clientData;
    struct aeTimeEvent *prev; // 链表中下一个节点
    struct aeTimeEvent *next; // 链表中前一个节点
} aeTimeEvent;

// 触发的文件事件
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

// 事件循环
typedef struct aeEventLoop {
    int maxfd;   // 当前最大的fd
    int setsize; // 允许同时监听的文件数量
    long long timeEventNextId; // 每次新建定时事件后+1,该变量单调递增,删除定时事件不会使ID减小
    time_t lastTime;     // 上一次处理定时事件的时间,为了处理操作系统时钟回拨
    aeFileEvent *events; // 事件数组
    aeFiredEvent *fired; // 触发的文件事件
    aeTimeEvent *timeEventHead; // 时间事件链表头
    int stop;   // 停止flag,置为true时aeMain退出循环
    void *apidata; // aeApi*函数接口使用的数据
    aeBeforeSleepProc *beforesleep;  // beforesleep回调
    aeBeforeSleepProc *aftersleep; // aftersleep回调
} aeEventLoop;

定时器的实现有链表、时间轮、时间堆、红黑树等多种方式,其中时间轮和时间堆性能较好,不过由于Redis中只有少量的定时任务,所以aeTimeEvent用一个双向链表就足够了。

函数接口定义

/* ae.h */
aeEventLoop *aeCreateEventLoop(int setsize); // 新建event loop,分配内存
void aeDeleteEventLoop(aeEventLoop *eventLoop); // 删除event loop,释放内存
void aeStop(aeEventLoop *eventLoop); // 停止event loop
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);  // 注册文件事件
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); // 删
int aeGetFileEvents(aeEventLoop *eventLoop, int fd); // 获取文件事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc); // 注册定时事件
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);  // 删
int aeProcessEvents(aeEventLoop *eventLoop, int flags);  // 处理事件
int aeWait(int fd, int mask, long long milliseconds);  // 等待
void aeMain(aeEventLoop *eventLoop);  // 主循环
char *aeGetApiName(void);  // "select" | "evport" | "kqueue" | "epoll"
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); // 注册回调
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep); // 注册回调
int aeGetSetSize(aeEventLoop *eventLoop);  // 获取监听文件的数量
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);  // 修改监听文件的数量

具体实现 ae.c

文件开头直接导入了.c文件,其中每个.c文件都封装了一种异步I/O模型。

/* ae.c */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

ae_*.c文件都实现了一组相同的API,包括aeApiState结构体类型和一组aeApi开头的函数接口,这一层封装做得很好,将操作系统提供的各种异步I/O模型封装成了一组统一的函数接口,可以学习一下这组函数接口。

/* ae_*.c */
static int aeApiCreate(aeEventLoop* eventloop);
static int aeApiResize(aeEventLoop* eventloop, int setsize);
static int aeApiFree(aeEventLoop* eventloop);
static int aeApiAddEvent(aeEventLoop* eventloop, int fd, int mask);
static int aeApiDelEvent(aeEventLoop* eventloop, int fd, int mask);
static int aeApiPoll(aeEventLoop* eventloop, struct timeval *tvp);
static char* aeApiName(void);
举个例子,ae_epoll.c封装了epoll异步I/O模型:

/* ae_epoll.c */
typedef struct aeApiState {
    int epfd; // epoll文件描述符
    struct epoll_event *events;  // 事件数组
} aeApiState;

// 新建异步I/O模型
static int aeApiCreate(aeEventLoop *eventLoop) {
    // 状态变量分配内存,注意这里分配的内存都是未初始化的
    aeApiState *state = zmalloc(sizeof(aeApiState));
    // 分配内存失败返回-1
    if (!state) return -1;
    // 事件数组分配内存
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        // 分配内存失败,释放内存
        zfree(state);
        return -1;
    }
    // 创建epoll文件,1024只起到提示作用
    state->epfd = epoll_create(1024);
    if (state->epfd == -1) {
        // 创建失败,释放内存
        zfree(state->events);
        zfree(state);
        return -1;
    }
    // 状态结构体保存在void*类型的apidata字段中
    eventLoop->apidata = state;
    // 创建成功返回0
    return 0;
}

static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
    aeApiState *state = eventLoop->apidata;
    // 重新分配监听事件数组
    state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
    return 0;
}

// 释放状态变量内存
static void aeApiFree(aeEventLoop *eventLoop) {
    aeApiState *state = eventLoop->apidata;
    close(state->epfd);
    zfree(state->events);
    zfree(state);
}

// 添加事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* 为了消除valgrind警告*/
    // 如果事件没有注册,需要EPOLL_CTL_ADD,否则需要EPOLL_CTL_MOD
    int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    ee.events = 0;
    // 注意这里要和之前的mask进行或运算
    mask |= eventLoop->events[fd].mask;
    // 将mask转为EPOLLIN和EPOLLOUT
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

// 删除事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* 为了消除valgrind警告*/
    // 新的mask
    int mask = eventLoop->events[fd].mask & (~delmask);
    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

// 轮询
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 等待触发,设置毫秒级的超时时间,-1表示没有阻塞超时时间
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // 将触发的事件写到fired数组
    if (retval > 0) {
        int j;
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

// 接口名称
static char *aeApiName(void) {
    return "epoll";
}
接下来我们回到ae.c看事件循环是如何实现的:

/* ae.c */
// 创建事件循环,setsize表示同时监听的文件最大数量
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    if (aeApiCreate(eventLoop) == -1) goto err;
    // aeApiCreate函数分配的events内存没有清零,在这里初始化清零
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
err:
    // 失败时释放内存,返回NULL指针
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

// 调整SetSize大小
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
    int i;
    // 大小不变直接返回
    if (setsize == eventLoop->setsize) return AE_OK;
    // setsize要大于当前最大的fd
    if (eventLoop->maxfd >= setsize) return AE_ERR;
    // 调整失败
    if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;
    // 重新分配内存
    eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
    eventLoop->setsize = setsize;
    // 初始化events数组的后半部分
    for (i = eventLoop->maxfd+1; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return AE_OK;
}

// 删除释放事件循环
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);
    zfree(eventLoop);
}

// 停止
void aeStop(aeEventLoop *eventLoop) {
    eventLoop->stop = 1;
}

// 添加文件事件
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

// 删除文件事件
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    // 无效的fd直接返回
    if (fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    // 无操作直接返回
    if (fe->mask == AE_NONE) return;
    // 删除AE_WRITABLE的同时删除AE_BARRIER
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;
    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);
    // 如果删除的是最大的fd,则要更新maxfd
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        int j;
        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}

// 添加定时事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    // 将新的定时事件放在链表头部
    te->next = eventLoop->timeEventHead;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    // 返回任务ID
    return id;
}

// 删除定时任务
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
{
    // 遍历链表
    aeTimeEvent *te = eventLoop->timeEventHead;
    while(te) {
        if (te->id == id) {
            // 惰性删除
            te->id = AE_DELETED_EVENT_ID;
            return AE_OK;
        }
        te = te->next;
    }
    // 未找到对应ID
    return AE_ERR; /* NO event with the specified ID found */
}

// 查找最近将超时的的计时器
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    // 因为Redis中定时任务数量不多,这里简单地用链表实现了定时器
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;
    while(te) {
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}

// 处理定时事件
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);
    // 检测操作系统时钟回退,将定时事件超时事件置为0
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    // 更新记录新的事件
    eventLoop->lastTime = now;    
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    // 遍历链表
    while(te) {
        long now_sec, now_ms;
        long long id;
        // 释放已经删除的定时事件
        if (te->id == AE_DELETED_EVENT_ID) {
            // 从链表中删除节点
            aeTimeEvent *next = te->next;
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc)
                // 调用终止函数
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        // 触发定时任务
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;
            // 函数的返回值是下次触发的间隔毫秒数
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            if (retval != AE_NOMORE) {
                // 更新定时事件的触发时间,以便下次触发
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                // 停止定时事件
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

// 处理所有监听事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    // 无操作
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    // 先处理文件事件
    if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 查找最近将要触发的定时事件
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;
            // 获取当前时间
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            // 等待的毫秒数
            long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms;
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                // 已经超时,不等待
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            // 没有定时任务
            if (flags & AE_DONT_WAIT) {
                // 不等待
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // 不设置超时
                tvp = NULL;
            }
        }
        // 等待事件触发,tvp为等待超时
        numevents = aeApiPoll(eventLoop, tvp);
        // aftersleep回调
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
        // 处理触发的事件
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0;
            // 通常先处理读,再处理写
            // 当设置AE_BARRIER时,先处理写再处理读
            int invert = fe->mask & AE_BARRIER;
            // 非invert时先处理读
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
            // 处理写
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            // invert时处理读
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
    }
    // 处理定时事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    // 返回处理的事件数量
    return processed;
}

// 等待fd上的事件,用于同步I/O操作
int aeWait(int fd, int mask, long long milliseconds) {
    struct pollfd pfd;
    int retmask = 0, retval;
    memset(&pfd, 0, sizeof(pfd));
    pfd.fd = fd;
    if (mask & AE_READABLE) pfd.events |= POLLIN;
    if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
    if ((retval = poll(&pfd, 1, milliseconds))== 1) {
        if (pfd.revents & POLLIN) retmask |= AE_READABLE;
        if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
        if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
        if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
        return retmask;
    } else {
        return retval;
    }
}

// 主循环
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注