Valkey源码剖析(7):文件事件处理流程

在介绍完文件事件和时间事件的表示之后,接下来该说说处理这两种事件的方式了。

前面说过,Valkey服务器在启动时会调用ae.c/aeMain()函数以启动事件主循环,而该循环所做的事情就是不断地调用ae.c/aeProcessEvents()函数以等待并处理就绪的事件(包括时间事件和文件事件):

int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    // ...
}

在进入休眠并等待文件事件之前,aeProcessEvents()函数首先要做的是执行休眠前置函数:

if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP)) eventLoop->beforesleep(eventLoop);

接着aeProcessEvents()函数需要根据当前所处的事件模式以及最近将要就绪的时间事件来决定休眠时长:

// 计算最大休眠时长,如果设置了AE_DONT_WAIT标识就设置为0(不等待)
if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
    tv.tv_sec = tv.tv_usec = 0;
    tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
    // 根据最近将要就绪的时间事件来决定休眠时长
    usUntilTimer = usUntilEarliestTimer(eventLoop);
    if (usUntilTimer >= 0) {
        tv.tv_sec = usUntilTimer / 1000000;
        tv.tv_usec = usUntilTimer % 1000000;
        tvp = &tv;
    }
}

在计算出最大休眠时长之后,aeProcessEvents()函数就会调用aeApiPoll()函数,在给定的休眠时长内等待可能出现的文件事件:

numevents = aeApiPoll(eventLoop, tvp);

aeApiPoll()函数会返回文件事件已就绪的套接字数量,并将已就绪文件事件的相关信息填充至eventLoop->fired数组中。

在休眠完成之后和处理已就绪文件事件之前,aeProcessEvents()函数还需要调用休眠后置函数:

if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop, numevents);

在此之后,aeProcessEvents()函数就可以开始处理已就绪的文件事件了:

// 遍历并处理所有已就绪事件的文件描述符
for (j = 0; j < numevents; j++) {
    // 从已就绪结构中获取发生事件的文件描述符
    int fd = eventLoop->fired[j].fd;
    // 获取文件描述符对应的文件事件结构
    aeFileEvent *fe = &eventLoop->events[fd];
    // 获取已就绪的事件类型
    int mask = eventLoop->fired[j].mask;
    int fired = 0; /* Number of events fired for current fd. */

    // 在默认情况下先处理读事件,后处理写事件
    // 但如果设置了AE_BARRIER那么反过来,先处理写,后处理读
    int invert = fe->mask & AE_BARRIER;

    if (!invert && fe->mask & mask & AE_READABLE) {
        // 调用读事件处理函数,处理读事件
        fe->rfileProc(eventLoop, fd, fe->clientData, mask);
        fired++;
        fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
    }

    /* Fire the writable event. */
    if (fe->mask & mask & AE_WRITABLE) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            // 调用写事件处理函数,处理写事件
            fe->wfileProc(eventLoop, fd, fe->clientData, mask);
            fired++;
        }
    }

    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert) {
        fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
        if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) {
            // 处理读事件(在处理写事件之后)
            fe->rfileProc(eventLoop, fd, fe->clientData, mask);
            fired++;
        }
    }

    // 记录已处理事件数量
    processed++;
}

简单来说,这段代码所做的就是遍历eventLoop->fired[j],获取每个文件事件已就绪的套接字,然后按需调用相应的读或写回调函数以处理已就绪的事情。

在默认情况下,aeProcessEvents()函数总是先处理读事件,然后再处理写事件。但如果已就绪事件打开了AE_BARRIER掩码,那么就会反其道而行之。

再然后aeProcessEvents()函数会调用ae.c/processTimeEvents()函数以处理时间事件,最后再将本次迭代处理的事件总数processed返回给调用者:

int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    // ...

    /* Check time events */
    // 处理时间事件并记录已处理事件数量
    if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop);

    // 返回本次已处理的事件数量
    return processed; /* return the number of processed file/time events */
}

这就是Valkey在一次事件主循环中处理已就绪事件的流程。处理时间事件的processTimeEvents()函数的运行逻辑将在下一篇文章中介绍。

黄健宏
2025.12.28