《Redis应用实例》书摘(18):流迭代器(使用XRANGE实现)

虽然Redis提供了SCANHSCANSSCANZSCAN命令来分别为数据库、散列、集合和有序集合提供迭代功能,但它并未为流数据结构提供对应的迭代命令。

幸运的是,通过Redis目前提供的XREAD命令或是XRANGE命令,我们可以模仿并实现与上述命令类似的流元素迭代器。

代码清单 CODE_XRANGE_ITERATOR 展示了使用XRANGE命令实现的流迭代器实现。


代码清单 CODE_XRANGE_ITERATOR 使用XRANGE命令实现的流迭代器 xrange_iterator.py

DEFAULT_COUNT = 10

START_OF_STREAM = "-"
END_OF_STREAM = "+"

def tuple_to_dict(tpl):
    """
    将流返回的元素从元组(id, msg)转换为字典{"id":id, "msg":msg}。
    """
    return {"id": tpl[0], "msg": tpl[1]}

class StreamIterator:

    def __init__(self, client, key, cursor=START_OF_STREAM):
        """
        初始化流迭代器,参数key用于指定被迭代的流。
        可选的cursor参数用于指定迭代的游标,默认为流的开头。
        """
        self.client = client
        self.key = key
        self._cursor = cursor 

    def next(self,count=DEFAULT_COUNT):
        """
        迭代流元素并以列表形式返回它们,其中每个元素的格式为{"id":id, "msg":msg}。
        可选的count参数用于指定每次迭代能够返回的最大元素数量,默认为10。
        """
        messages = self.client.xrange(self.key, self._cursor, END_OF_STREAM, count=count)
        if messages == []:
            return []
        else:
            # 获取本次迭代最后一条消息的ID
            # 并通过给它加上前缀"("来保证下次迭代时新消息的ID必定大于它
            self._cursor = "(" + messages[-1][0]
            return list(map(tuple_to_dict, messages))

    def rewind(self, cursor=START_OF_STREAM):
        """
        将游标重置至可选参数cursor指定的位置,若省略该参数则默认重置至流的开头。
        """
        self._cursor = cursor

作为例子,以下代码展示了如何使用这个迭代器来迭代上面提到的"stream"流:

>>> from redis import Redis
>>> from xrange_iterator import StreamIterator
>>> client = Redis(decode_responses=True)
>>> iterator = StreamIterator(client, "stream")
>>> iterator.next(2)  # 每次迭代最多两个元素
[{'id': '10086-0', 'msg': {'': ''}}, {'id': '10087-0', 'msg': {'': ''}}]
>>> iterator.next(2)
[{'id': '10088-0', 'msg': {'': ''}}, {'id': '10089-0', 'msg': {'': ''}}]
>>> iterator.next(2)
[{'id': '10090-0', 'msg': {'': ''}}]  # 迭代完毕
>>> iterator.next(2)
[]

Tip

本文摘录自《Redis应用实例》一书。
欢迎访问书本主页以了解更多Redis使用案例:huangz.works/rediscookbook/
../_images/rediscookbook-banner.png