Valkey源码剖析(21):命令请求的读取、解析与执行

networking.c/createClient()函数在为客户端创建client结构时,会为客户端在事件子系统中绑定networking.c/readQueryFromClient()函数作为读事件的处理器:

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    // 连接可以为NULL,用于在Lua脚本等环境中执行命令(因为Redis执行命令必须在客户端上下文中)
    if (conn) {
        // 绑定查询读取处理器
        connSetReadHandler(conn, readQueryFromClient);
        // 设置私有数据
        connSetPrivateData(conn, c);
        // 设置状态
        conn->flags |= CONN_FLAG_ALLOW_ACCEPT_OFFLOAD;
    }

    // ...

    return c;
}

当客户端向服务器发送命令请求的时候,客户端对应读事件就会就绪,而readQueryFromClient()处理器就会被调用:

// 每次文件事件循环可以尝试读取的最大次数
#define REPL_MAX_READS_PER_IO_EVENT 25
void readQueryFromClient(connection *conn) {
    // 从连接中获取客户端
    client *c = connGetPrivateData(conn);

    /* Check if we can send the client to be handled by the IO-thread */
    // 判断本次读取能否由IO线程来处理
    if (postponeClientRead(c)) return;

    // 如果状态不为IDLE,那么说明已经有读或写操作在进行中,直接返回
    if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) return;

    bool repeat = false;
    int iter = 0;
    do {
        // 读取套接字
        bool full_read = readToQueryBuf(c);
        // 检查读取情况并处理可能出现的错误
        if (handleReadResult(c) == C_OK) {
            // 读取正常,那么……
            // 对客户端查询缓冲区进行解析,并执行解析所得的命令
            if (processInputBuffer(c) == C_ERR) return;
            // 释放已解析命令队列中未使用的内存(仅在命令已经执行完毕的情况下进行)
            trimCommandQueue(c);
        }
        // 在读取次数不超过限制次数的情况下,尽可能地多读取客户端传来的数据
        repeat = (c->flag.primary &&
                  !c->flag.close_asap &&
                  ++iter < REPL_MAX_READS_PER_IO_EVENT &&
                  full_read);
        // 执行那些需要在转而处理下个客户端之前,需要对当前客户端执行的操作
        beforeNextClient(c);
    } while (repeat);
}

readToQueryBuf()函数的主要工作如下:

  1. 调用networking.c/readToQueryBuf()函数,从客户端套接字中读取数据,并将其存储至客户端的查询缓冲区client->querybuf中。

  2. 调用networking.c/processInputBuffer()函数,解析客户端查询缓冲区中的命令请求,然后执行这些命令。

processInputBuffer()函数的主要定义如下:

int processInputBuffer(client *c) {

    /* Parse the query buffer and/or execute already parsed commands. */
    // 解析查询缓冲区并/或执行已解析命令
    while ((c->querybuf && c->qb_pos < sdslen(c->querybuf)) ||
           c->cmd_queue.off < c->cmd_queue.len) {

        // ...

        // 尝试从命令队列中弹出首个命令作为当前命令
        if (!consumeCommandQueue(c)) {
            // 如果队列中没有可供弹出的命令,那么解析输入缓冲区以获取可执行命令
            parseInputBuffer(c);
            // 预处理当前命令
            prepareCommandQueue(c);
        }

        // ...

        /* We are finally ready to execute the command. */
        // 正式开始处理和执行命令
        if (processCommandAndResetClient(c) == C_ERR) {
            /* If the client is no longer valid, we avoid exiting this
             * loop and trimming the client buffer later. So we return
             * ASAP in that case. */
            // 当客户端不再有效时,立即退出此循环,并在稍后修剪客户端缓冲区
            return C_ERR;
        }
    }

    return C_OK;
}

processInputBuffer()函数的定义中,它首先会调用networking.c/parseInputBuffer()函数以尝试解析查询缓冲区中的数据,并将待执行的命令请求放到客户端的命令队列中。

接着,processInputBuffer()函数会调用server.c/prepareCommandQueue()函数,对客户端命令队列中待执行的命令进行预处理,确保它们可以进入被执行的状态。

最后,processInputBuffer()函数会调用networking.c/processCommandAndResetClient()函数,正式开始处理和执行命令队列中的命令。

processCommandAndResetClient()函数要做的事情不多,它最重要的工作就是调用server.c/processCommand()函数以执行客户端的当前命令,这之后的工作就跟我们在之前的系列文章中看到过的Valkey命令的执行过程一样了:

int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    client *old_client = server.current_client;
    server.current_client = c;

    // 执行命令
    if (processCommand(c) == C_OK) {
        // 执行一些命令执行之后的后续操作,比如:清空客户端、更新复制偏移量、传播命令等
        commandProcessed(c);
        /* Update the client's memory to include output buffer growth following the
         * processed command. */
        // 更新客户端内存信息
        if (c->conn) updateClientMemUsageAndBucket(c);
    }

    // ...
}

以下是server.h/client结构中与查询缓冲区及命令执行有关的主要属性:

typedef struct client {

    // ...

    // 查询缓冲区
    sds querybuf;        /* Buffer we use to accumulate client queries. */

    // 已读取查询缓冲区的偏移量
    size_t qb_pos;       /* The position we have read in querybuf. */

    // (当前正在执行命令的)命令参数
    robj **argv;         /* Arguments of current command. */

    // (当前正在执行命令的)命令参数数量
    int argc;            /* Num of arguments of current command. */

    // 命令参数数组的长度(可能大于实际的参数数量)
    int argv_len;        /* Size of argv array (may be more than argc) */

    // 已解析命令队列
    cmdQueue cmd_queue;  /* Parsed commands queue */

    // 正在执行的命令
    struct serverCommand *cmd;        /* Current command. */

    // ...

} client;
黄健宏
2026.1.26