《Redis应用实例》书摘(23):消息队列¶
消息队列是一种非常重要的数据结构,它既可以用于内部组件,也可以用于外部应用。
代码清单 CODE_MESSAGE_QUEUE 展示了基于流实现的消息队列程序。
代码清单 CODE_MESSAGE_QUEUE 消息队列程序 message_queue.py
from xread_iterator import StreamIterator, START_OF_STREAM
NON_BLOCK = None
BLOCK_FOREVER = 0
DEFAULT_COUNT = 10
START_OF_MQ = START_OF_STREAM
class MessageQueue:
def __init__(self, client, key, cursor=START_OF_MQ):
"""
根据给定的键,创建与之对应的消息队列。
消息队列的起始访问位置可以通过可选参数cursor指定。
默认情况下,cursor指向消息队列的最开头。
"""
self.client = client
self.key = key
self._iterator = StreamIterator(self.client, self.key, cursor)
def send(self, message):
"""
接受一个键值对形式的消息,并将其放入队列。
完成之后返回消息在队列中的ID作为结果。
"""
return self.client.xadd(self.key, message)
def receive(self, count=DEFAULT_COUNT, timeout=NON_BLOCK):
"""
根据消息的入队顺序,访问队列中的消息。
可选的count参数用于指定每次访问最多能够获取多少条消息,默认为10。
可选的timeout参数用于指定方法在未发现消息时是否阻塞,它的值可以是:
- NON_BLOCK,不阻塞直接返回,这是默认值;
- BLOCK_FOREVER,一直阻塞直到有消息可读为止;
- 一个大于零的整数,用于代表阻塞的最大毫秒数
"""
return self._iterator.next(count, timeout)
def get(self, message_id):
"""
获取指定ID对应的消息。
"""
ret = self.client.xrange(self.key, message_id, message_id)
if ret != []:
return ret[0][1]
def length(self):
"""
返回整个消息队列目前包含的消息总数量。
"""
return self.client.xlen(self.key)
DEFAULT_COUNT = 10
NON_BLOCK = None
START_OF_STREAM = 0
def tuple_to_dict(tpl):
"""
将流返回的元素从元组(id, msg)转换为字典{"id":id, "msg":msg}。
"""
return {"id": tpl[0], "msg": tpl[1]}
class StreamIterator:
def __init__(self, client, key, cursor=START_OF_STREAM):
"""
初始化流迭代器,参数key用于指定被迭代的流。
可选的cursor参数用于指定迭代的游标,默认为流的开头。
"""
self.client = client
self.key = key
self._cursor = cursor
def next(self,count=DEFAULT_COUNT,block=NON_BLOCK):
"""
迭代流元素并以列表形式返回它们,其中每个元素的格式为{"id":id, "msg":msg}。
可选的count参数用于指定每次迭代能够返回的最大元素数量,默认为10。
可选的block参数用于指定迭代时阻塞的最长时限,单位为毫秒,默认非阻塞。
"""
ret = self.client.xread({self.key: self._cursor}, count=count, block=block)
if ret == []:
return []
else:
messages = ret[0][1]
self._cursor = messages[-1][0] # 本次迭代最后一条消息的ID
return list(map(tuple_to_dict, messages))
def rewind(self, cursor=START_OF_STREAM):
"""
将游标重置至可选参数cursor指定的位置,若省略该参数则默认重置至流的开头。
"""
self._cursor = cursor
这个程序复用了《流迭代器》一章中使用XREAD命令实现的流迭代器。通过这个迭代器,程序成功地将原本非常复杂的消息接收操作简化为一次针对StreamIterator.next()方法的调用,这也是这个消息队列程序能够保持简练的关键。
作为例子,以下代码展示了这个消息队列程序的具体使用方式:
>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> client = Redis(decode_responses=True)
>>> mq = MessageQueue(client, "MessageQueue:10086") # 创建消息队列对象
>>> mq.send({"uid": "Jack", "msg": "Hello!"}) # 发送消息
'1720847899374-0'
>>> mq.send({"uid": "Tom", "msg": "Hi!"})
'1720847912318-0'
>>> mq.receive() # 接收消息
[{'id': '1720847899374-0', 'msg': {'uid': 'Jack', 'msg': 'Hello!'}},
{'id': '1720847912318-0', 'msg': {'uid': 'Tom', 'msg': 'Hi!'}}]
>>> mq.get("1720847899374-0") # 获取特定消息
{'uid': 'Jack', 'msg': 'Hello!'}
>>> mq.length() # 获取消息总数量
2