Valkey源码剖析(22):命令回复的存储

客户端使用server.h/client结构中的buf属性和reply属性来保存被执行命令的回复:

/* Abridged view of the client structure: only the fields used to hold the
 * replies of executed commands are shown; every "// ..." marks members that
 * were left out of this excerpt. */
typedef struct client {

    // ...

    // Output buffer that replies are written into
    char *buf;                           /* Output buffer */
    // Usable size of the output buffer
    size_t buf_usable_size;              /* Usable size of buffer. */
    // Sequence of reply objects that still have to be sent to the client
    list *reply;                         /* List of reply objects to send to the client. */

    // ...

    // Offset into the output buffer
    size_t bufpos;

    // ...

} client;

服务器在执行完一个命令之后,通常会调用一个名字以addReply开头的函数,将命令产生的回复写入至客户端缓冲区中:

  • 如果回复很简短,那么服务器会直接将回复写入至客户端的client->buf缓冲区;

  • 但如果回复比较长,仅用client->buf无法容纳所有回复内容,那么服务器将把回复分割为多个部分,然后每个部分使用一个server.h/clientReplyBlock结构存储,最后再用链表client->reply把多个块结构串联起来形成完整的回复。

以下是clientReplyBlock结构的定义:

/* One block of reply data. Blocks are chained together through the
 * client's reply list to form a complete reply. */
typedef struct clientReplyBlock {
    size_t size;                /* Total size of the buffer. */
    size_t used;                /* Part of the buffer currently occupied. */
    payloadHeader *last_header; /* Points to a last header in an encoded buffer. */
    ClientReplyBlockFlags flag; /* Flags describing this block. */
    char buf[];                 /* Block contents (flexible array member). */
} clientReplyBlock;

以addReply系列函数中最简单的addReply()函数为例:

/* Add the string object `obj` to the output of client `c`.
 *
 * Nothing is written when prepareClientToWrite() does not return C_OK.
 * Objects for which sdsEncodedObject() is true are sent as-is; objects with
 * OBJ_ENCODING_INT are first rendered as decimal text. Any other encoding
 * triggers serverPanic(). */
void addReply(client *c, robj *obj) {
    /* Check whether writing to this client is possible at all. */
    if (prepareClientToWrite(c) != C_OK) {
        return;
    }

    /* String held as an SDS (the RAW / EMBSTR encodings, per the original
     * annotation): hand the bytes over directly. */
    if (sdsEncodedObject(obj)) {
        _addReplyToBufferOrList(c, obj->ptr, sdslen(obj->ptr));
        return;
    }

    /* Only integer-encoded strings are acceptable from here on. */
    if (obj->encoding != OBJ_ENCODING_INT) {
        serverPanic("Wrong obj->encoding in addReply()");
        return;
    }

    /* The integer is stored in the pointer field itself: convert it into a
     * string with the optimized helper and append the resulting text. */
    char digits[32];
    size_t ndigits = ll2string(digits, sizeof(digits), (long)obj->ptr);
    _addReplyToBufferOrList(c, digits, ndigits);
}

该函数会调用_addReplyToBufferOrList()函数:

/* Append `len` bytes of reply data starting at `s` to the output of client `c`.
 *
 * The call is a no-op when the client is flagged close_after_reply. A replica
 * client that produces a reply is handed to logInvalidUseAndFreeClientAsync()
 * and nothing is written. Otherwise the data goes to exactly one destination:
 *   - c->deferred_reply, when the push message is not being deferred and
 *     isDeferredReplyEnabled(c) is true;
 *   - server.pending_push_messages, when the push message is being deferred;
 *   - the client's static buffer, with whatever does not fit there appended
 *     to the c->reply list. */
void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
    if (c->flag.close_after_reply) return;

    /* Replicas should normally not cause any writes to the reply buffer. In case a rogue replica sent a command on the
     * replication link that caused a reply to be generated we'll simply disconnect it.
     * Note this is the simplest way to check a command added a response. Replication links are used to write data but
     * not for responses, so we should normally never get here on a replica client. */
    if (getClientType(c) == CLIENT_TYPE_REPLICA) {
        sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL;
        logInvalidUseAndFreeClientAsync(c, "Replica generated a reply to command '%s'",
                                        cmdname ? cmdname : "<unknown>");
        return;
    }

    // Account for the network bytes the current command is going to send to the client
    c->net_output_bytes_curr_cmd += len;

    /* We call it here because this function may affect the reply
     * buffer offset (see function comment) */
    reqresSaveClientReplyOffset(c);

    /* If we're processing a push message into the current client (i.e. executing PUBLISH
     * to a channel which we are subscribed to, then we wanna postpone that message to be added
     * after the command's reply (specifically important during multi-exec). the exception is
     * the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply.
     * The check for executing_client also avoids affecting push messages that are part of eviction.
     * Check CLIENT_PUSHING first to avoid race conditions, as it's absent in module's fake client. */
    int defer_push_message = c->flag.pushing && c == server.current_client && server.executing_client &&
                             !cmdHasPushAsReply(server.executing_client->cmd);
    if (defer_push_message == 0 && isDeferredReplyEnabled(c)) {
        _addReplyProtoToList(c, c->deferred_reply, s, len);
        return;
    }

    if (defer_push_message) {
        _addReplyProtoToList(c, server.pending_push_messages, s, len);
        return;
    }

    // Write the reply into the client's static buffer first; reply_len is
    // used below as the number of leading bytes of s that were consumed
    size_t reply_len = _addReplyToBuffer(c, s, len);
    // Append whatever did not fit to the client's output buffer list
    if (len > reply_len) _addReplyProtoToList(c, c->reply, s + reply_len, len - reply_len);
}

在这个函数的最后,首先被调用的_addReplyToBuffer()函数会尝试将回复写入至client->buf中;如果在这之后,回复还有剩余的内容(这些内容无法容纳在静态缓冲区中),那么接下来调用的_addReplyProtoToList()就会将剩余的内容写入至回复链表client->reply中,该链表由一个或多个clientReplyBlock结构串联而成。

黄健宏
2026.1.26