《Redis应用实例》书摘(15):资源池

计算机系统中存在各式各样不同的资源:在硬件层面,资源可以是CPU时间、内存容量、硬盘空间或带宽等,而在软件层面,资源可以是文件、线程、进程又或者负责处理任务的客户端等。

如何管理资源一直是计算机系统最常遇到的问题之一,解决这个问题的其中一种常见方法,就是将同类的资源都关联到一个资源池中,并在需要使用资源时向资源池发送请求以获取资源,最后再在资源使用完毕之后将其归还至资源池。

考虑到资源池实际上就是一个记录资源的集合,所以我们使用Redis集合来实现它:其中将资源关联(或者说添加)到资源池的工作可以通过SADD命令来实现,而从资源池中取消关联(或者说移除)资源的工作则可以通过SREM命令来实现。

代码清单 CODE_RESOURCE_POOL 展示了基于上述原理实现的资源池程序。


代码清单 CODE_RESOURCE_POOL 资源池程序 resource_pool.py

from redis import WatchError

def available_key(pool_name):
    return "ResourcePool:{0}:available".format(pool_name)

def occupied_key(pool_name):
    return "ResourcePool:{0}:occupied".format(pool_name)

class ResourcePool:

    def __init__(self, client, pool_name):
        """
        基于给定的资源池名字创建出相应的资源池对象。
        """
        self.client = client
        self.available_set = available_key(pool_name)
        self.occupied_set = occupied_key(pool_name)

    def associate(self, resource):
        """
        将指定资源关联到资源池中。
        返回True表示关联成功,返回False表示资源已存在,关联失败。
        返回None则表示关联过程中操作失败,需要重新尝试。
        """
        tx = self.client.pipeline()
        try:
            # 监视两个集合,观察它们是否在操作中途变化
            tx.watch(self.available_set, self.occupied_set)
            # 检查给定资源是否存在于两个集合之中
            if tx.sismember(self.available_set, resource) or \
               tx.sismember(self.occupied_set, resource):
                # 资源已存在,放弃添加
                tx.unwatch()
                return False
            else:
                # 资源未存在,尝试添加
                tx.multi()
                tx.sadd(self.available_set, resource)
                return tx.execute()[0]==1  # 添加是否成功?
        except WatchError:
            # 操作过程中集合键发生了变化,需要重试
            pass
        finally:
            tx.reset()

    def disassociate(self, resource):
        """
        将指定资源从资源池中移除。
        移除成功返回True,因资源不存在而导致移除失败返回False。
        """
        # 使用事务同时向两个集合发出SREM命令
        # 当资源存在于池中时,其中一个集合将返回1作为SREM命令的结果
        tx = self.client.pipeline()
        tx.srem(self.available_set, resource)
        tx.srem(self.occupied_set, resource)
        ret1, ret2 = tx.execute()
        return ret1==1 or ret2==1

    def acquire(self):
        """
        尝试从资源池中获取并返回可用的资源。
        返回None表示资源池为空,或者获取操作过程中失败。
        """
        tx = self.client.pipeline()
        try:
            # 监视两个集合,观察它们是否在操作中途变化
            tx.watch(self.available_set, self.occupied_set)
            # 尝试从可用集合中随机获取一项资源
            resource = tx.srandmember(self.available_set)
            if resource is not None:
                # 将资源从可用集合移动到已占用集合
                tx.multi()
                tx.smove(self.available_set, self.occupied_set, resource)
                smove_ret = tx.execute()[0]
                if smove_ret == 1:
                    return resource
        except WatchError:
            # 操作过程中集合键发生了变化,需要重试
            pass
        finally:
            tx.reset()

    def release(self, resource):
        """
        将给定的一项已被占用的资源回归至资源池。
        回归成功时返回True,因资源不属于资源池而导致回归失败时返回False。
        """
        # 将资源从已占用集合移动至可用集合
        return self.client.smove(self.occupied_set, self.available_set, resource)

作为例子,以下代码展示了如何使用这个资源池程序的各项功能:

>>> from redis import Redis
>>> from resource_pool import ResourcePool
>>> client = Redis(decode_responses=True)
>>> pool = ResourcePool(client, "Workers")
>>> for i in range(1, 6):  # 在资源池中关联5个工作进程
...   pool.associate("Worker{}".format(i))
...
True
# ...
True
>>> pool.acquire()  # 获取3个工作进程
'Worker4'
>>> pool.acquire()
'Worker2'
>>> pool.acquire()
'Worker3'
>>> pool.release("Worker3")  # 释放1个工作进程
True
>>> pool.disassociate("Worker1")  # 解除对Worker1的关联
True

Tip

本文摘录自《Redis应用实例》一书。
欢迎访问书本主页以了解更多Redis使用案例:huangz.works/rediscookbook/
../_images/rediscookbook-banner.png