《Redis应用实例》书摘(18):流迭代器(使用XRANGE实现)¶
虽然Redis提供了SCAN、HSCAN、SSCAN和ZSCAN命令来分别为数据库、散列、集合和有序集合提供迭代功能,但它并未为流数据结构提供对应的迭代命令。
幸运的是,通过Redis目前提供的XREAD命令或是XRANGE命令,我们可以模仿并实现与上述命令类似的流元素迭代器。
代码清单 CODE_XRANGE_ITERATOR 展示了使用XRANGE命令实现的流迭代器实现。
代码清单 CODE_XRANGE_ITERATOR 使用XRANGE命令实现的流迭代器 xrange_iterator.py
DEFAULT_COUNT = 10
START_OF_STREAM = "-"
END_OF_STREAM = "+"
def tuple_to_dict(tpl):
"""
将流返回的元素从元组(id, msg)转换为字典{"id":id, "msg":msg}。
"""
return {"id": tpl[0], "msg": tpl[1]}
class StreamIterator:
def __init__(self, client, key, cursor=START_OF_STREAM):
"""
初始化流迭代器,参数key用于指定被迭代的流。
可选的cursor参数用于指定迭代的游标,默认为流的开头。
"""
self.client = client
self.key = key
self._cursor = cursor
def next(self,count=DEFAULT_COUNT):
"""
迭代流元素并以列表形式返回它们,其中每个元素的格式为{"id":id, "msg":msg}。
可选的count参数用于指定每次迭代能够返回的最大元素数量,默认为10。
"""
messages = self.client.xrange(self.key, self._cursor, END_OF_STREAM, count=count)
if messages == []:
return []
else:
# 获取本次迭代最后一条消息的ID
# 并通过给它加上前缀"("来保证下次迭代时新消息的ID必定大于它
self._cursor = "(" + messages[-1][0]
return list(map(tuple_to_dict, messages))
def rewind(self, cursor=START_OF_STREAM):
"""
将游标重置至可选参数cursor指定的位置,若省略该参数则默认重置至流的开头。
"""
self._cursor = cursor
作为例子,以下代码展示了如何使用这个迭代器来迭代上面提到的"stream"流:
>>> from redis import Redis
>>> from xrange_iterator import StreamIterator
>>> client = Redis(decode_responses=True)
>>> iterator = StreamIterator(client, "stream")
>>> iterator.next(2) # 每次迭代最多两个元素
[{'id': '10086-0', 'msg': {'': ''}}, {'id': '10087-0', 'msg': {'': ''}}]
>>> iterator.next(2)
[{'id': '10088-0', 'msg': {'': ''}}, {'id': '10089-0', 'msg': {'': ''}}]
>>> iterator.next(2)
[{'id': '10090-0', 'msg': {'': ''}}] # 迭代完毕
>>> iterator.next(2)
[]