Valkey源码剖析(8):时间事件处理流程¶
Valkey中的时间事件由ae.c/processTimeEvents()函数负责处理:
static int processTimeEvents(aeEventLoop *eventLoop) {
// ...
}
这个函数首先要做的是获取时间事件链表的首个节点、当前最大的时间事件ID以及当前的单调时间:
// 指向时间事件链表
te = eventLoop->timeEventHead;
// 获取当前最大的时间事件ID
maxId = eventLoop->timeEventNextId - 1;
// 获取当前的单调时间
monotime now = getMonotonicUs();
之后,函数会从时间事件链表的首个节点开始,遍历整个时间链表,并处理链表上的所有时间事件:
while (te) {
// 处理时间事件的代码...
// 继续处理下一事件
te = te->next;
}
在这个while循环中,程序首先要做的是移除上一次处理时间事件之后,被标记为AE_DELETED_EVENT_ID的待删除时间事件:
// 移除被标记为AE_DELETED_EVENT_ID的事件
// (这些事件在上一次processTimeEvents()函数调用时已经执行过)
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
// 只移除引用计数为0的事件
if (te->refcount) {
te = next;
continue;
}
// 调整相邻节点的指针,移除当前节点
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next) te->next->prev = te->prev;
// 调用收尾函数
if (te->finalizerProc) {
te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs();
}
// 释放事件结构
zfree(te);
// 继续处理下一事件
te = next;
continue;
}
之后,程序才会来到真正的时间事件处理逻辑:
if (te->when <= now) {
long long retval;
id = te->id;
te->refcount++;
// 执行事件处理函数
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
// 更新当前时间
now = getMonotonicUs();
// 如果事件处理函数返回的值并非AE_NOMORE,
// 那么说明这个事件需要在之后再次触发,更新它的when属性为下次触发时间,
// 反之则将事件标识为AE_DELETED_EVENT_ID,使得该事件在下次事件循环中被删除
if (retval != AE_NOMORE) {
te->when = now + (monotime)retval * 1000;
} else {
te->id = AE_DELETED_EVENT_ID;
}
在这个判断句中,程序通过检查时间事件的触发时间是否大于当前时间来判断事件是否已被触发,如果触发的话就调用te->timeProc()回调函数来处理事件,并在事件处理完毕之后,根据回调函数的返回值来决定是在之后继续等待该事件,还是将该事件从时间事件链表中删除。
最后,在处理完所有时间事件之后,processTimeEvents()函数将返回本次处理的事件数量为结果:
static int processTimeEvents(aeEventLoop *eventLoop) {
// ...
// 返回已处理事件的数量
return processed;
}
黄健宏
2025.12.29