Valkey源码剖析(21):命令请求的读取、解析与执行¶
networking.c/createClient()函数在为客户端创建client结构时,会为客户端在事件子系统中绑定networking.c/readQueryFromClient()函数作为读事件的处理器:
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
// 连接可以为NULL,用于在Lua脚本等环境中执行命令(因为Redis执行命令必须在客户端上下文中)
if (conn) {
// 绑定查询读取处理器
connSetReadHandler(conn, readQueryFromClient);
// 设置私有数据
connSetPrivateData(conn, c);
// 设置状态
conn->flags |= CONN_FLAG_ALLOW_ACCEPT_OFFLOAD;
}
// ...
return c;
}
当客户端向服务器发送命令请求的时候,客户端对应读事件就会就绪,而readQueryFromClient()处理器就会被调用:
// 每次文件事件循环可以尝试读取的最大次数
#define REPL_MAX_READS_PER_IO_EVENT 25
void readQueryFromClient(connection *conn) {
// 从连接中获取客户端
client *c = connGetPrivateData(conn);
/* Check if we can send the client to be handled by the IO-thread */
// 判断本次读取能否由IO线程来处理
if (postponeClientRead(c)) return;
// 如果状态不为IDLE,那么说明已经有读或写操作在进行中,直接返回
if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) return;
bool repeat = false;
int iter = 0;
do {
// 读取套接字
bool full_read = readToQueryBuf(c);
// 检查读取情况并处理可能出现的错误
if (handleReadResult(c) == C_OK) {
// 读取正常,那么……
// 对客户端查询缓冲区进行解析,并执行解析所得的命令
if (processInputBuffer(c) == C_ERR) return;
// 释放已解析命令队列中未使用的内存(仅在命令已经执行完毕的情况下进行)
trimCommandQueue(c);
}
// 在读取次数不超过限制次数的情况下,尽可能地多读取客户端传来的数据
repeat = (c->flag.primary &&
!c->flag.close_asap &&
++iter < REPL_MAX_READS_PER_IO_EVENT &&
full_read);
// 执行那些需要在转而处理下个客户端之前,需要对当前客户端执行的操作
beforeNextClient(c);
} while (repeat);
}
readToQueryBuf()函数的主要工作如下:
调用
networking.c/readToQueryBuf()函数,从客户端套接字中读取数据,并将其存储至客户端的查询缓冲区client->querybuf中。调用
networking.c/processInputBuffer()函数,解析客户端查询缓冲区中的命令请求,然后执行这些命令。
processInputBuffer()函数的主要定义如下:
int processInputBuffer(client *c) {
/* Parse the query buffer and/or execute already parsed commands. */
// 解析查询缓冲区并/或执行已解析命令
while ((c->querybuf && c->qb_pos < sdslen(c->querybuf)) ||
c->cmd_queue.off < c->cmd_queue.len) {
// ...
// 尝试从命令队列中弹出首个命令作为当前命令
if (!consumeCommandQueue(c)) {
// 如果队列中没有可供弹出的命令,那么解析输入缓冲区以获取可执行命令
parseInputBuffer(c);
// 预处理当前命令
prepareCommandQueue(c);
}
// ...
/* We are finally ready to execute the command. */
// 正式开始处理和执行命令
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
// 当客户端不再有效时,立即退出此循环,并在稍后修剪客户端缓冲区
return C_ERR;
}
}
return C_OK;
}
在processInputBuffer()函数的定义中,它首先会调用networking.c/parseInputBuffer()函数以尝试解析查询缓冲区中的数据,并将待执行的命令请求放到客户端的命令队列中。
接着,processInputBuffer()函数会调用server.c/prepareCommandQueue()函数,对客户端命令队列中待执行的命令进行预处理,确保它们可以进入被执行的状态。
最后,processInputBuffer()函数会调用networking.c/processCommandAndResetClient()函数,正式开始处理和执行命令队列中的命令。
processCommandAndResetClient()函数要做的事情不多,它最重要的工作就是调用server.c/processCommand()函数以执行客户端的当前命令,这之后的工作就跟我们在之前的系列文章中看到过的Valkey命令的执行过程一样了:
int processCommandAndResetClient(client *c) {
int deadclient = 0;
client *old_client = server.current_client;
server.current_client = c;
// 执行命令
if (processCommand(c) == C_OK) {
// 执行一些命令执行之后的后续操作,比如:清空客户端、更新复制偏移量、传播命令等
commandProcessed(c);
/* Update the client's memory to include output buffer growth following the
* processed command. */
// 更新客户端内存信息
if (c->conn) updateClientMemUsageAndBucket(c);
}
// ...
}
以下是server.h/client结构中与查询缓冲区及命令执行有关的主要属性:
typedef struct client {
// ...
// 查询缓冲区
sds querybuf; /* Buffer we use to accumulate client queries. */
// 已读取查询缓冲区的偏移量
size_t qb_pos; /* The position we have read in querybuf. */
// (当前正在执行命令的)命令参数
robj **argv; /* Arguments of current command. */
// (当前正在执行命令的)命令参数数量
int argc; /* Num of arguments of current command. */
// 命令参数数组的长度(可能大于实际的参数数量)
int argv_len; /* Size of argv array (may be more than argc) */
// 已解析命令队列
cmdQueue cmd_queue; /* Parsed commands queue */
// 正在执行的命令
struct serverCommand *cmd; /* Current command. */
// ...
} client;