《Redis应用实例》书摘(23):消息队列

消息队列是一种非常重要的数据结构,它既可以用于内部组件,也可以用于外部应用。

代码清单 CODE_MESSAGE_QUEUE 展示了基于流实现的消息队列程序。


代码清单 CODE_MESSAGE_QUEUE 消息队列程序 message_queue.py

from xread_iterator import StreamIterator, START_OF_STREAM

NON_BLOCK = None
BLOCK_FOREVER = 0

DEFAULT_COUNT = 10

START_OF_MQ = START_OF_STREAM

class MessageQueue:

    def __init__(self, client, key, cursor=START_OF_MQ):
        """
        根据给定的键,创建与之对应的消息队列。
        消息队列的起始访问位置可以通过可选参数cursor指定。
        默认情况下,cursor指向消息队列的最开头。
        """
        self.client = client
        self.key = key
        self._iterator = StreamIterator(self.client, self.key, cursor)

    def send(self, message):
        """
        接受一个键值对形式的消息,并将其放入队列。
        完成之后返回消息在队列中的ID作为结果。
        """
        return self.client.xadd(self.key, message)

    def receive(self, count=DEFAULT_COUNT, timeout=NON_BLOCK):
        """
        根据消息的入队顺序,访问队列中的消息。
        可选的count参数用于指定每次访问最多能够获取多少条消息,默认为10。
        可选的timeout参数用于指定方法在未发现消息时是否阻塞,它的值可以是:
        - NON_BLOCK,不阻塞直接返回,这是默认值;
        - BLOCK_FOREVER,一直阻塞直到有消息可读为止;
        - 一个大于零的整数,用于代表阻塞的最大毫秒数
        """
        return self._iterator.next(count, timeout)

    def get(self, message_id):
        """
        获取指定ID对应的消息。
        """
        ret = self.client.xrange(self.key, message_id, message_id)
        if ret != []:
            return ret[0][1]

    def length(self):
        """
        返回整个消息队列目前包含的消息总数量。
        """
        return self.client.xlen(self.key)
DEFAULT_COUNT = 10
NON_BLOCK = None
START_OF_STREAM = 0

def tuple_to_dict(tpl):
    """
    将流返回的元素从元组(id, msg)转换为字典{"id":id, "msg":msg}。
    """
    return {"id": tpl[0], "msg": tpl[1]}

class StreamIterator:

    def __init__(self, client, key, cursor=START_OF_STREAM):
        """
        初始化流迭代器,参数key用于指定被迭代的流。
        可选的cursor参数用于指定迭代的游标,默认为流的开头。
        """
        self.client = client
        self.key = key
        self._cursor = cursor

    def next(self,count=DEFAULT_COUNT,block=NON_BLOCK):
        """
        迭代流元素并以列表形式返回它们,其中每个元素的格式为{"id":id, "msg":msg}。
        可选的count参数用于指定每次迭代能够返回的最大元素数量,默认为10。
        可选的block参数用于指定迭代时阻塞的最长时限,单位为毫秒,默认非阻塞。
        """
        ret = self.client.xread({self.key: self._cursor}, count=count, block=block)
        if ret == []:
            return []
        else:
            messages = ret[0][1]
            self._cursor = messages[-1][0] # 本次迭代最后一条消息的ID
            return list(map(tuple_to_dict, messages))

    def rewind(self, cursor=START_OF_STREAM):
        """
        将游标重置至可选参数cursor指定的位置,若省略该参数则默认重置至流的开头。
        """
        self._cursor = cursor

这个程序复用了《流迭代器》一章中使用XREAD命令实现的流迭代器。通过这个迭代器,程序成功地将原本非常复杂的消息接收操作简化为一次针对StreamIterator.next()方法的调用,这也是这个消息队列程序能够保持简练的关键。

作为例子,以下代码展示了这个消息队列程序的具体使用方式:

>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> client = Redis(decode_responses=True)
>>> mq = MessageQueue(client, "MessageQueue:10086")  # 创建消息队列对象
>>> mq.send({"uid": "Jack", "msg": "Hello!"})  # 发送消息
'1720847899374-0'
>>> mq.send({"uid": "Tom", "msg": "Hi!"})
'1720847912318-0'
>>> mq.receive()  # 接收消息
[{'id': '1720847899374-0', 'msg': {'uid': 'Jack', 'msg': 'Hello!'}},
 {'id': '1720847912318-0', 'msg': {'uid': 'Tom', 'msg': 'Hi!'}}]
>>> mq.get("1720847899374-0")  # 获取特定消息
{'uid': 'Jack', 'msg': 'Hello!'}
>>> mq.length()  # 获取消息总数量
2

Tip

本文摘录自《Redis应用实例》一书。
欢迎访问书本主页以了解更多Redis使用案例:huangz.works/rediscookbook/
../_images/rediscookbook-banner.png