Valkey Source Code Analysis (22): Storing Command Replies
Each client uses the buf and reply fields of the server.h/client struct to hold the replies of the commands it has executed:
typedef struct client {
    // ...
    char *buf; /* Output buffer */
    size_t buf_usable_size; /* Usable size of buffer. */
    list *reply; /* List of reply objects to send to the client. */
    // ...
    // Offset into the output buffer
    size_t bufpos;
    // ...
} client;
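After executing a command, the server usually calls one of the functions whose names begin with addReply to write the reply produced by the command into the client's buffers.
If the reply is short, the server writes it directly into the client's client->buf buffer. If the reply is longer and client->buf alone cannot hold all of it, the server splits the reply into parts, stores each part in a server.h/clientReplyBlock struct, and then chains those blocks together with the client->reply list to form the complete reply.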
The clientReplyBlock struct is defined as follows:
typedef struct clientReplyBlock {
    // Total size of the buffer and the number of bytes currently in use
    size_t size, used;
    payloadHeader *last_header; /* points to a last header in an encoded buffer */
    // Flags
    ClientReplyBlockFlags flag;
    // Reply content
    char buf[];
} clientReplyBlock;
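The trailing char buf[] is a C99 flexible array member, so a block's header and its payload can share a single allocation. The following is a minimal sketch of that idea, not Valkey's actual allocation code: toyReplyBlock, toyBlockCreate() and toyBlockAppend() are hypothetical names introduced only for illustration, and plain malloc() stands in for whatever allocator the server uses.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified stand-in for clientReplyBlock. */
typedef struct toyReplyBlock {
    size_t size, used; /* capacity of buf[] and bytes already written */
    char buf[];        /* flexible array member: payload follows the header */
} toyReplyBlock;

/* Allocate header and payload with one malloc() call. Returns NULL on failure. */
toyReplyBlock *toyBlockCreate(size_t size) {
    toyReplyBlock *b = malloc(sizeof(*b) + size);
    if (b == NULL) return NULL;
    b->size = size;
    b->used = 0;
    return b;
}

/* Copy at most len bytes into the free space; return how many were copied. */
size_t toyBlockAppend(toyReplyBlock *b, const char *s, size_t len) {
    size_t avail = b->size - b->used;
    size_t n = len < avail ? len : avail;
    memcpy(b->buf + b->used, s, n);
    b->used += n;
    return n;
}
```

Because toyBlockAppend() returns the number of bytes it actually stored, a caller can tell when a block is full and continue with the remaining bytes in another block.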
Take addReply(), the simplest function in the addReply family, as an example:
void addReply(client *c, robj *obj) {
    // Check whether a reply can be written to this client
    if (prepareClientToWrite(c) != C_OK) return;
    if (sdsEncodedObject(obj)) {
        // The object holds an SDS string (RAW or EMBSTR encoding): write it to the output buffer as-is
        _addReplyToBufferOrList(c, obj->ptr, sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf, sizeof(buf), (long)obj->ptr);
        _addReplyToBufferOrList(c, buf, len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
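The integer branch formats the value into a 32-byte stack buffer. That is comfortably large: the longest 64-bit signed value, -9223372036854775808, needs 20 characters plus a terminating NUL. As a rough sketch of the same step, the hypothetical helper toyLl2string() below uses snprintf() as a stand-in; it is not the optimized ll2string() that the server calls, and its return convention is an assumption of this sketch.

```c
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for ll2string(): formats value into dst and returns
 * the number of characters written (excluding the NUL), or 0 if dst is too
 * small to hold the whole string. */
size_t toyLl2string(char *dst, size_t dstlen, long long value) {
    int n = snprintf(dst, dstlen, "%lld", value);
    if (n < 0 || (size_t)n >= dstlen) return 0;
    return (size_t)n;
}
```

The returned length is what gets passed on as len, so the reply is written without a second pass over the string to measure it.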
addReply() in turn calls _addReplyToBufferOrList():
void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
    if (c->flag.close_after_reply) return;
    /* Replicas should normally not cause any writes to the reply buffer. In case a rogue replica sent a command on the
     * replication link that caused a reply to be generated we'll simply disconnect it.
     * Note this is the simplest way to check a command added a response. Replication links are used to write data but
     * not for responses, so we should normally never get here on a replica client. */
    if (getClientType(c) == CLIENT_TYPE_REPLICA) {
        sds cmdname = c->lastcmd ? c->lastcmd->fullname : NULL;
        logInvalidUseAndFreeClientAsync(c, "Replica generated a reply to command '%s'",
                                        cmdname ? cmdname : "<unknown>");
        return;
    }
    // Account for the network bytes the current command is about to send to the client
    c->net_output_bytes_curr_cmd += len;
    /* We call it here because this function may affect the reply
     * buffer offset (see function comment) */
    reqresSaveClientReplyOffset(c);
    /* If we're processing a push message into the current client (i.e. executing PUBLISH
     * to a channel which we are subscribed to, then we wanna postpone that message to be added
     * after the command's reply (specifically important during multi-exec). the exception is
     * the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply.
     * The check for executing_client also avoids affecting push messages that are part of eviction.
     * Check CLIENT_PUSHING first to avoid race conditions, as it's absent in module's fake client. */
    int defer_push_message = c->flag.pushing && c == server.current_client && server.executing_client &&
                             !cmdHasPushAsReply(server.executing_client->cmd);
    if (defer_push_message == 0 && isDeferredReplyEnabled(c)) {
        _addReplyProtoToList(c, c->deferred_reply, s, len);
        return;
    }
    if (defer_push_message) {
        _addReplyProtoToList(c, server.pending_push_messages, s, len);
        return;
    }
    // Write as much of the reply as fits into the client's static buffer
    size_t reply_len = _addReplyToBuffer(c, s, len);
    // Append whatever did not fit to the client's reply list
    if (len > reply_len) _addReplyProtoToList(c, c->reply, s + reply_len, len - reply_len);
}
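At the end of this function, _addReplyToBuffer() is called first and tries to write the reply into client->buf. If part of the reply is still left over afterwards (because it does not fit in the static buffer), the subsequent call to _addReplyProtoToList() writes the remainder into the reply list, which is a linked list of clientReplyBlock structs.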
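The buffer-first, list-second pattern can be sketched with a toy model. Everything below (toyClient, toyNode, the 16-byte TOY_BUF_SIZE and the toyAdd* helpers) is hypothetical and heavily simplified; it only illustrates how the return value of the first step decides what is handed to the second, and it is not Valkey's implementation.

```c
#include <stdlib.h>
#include <string.h>

#define TOY_BUF_SIZE 16 /* deliberately tiny so the overflow path is easy to hit */

/* Hypothetical overflow node; plays the role of one block in the reply list. */
typedef struct toyNode {
    struct toyNode *next;
    size_t len;
    char data[];
} toyNode;

/* Hypothetical, heavily simplified client. */
typedef struct toyClient {
    char buf[TOY_BUF_SIZE];
    size_t bufpos;        /* bytes used in buf */
    toyNode *head, *tail; /* overflow list, empty when head == NULL */
} toyClient;

/* Step 1: copy as much as fits into the fixed buffer; return bytes copied.
 * Design choice of this sketch: once the list is non-empty, nothing more
 * goes into buf, so bytes stay in the order they were produced. */
static size_t toyAddToBuffer(toyClient *c, const char *s, size_t len) {
    if (c->head != NULL) return 0;
    size_t avail = TOY_BUF_SIZE - c->bufpos;
    size_t n = len < avail ? len : avail;
    memcpy(c->buf + c->bufpos, s, n);
    c->bufpos += n;
    return n;
}

/* Step 2: append the remainder as a new node at the tail of the list. */
static int toyAddToList(toyClient *c, const char *s, size_t len) {
    toyNode *n = malloc(sizeof(*n) + len);
    if (n == NULL) return -1;
    n->next = NULL;
    n->len = len;
    memcpy(n->data, s, len);
    if (c->tail) c->tail->next = n; else c->head = n;
    c->tail = n;
    return 0;
}

/* Buffer first, list for whatever did not fit. Returns 0 on success. */
int toyAddReply(toyClient *c, const char *s, size_t len) {
    size_t written = toyAddToBuffer(c, s, len);
    if (len > written) return toyAddToList(c, s + written, len - written);
    return 0;
}
```

With a zero-initialized toyClient, a 5-byte reply lands entirely in buf, while a following 19-byte reply is split: 11 bytes fill the rest of buf and the remaining 8 bytes become the first list node. To stay short, the sketch always allocates a new node for the remainder; a real implementation may instead reuse free space in the tail block before allocating.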
黄健宏
2026.1.26