《Redis应用实例》书摘(15):资源池¶
计算机系统中存在各式各样不同的资源:在硬件层面,资源可以是CPU时间、内存容量、硬盘空间或带宽等,而在软件层面,资源可以是文件、线程、进程又或者负责处理任务的客户端等。
如何管理资源一直是计算机系统最常遇到的问题之一,解决这个问题的其中一种常见方法,就是将同类的资源都关联到一个资源池中,并在需要使用资源时向资源池发送请求以获取资源,最后再在资源使用完毕之后将其归还至资源池。
考虑到资源池实际上就是一个记录资源的集合,所以我们使用Redis集合来实现它:其中将资源关联(或者说添加)到资源池的工作可以通过SADD命令来实现,而从资源池中取消关联(或者说移除)资源的工作则可以通过SREM命令来实现。
代码清单 CODE_RESOURCE_POOL 展示了基于上述原理实现的资源池程序。
代码清单 CODE_RESOURCE_POOL 资源池程序 resource_pool.py
from redis import WatchError
def available_key(pool_name):
return "ResourcePool:{0}:available".format(pool_name)
def occupied_key(pool_name):
return "ResourcePool:{0}:occupied".format(pool_name)
class ResourcePool:
def __init__(self, client, pool_name):
"""
基于给定的资源池名字创建出相应的资源池对象。
"""
self.client = client
self.available_set = available_key(pool_name)
self.occupied_set = occupied_key(pool_name)
def associate(self, resource):
"""
将指定资源关联到资源池中。
返回True表示关联成功,返回False表示资源已存在,关联失败。
返回None则表示关联过程中操作失败,需要重新尝试。
"""
tx = self.client.pipeline()
try:
# 监视两个集合,观察它们是否在操作中途变化
tx.watch(self.available_set, self.occupied_set)
# 检查给定资源是否存在于两个集合之中
if tx.sismember(self.available_set, resource) or \
tx.sismember(self.occupied_set, resource):
# 资源已存在,放弃添加
tx.unwatch()
return False
else:
# 资源未存在,尝试添加
tx.multi()
tx.sadd(self.available_set, resource)
return tx.execute()[0]==1 # 添加是否成功?
except WatchError:
# 操作过程中集合键发生了变化,需要重试
pass
finally:
tx.reset()
def disassociate(self, resource):
"""
将指定资源从资源池中移除。
移除成功返回True,因资源不存在而导致移除失败返回False。
"""
# 使用事务同时向两个集合发出SREM命令
# 当资源存在于池中时,其中一个集合将返回1作为SREM命令的结果
tx = self.client.pipeline()
tx.srem(self.available_set, resource)
tx.srem(self.occupied_set, resource)
ret1, ret2 = tx.execute()
return ret1==1 or ret2==1
def acquire(self):
"""
尝试从资源池中获取并返回可用的资源。
返回None表示资源池为空,或者获取操作过程中失败。
"""
tx = self.client.pipeline()
try:
# 监视两个集合,观察它们是否在操作中途变化
tx.watch(self.available_set, self.occupied_set)
# 尝试从可用集合中随机获取一项资源
resource = tx.srandmember(self.available_set)
if resource is not None:
# 将资源从可用集合移动到已占用集合
tx.multi()
tx.smove(self.available_set, self.occupied_set, resource)
smove_ret = tx.execute()[0]
if smove_ret == 1:
return resource
except WatchError:
# 操作过程中集合键发生了变化,需要重试
pass
finally:
tx.reset()
def release(self, resource):
"""
将给定的一项已被占用的资源回归至资源池。
回归成功时返回True,因资源不属于资源池而导致回归失败时返回False。
"""
# 将资源从已占用集合移动至可用集合
return self.client.smove(self.occupied_set, self.available_set, resource)
作为例子,以下代码展示了如何使用这个资源池程序的各项功能:
>>> from redis import Redis
>>> from resource_pool import ResourcePool
>>> client = Redis(decode_responses=True)
>>> pool = ResourcePool(client, "Workers")
>>> for i in range(1, 6): # 在资源池中关联5个工作进程
... pool.associate("Worker{}".format(i))
...
True
# ...
True
>>> pool.acquire() # 获取3个工作进程
'Worker4'
>>> pool.acquire()
'Worker2'
>>> pool.acquire()
'Worker3'
>>> pool.release("Worker3") # 释放1个工作进程
True
>>> pool.disassociate("Worker1") # 解除对Worker1的关联
True