《Redis应用实例》书摘(13):速率限制器

速率限制是计算机系统常用的一种限制手段,它可以用于控制系统处理请求或是执行操作的频率,从而达到保护系统自身、数据及用户安全等多个方面的目的。

代码清单 CODE_RATE_LIMITER 展示了基于字符串实现的速率限制程序。


代码清单 CODE_RATE_LIMITER 速率限制器 rate_limiter.py

def make_limiter_key(uid, action):
    """
    构建用于记录用户执行指定行为次数的计数器键。
    例子: RateLimiter:Peter:login
    """
    return "RateLimiter:{0}:{1}".format(uid, action)

class RateLimiter:

    def __init__(self, client, action, interval, maximum):
        """
        根据给定的行为、间隔和最大次数参数,创建相应行为的速率限制器实例。
        """
        self.client = client
        self.action = action
        self.interval = interval
        self.maximum = maximum

    def is_permitted(self, uid):
        """
        判断给定用户当前是否可以执行指定行为。
        """
        key = make_limiter_key(uid, self.action)
        # 更新计数器并在有需要时为其设置过期时间
        tx = self.client.pipeline()
        tx.incr(key)
        tx.expire(key, self.interval, nx=True)
        current_times, _ = tx.execute()
        # 根据计数器的当前值判断本次行为是否可以执行
        return current_times <= self.maximum

    def remaining(self, uid):
        """
        返回给定用户当前还可以执行指定行为的次数。
        """
        # 根据键获取计数器中储存的值
        key = make_limiter_key(uid, self.action)
        current_times = self.client.get(key)
        # 值为空则表示给定用户当前并未执行过指定行为
        if current_times is None:
            return self.maximum
        # 将值转换为数字,然后通过计算获取剩余的可执行次数
        current_times = int(current_times)
        if current_times > self.maximum:
            return 0
        else:
            return self.maximum - current_times

    def duration(self, uid):
        """
        计算距离给定用户允许再次执行指定行为需要多长时间,单位为秒。
        返回None则表示给定用户当前无需等待,仍然可以执行指定行为。
        """
        # 同时取出计数器的当前值和它的剩余生存时间
        key = make_limiter_key(uid, self.action)
        tx = self.client.pipeline()
        tx.get(key)
        tx.ttl(key)
        current_times, remaining_ttl = tx.execute()
        # 仅在计数器非空并且次数已超限的情况下计算需等待时长
        if current_times is not None:
            if int(current_times) >= self.maximum:
                return remaining_ttl

    def revoke(self, uid):
        """
        撤销对用户执行指定行为的限制。
        """
        key = make_limiter_key(uid, self.action)
        self.client.delete(key)

为了展示这个速率限制器的使用方法,以下代码创建了一个限制用户在24小时内最多只能尝试登录5次的速率限制器:

>>> from redis import Redis
>>> from rate_limiter import RateLimiter
>>> client = Redis(decode_responses=True)
>>> limiter = RateLimiter(client, "login", 86400, 5)

之后,程序可以使用以下代码来模拟Peter执行了三次登录操作,而这些操作都会被允许:

>>> for i in range(3):
...   limiter.is_permitted("Peter")
...
True
True
True

remaining()方法则显示Peter至少还能够再尝试登录两次:

>>> limiter.remaining("Peter")
2

再次尝试登录三次之后,前两次将会成功,而最后一次也即是第6次则会失败:

>>> for i in range(3):
...   limiter.is_permitted("Peter")
...
True
True
False

Tip

本文摘录自《Redis应用实例》一书。
欢迎访问书本主页以了解更多Redis使用案例:huangz.works/rediscookbook/
../_images/rediscookbook-banner.png