Redis Streams(流)数据结构简介

Redis 的 Streams (流)是即将要添加到 Redis 4.x 版本的一种新的数据结构, 该结构是为了抹平 List 、SortedSet 和 Pub/Sub 之间的空隙, 以便更好地实现时间序列、消息的流式传递等应用而出现的。

目前来说, 用户只需要获取 https://github.com/antirez/redisstreams 分支, 就可以看到流结构的具体实现代码。

在 Redis 中, 所有流命令都是以 X 为开头的, 比如 XADDXRANGEXREAD , 本文接下来将对这三个元素进行介绍。

向流中添加新元素

向流中添加信息可以通过使用 XADD 命令来完成。

比如以下命令就向流 user-timeline 添加了一个拥有两个字段的元素, 这两个字段为 usermessage , 它们的值分别为 huangzhello world!

> XADD user-timeline * user "huangz" message "hello world!"
1508829813213.0

XADD 命令的第二个参数用于设置元素的 ID , 使用 * 为值则表示让 Redis 自行为该元素设置 ID 。 对于上面执行的 XADD 命令来说, 命令返回的 1508829813213.0 就是新添加元素的 ID 。

每个流元素的 ID 都可以由两个 64 位无符号整数组成。 对于 Redis 自动生成的 ID 来说, 点号 . 之前的数字为添加元素时的毫秒精度的时间戳, 而点号之后的数字则是一个从 0 开始自增的数字, 用于却别同一毫秒内出现的多个不同元素。

从流中获取指定 ID 范围内的元素

通过使用 XRANGE 命令, 用户可以获取指定 ID 范围内的元素。 跟有序集合命令一样, ID 范围可以使用 - 或者 + 来表示负无穷和正无穷。

比如说, 通过执行以下命令, 我们可以获取流中 ID 大于等于 1508829813213.0 的所有元素:

> XRANGE user-timeline 1508829813213.0 +
1) 1) 1508829813213.0
2) 1) "user"
   2) "huangz"
   3) "message"
   4) "hello world!"

以阻塞方式获取流中元素

跟列表一样, 流也支持阻塞式的获取操作。

比如说, 通过执行以下命令:

XREAD BLOCK 5000 STREAMS user-timeline $

用户可以在一个时间最长为 5000 毫秒的阻塞操作中, 等待 user-timeline 流最新添加的一个元素: 如果 user-timeline 流在指定时间内都没有出现新添加的元素, 那么 XREAD 命令将返回一个空值; 否则, XREAD 命令将返回流新添加的元素。

结语

除了上述提到的功能之外, 流还支持定长功能(限制流最多只能包含指定数量的元素), 并且将来还会支持组发送功能(类似于Pub/Sub,将一个元素分发给指定的一组客户端), 有兴趣的朋友可以关注 Redis 的 Streams 分支和作者的博客, 我也会继续和大家分享更多这方面的知识。

黄健宏
2017.10.24