Valkey Source Code Analysis (23): Writing Command Replies

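Whenever the Valkey server calls a function from the addReply family, that function calls networking.c/prepareClientToWrite(). prepareClientToWrite() in turn calls networking.c/putClientInPendingWriteQueue(), which adds the client to the server's global list of clients with pending writes: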

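// Called every time we are about to send data to the client. It checks various conditions to decide whether the send can proceed.
// Returns C_OK if it can, C_ERR otherwise.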
int prepareClientToWrite(client *c) {

    // ...

    /* Schedule the client to write the output buffers to the socket, unless
     * it should already be setup to do so (it has already pending data). */
    if (!clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);

    // ...
}

void putClientInPendingWriteQueue(client *c) {
    /* Schedule the client to write the output buffers to the socket only
     * if not already done and, for replicas, if the replica can actually receive
     * writes at this stage. */
    if (!c->flag.pending_write &&
        (!c->repl_data ||
         c->repl_data->repl_state == REPL_STATE_NONE ||
         (isReplicaReadyForReplData(c) && !c->repl_data->repl_start_cmd_stream_on_ack)) &&
        clusterSlotMigrationShouldInstallWriteHandler(c)) {
        /* Here instead of installing the write handler, we just flag the
         * client and put it into a list of clients that have something
         * to write to the socket. This way before re-entering the event
         * loop, we can try to directly write to the client sockets avoiding
         * a system call. We'll only really install the write handler if
         * we'll not be able to write the whole reply at once. */
        c->flag.pending_write = 1;
        listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
    }
}
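
The scheduling logic can be made concrete with a minimal C sketch. It rests on simplifying assumptions. The types toy_client and toy_pending_list and the functions toy_put_in_pending_queue() and toy_prepare_to_write() are hypothetical stand-ins, not Valkey APIs. The replica and slot-migration conditions that putClientInPendingWriteQueue() checks are left out. The sketch shows why a client is queued at most once, no matter how many replies are appended before the next flush. A client is enqueued only while it has no unsent data, and the pending_write flag guards against double insertion.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for client and list (not Valkey's types). */
typedef struct toy_client {
    int pending_write;              /* plays the role of c->flag.pending_write */
    size_t unsent;                  /* bytes still waiting in the output buffers */
    struct toy_client *prev, *next; /* intrusive links, like clients_pending_write_node */
} toy_client;

typedef struct {
    toy_client *head;
    size_t len;
} toy_pending_list;

/* Link c at the head of the list unless it is already queued. */
static void toy_put_in_pending_queue(toy_pending_list *l, toy_client *c) {
    if (c->pending_write) return; /* already scheduled: nothing to do */
    c->pending_write = 1;
    c->prev = NULL;
    c->next = l->head;
    if (l->head) l->head->prev = c;
    l->head = c;
    l->len++;
}

/* Called for every reply appended to c; only the first one schedules the client. */
static void toy_prepare_to_write(toy_pending_list *l, toy_client *c, size_t nbytes) {
    if (c->unsent == 0) toy_put_in_pending_queue(l, c);
    c->unsent += nbytes;
}
```

In this model, appending three replies to two clients leaves exactly two entries in the list. The most recently scheduled client sits at the head, in the same way that listLinkNodeHead() links nodes at the head.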

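Before the server enters the next event loop iteration, it calls the before-sleep hook server.c/beforeSleep(), which in turn calls networking.c/handleClientsWithPendingWrites(). The excerpt below shows the branch taken when the event loop is re-entered from processEventsWhileBlocked(). On the normal path, beforeSleep() also calls handleClientsWithPendingWrites(), in the part elided as // ...: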

void beforeSleep(struct aeEventLoop *eventLoop) {

    // ...

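    // If we are re-entering the event loop from processEventsWhileBlocked(), only call a subset of vital functions.
    // Note that in this case we keep track of the number of events processed,
    // because processEventsWhileBlocked() wants to stop as soon as there is nothing left to handle.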
    if (ProcessingEventsWhileBlocked) {
        uint64_t processed = 0;

        // ...

        // Try to write the clients' output buffers directly to their sockets
        processed += handleClientsWithPendingWrites();

        // ...
    }

    // ...

}
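
What matters here is the ordering: the before-sleep hook runs before the event loop blocks waiting for I/O. The following hypothetical C model sketches that ordering. The names toy_loop, toy_flush_pending() and toy_process_events() are illustrative, not Valkey code, and no real file descriptors are involved. Replies queued by command handlers in one iteration are flushed by the hook at the start of the next iteration, before the loop goes back to sleep.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical miniature event loop: only the position of the before-sleep
 * hook relative to the blocking poll is modeled. */
typedef struct toy_loop toy_loop;
typedef int (*toy_hook)(toy_loop *loop);

struct toy_loop {
    toy_hook before_sleep; /* runs before the loop blocks, like beforeSleep() */
    int pending_writes;    /* replies queued by handlers, not yet flushed */
    int flushed_total;     /* replies flushed by the hook so far */
    int polls;             /* how many times the loop "blocked" in the poller */
};

/* Stand-in for handleClientsWithPendingWrites(): flush everything queued. */
static int toy_flush_pending(toy_loop *loop) {
    int processed = loop->pending_writes;
    loop->flushed_total += processed;
    loop->pending_writes = 0;
    return processed;
}

/* One iteration: run the hook, (pretend to) block, then let the event
 * handlers queue the replies of the commands they executed. */
static void toy_process_events(toy_loop *loop, int replies_from_commands) {
    if (loop->before_sleep) loop->before_sleep(loop); /* flush before blocking */
    loop->polls++;                                    /* the poller would block here */
    loop->pending_writes += replies_from_commands;    /* handlers queue new replies */
}
```

After the first iteration, the three replies are only queued. The second iteration's hook flushes them before the loop blocks again, without waiting for any writable event.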

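handleClientsWithPendingWrites() walks the server's pending-write list, server.clients_pending_write, and tries to write to each client synchronously. If a client still has data to send after this synchronous write, and only then, the server installs a write handler for it through networking.c/installClientWriteHandler(). The event subsystem then writes the remaining data asynchronously: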

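// Called just before entering the event loop, in the hope that replies can be written
// directly to the client sockets without the syscalls needed to install a writable event handler, have it invoked, and so forth.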
int handleClientsWithPendingWrites(void) {
    int processed = 0;
    int pending_writes = listLength(server.clients_pending_write);
    if (pending_writes == 0) return processed; /* Return ASAP if there are no clients. */

    /* Adjust the number of I/O threads based on the number of pending writes. This is required in case pending_writes >
     * poll_events (for example in pubsub) */
    adjustIOThreadsByEventLoad(pending_writes, 1);

    // Iterate over the server's list of clients with pending writes
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write, &li);
    while ((ln = listNext(&li))) {
        // Fetch the next client with pending writes and remove it from the list
        client *c = listNodeValue(ln);
        c->flag.pending_write = 0;
        listUnlinkNode(server.clients_pending_write, ln);

        // ...

        // Count this client as processed
        processed++;

        /* Try to write buffers to the client socket. */
        if (writeToClient(c) == C_ERR) continue;

        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
            installClientWriteHandler(c);
        }
    }

    // Return the number of clients processed in this call
    return processed;
}
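
The strategy of writing synchronously first and installing a handler only if needed can be sketched as follows. This is a simplified model built on stated assumptions. toy_wclient, toy_write_to_client() and toy_handle_pending_writes() are hypothetical. The "socket" accepts at most budget bytes per call, which imitates a full kernel send buffer. Error handling and I/O-thread dispatch are omitted.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical client: 'unsent' bytes are waiting to be written. */
typedef struct toy_wclient {
    size_t unsent;
    int pending_write;
    int write_handler_installed;
    struct toy_wclient *next;
} toy_wclient;

/* Stand-in for writeToClient(): the socket accepts at most 'budget' bytes
 * per call, simulating a kernel send buffer that fills up. */
static void toy_write_to_client(toy_wclient *c, size_t budget) {
    size_t n = c->unsent < budget ? c->unsent : budget;
    c->unsent -= n;
}

/* Drain the pending list: write synchronously, and install a write handler
 * only for clients whose output could not be fully flushed. */
static int toy_handle_pending_writes(toy_wclient **head, size_t budget) {
    int processed = 0;
    while (*head) {
        toy_wclient *c = *head;
        *head = c->next; /* unlink, like listUnlinkNode() */
        c->next = NULL;
        c->pending_write = 0;
        processed++;
        toy_write_to_client(c, budget);
        if (c->unsent > 0) c->write_handler_installed = 1; /* leftover data */
    }
    return processed;
}
```

With a budget of 4 bytes, a client holding 3 bytes is fully flushed and needs no handler. A client holding 10 bytes keeps 6 bytes and gets a write handler, which mirrors the clientHasPendingReplies() check above.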

All installClientWriteHandler() has to do is install networking.c/sendReplyToClient() as the write handler for the client's connection:

// Install the write event handler for the client
void installClientWriteHandler(client *c) {
    int ae_barrier = 0;
    /* For the fsync=always policy, we want that a given FD is never
     * served for reading and writing in the same event loop iteration,
     * so that in the middle of receiving the query, and serving it
     * to the client, we'll call beforeSleep() that will do the
     * actual fsync of AOF to disk. The write barrier ensures that. */
    if (server.aof_state == AOF_ON && server.aof_fsync == AOF_FSYNC_ALWAYS) {
        ae_barrier = 1;
    }
    if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
        freeClientAsync(c);
    }
}
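
The dispatch order in the ae event loop shows what the barrier changes. Normally, when a file descriptor is both readable and writable in the same iteration, the read handler fires first and the write handler second. With AE_BARRIER set, the order is inverted. As a result, a reply produced by a query read in this iteration is never sent by the write handler in that same iteration, and beforeSleep() gets a chance to fsync the AOF first. The sketch below models only that ordering. The TOY_* flags and the functions toy_needs_barrier() and toy_fire() are hypothetical, and their values are not Valkey's.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical flags standing in for AE_READABLE, AE_WRITABLE and AE_BARRIER. */
enum { TOY_READABLE = 1, TOY_WRITABLE = 2, TOY_BARRIER = 4 };

/* Same policy as installClientWriteHandler(): barrier only for appendfsync always. */
static int toy_needs_barrier(int aof_on, int fsync_always) {
    return aof_on && fsync_always;
}

/* Records in 'order' which handlers fire for one ready fd in one iteration
 * ('R' = read, 'W' = write) and returns how many fired. */
static int toy_fire(int registered_mask, int ready_mask, char order[2]) {
    int n = 0;
    int invert = registered_mask & TOY_BARRIER;
    int readable = registered_mask & ready_mask & TOY_READABLE;
    int writable = registered_mask & ready_mask & TOY_WRITABLE;
    if (!invert && readable) order[n++] = 'R'; /* normal order: read first */
    if (writable) order[n++] = 'W';
    if (invert && readable) order[n++] = 'R';  /* barrier: read after write */
    return n;
}
```

This model omits details of the real loop, such as not calling the same procedure twice when the read and write handlers are identical.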

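Once the client connection becomes writable, the event subsystem runs networking.c/sendReplyToClient() to handle the write event. This function calls the appropriate write routine, which writes the data held in the client's reply buffer or reply list to the client's socket: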

// Write event handler.
// Its job is to send pending data to the client.
void sendReplyToClient(connection *conn) {
    // Look up the client from the connection
    client *c = connGetPrivateData(conn);
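    // First try handing the write off to the I/O threads
    if (trySendWriteToIOThreads(c) == C_OK) return;
    // If the hand-off is not possible (for example, I/O threads are disabled), the main thread writes as usual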
    writeToClient(c);
}
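
The order in which buffered output is drained can be sketched in C. The client's fixed reply buffer goes first, then the reply list, and a sent-length cursor tracks partial writes. This minimal model rests on assumptions. toy_out, toy_block, toy_send() and toy_write_to_client() are hypothetical. The socket is an in-memory array that accepts at most budget bytes per call. The I/O-thread hand-off performed by sendReplyToClient() is not modeled. Valkey's actual writeToClient() is more involved: for example, it handles write errors and may batch several buffers into one writev() call.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical output state: a fixed buffer (like c->buf) plus a singly
 * linked list of reply blocks (like c->reply). Not Valkey's layout. */
typedef struct toy_block {
    const char *data;
    size_t len;
    struct toy_block *next;
} toy_block;

typedef struct {
    char buf[16];
    size_t bufpos;    /* bytes used in buf */
    size_t sentlen;   /* bytes of the current buf/block already sent */
    toy_block *reply; /* overflow replies, written after buf */
} toy_out;

/* Fake socket: appends up to 'cap' bytes to 'dst' and returns bytes accepted. */
static size_t toy_send(char *dst, size_t *dstlen, size_t cap, const char *src, size_t len) {
    size_t n = len < cap ? len : cap;
    memcpy(dst + *dstlen, src, n);
    *dstlen += n;
    return n;
}

/* Write as much as 'budget' allows: buf first, then the list, in order.
 * Returns 1 if everything was sent, 0 if data remains. */
static int toy_write_to_client(toy_out *o, char *sock, size_t *socklen, size_t budget) {
    while (budget > 0 && o->bufpos > 0) {
        size_t n = toy_send(sock, socklen, budget, o->buf + o->sentlen, o->bufpos - o->sentlen);
        budget -= n;
        o->sentlen += n;
        if (o->sentlen == o->bufpos) o->bufpos = o->sentlen = 0; /* buf drained */
    }
    while (budget > 0 && o->reply) {
        toy_block *b = o->reply;
        size_t n = toy_send(sock, socklen, budget, b->data + o->sentlen, b->len - o->sentlen);
        budget -= n;
        o->sentlen += n;
        if (o->sentlen == b->len) { /* block done: move to the next one */
            o->reply = b->next;
            o->sentlen = 0;
        }
    }
    return o->bufpos == 0 && o->reply == NULL;
}
```

A return value of 0 corresponds to the case where clientHasPendingReplies() would still be true, so the write handler is still needed for the remaining data.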
黄健宏
2026.1.26