《Redis应用实例》书摘(13):速率限制器¶
速率限制是计算机系统常用的一种限制手段,它可以用于控制系统处理请求或是执行操作的频率,从而达到保护系统自身、数据及用户安全等多个方面的目的。
代码清单 CODE_RATE_LIMITER 展示了基于字符串实现的速率限制程序。
代码清单 CODE_RATE_LIMITER 速率限制器 rate_limiter.py
def make_limiter_key(uid, action):
"""
构建用于记录用户执行指定行为次数的计数器键。
例子: RateLimiter:Peter:login
"""
return "RateLimiter:{0}:{1}".format(uid, action)
class RateLimiter:
def __init__(self, client, action, interval, maximum):
"""
根据给定的行为、间隔和最大次数参数,创建相应行为的速率限制器实例。
"""
self.client = client
self.action = action
self.interval = interval
self.maximum = maximum
def is_permitted(self, uid):
"""
判断给定用户当前是否可以执行指定行为。
"""
key = make_limiter_key(uid, self.action)
# 更新计数器并在有需要时为其设置过期时间
tx = self.client.pipeline()
tx.incr(key)
tx.expire(key, self.interval, nx=True)
current_times, _ = tx.execute()
# 根据计数器的当前值判断本次行为是否可以执行
return current_times <= self.maximum
def remaining(self, uid):
"""
返回给定用户当前还可以执行指定行为的次数。
"""
# 根据键获取计数器中储存的值
key = make_limiter_key(uid, self.action)
current_times = self.client.get(key)
# 值为空则表示给定用户当前并未执行过指定行为
if current_times is None:
return self.maximum
# 将值转换为数字,然后通过计算获取剩余的可执行次数
current_times = int(current_times)
if current_times > self.maximum:
return 0
else:
return self.maximum - current_times
def duration(self, uid):
"""
计算距离给定用户允许再次执行指定行为需要多长时间,单位为秒。
返回None则表示给定用户当前无需等待,仍然可以执行指定行为。
"""
# 同时取出计数器的当前值和它的剩余生存时间
key = make_limiter_key(uid, self.action)
tx = self.client.pipeline()
tx.get(key)
tx.ttl(key)
current_times, remaining_ttl = tx.execute()
# 仅在计数器非空并且次数已超限的情况下计算需等待时长
if current_times is not None:
if int(current_times) >= self.maximum:
return remaining_ttl
def revoke(self, uid):
"""
撤销对用户执行指定行为的限制。
"""
key = make_limiter_key(uid, self.action)
self.client.delete(key)
为了展示这个速率限制器的使用方法,以下代码创建了一个限制用户在24小时内最多只能尝试登录5次的速率限制器:
>>> from redis import Redis
>>> from rate_limiter import RateLimiter
>>> client = Redis(decode_responses=True)
>>> limiter = RateLimiter(client, "login", 86400, 5)
之后,程序可以使用以下代码来模拟Peter执行了三次登录操作,而这些操作都会被允许:
>>> for i in range(3):
... limiter.is_permitted("Peter")
...
True
True
True
而remaining()方法则显示Peter至少还能够再尝试登录两次:
>>> limiter.remaining("Peter")
2
再次尝试登录三次之后,前两次将会成功,而最后一次也即是第6次则会失败:
>>> for i in range(3):
... limiter.is_permitted("Peter")
...
True
True
False