Source code for cloudly.util.ratelimiter

__all__ = ['RateLimiter', 'AsyncRateLimiter']

import asyncio
from threading import Lock
from time import perf_counter, sleep


class Full(Exception):
    pass


class Empty(Exception):
    pass


class Ring:
    def __init__(self, maxlen: int):
        self._maxlen = maxlen
        self._n = 0
        self._head = 0  # index of the first/oldest element; -1 means no element; range [-1, maxlen)
        self._tail = 0  # one after the last/newest element; also the index of the next element to be pushed; range [0, maxlen)
        self._data = [None for _ in range(maxlen)]

    def __len__(self):
        return self._n

    def full(self):
        return self.__len__() == self._maxlen

    def empty(self):
        return self.__len__() == 0

    def push(self, x):
        # Push an element at the "tail" (i.e. latest) end.
        if self.full():
            raise Full
        t = self._tail
        self._data[t] = x
        self._n += 1
        t += 1
        if t == self._maxlen:
            t = 0
        self._tail = t

    def pop(self):
        # Remove and return the "head" (i.e. oldest) element.
        if self.empty():
            raise Empty
        h = self._head
        x = self._data[h]
        self._n -= 1
        h += 1
        if h == self._maxlen:
            h = 0
        self._head = h
        return x

    def head(self):
        # Return (w/o removing) the oldest element.
        if self.empty():
            raise Empty
        return self._data[self._head]

    def tail(self):
        # Return (w/o removing) the latest element.
        if self.empty():
            raise Empty
        t = self._tail - 1
        if t < 0:
            t = self._maxlen - 1
        return self._data[t]


[docs] class RateLimiter: """ This class is used to impose rate limits in one thread or across threads. If you need rate limiting across *processes*, you can use this facility in a multiprocessing "manager". """
[docs] def __init__(self, limit: int, time_window_in_seconds: float | int = 1): """ Suppose ``limit = 10`` and ``time_window_in_seconds=60``, that specifies a rate limit on a minute basis. Specifically, it means "at most 10" in **any** time window of duration "60 seconds". In other words, the minutes are not the "whole minutes" on a wall clock. The rate limit applies in **any** 60-second time window carved out starting **anywhere** on the continuous time axis. The 10 events do not need to happen **evenly** in the 60-second time window; they can very well happen in a burst or multiple bursts. The methods :meth:`wait` and :meth:`nowait` are thread safe. This object can be passed into multiple threads and used in each thread concurrently. """ self.limit = limit self._time_window = time_window_in_seconds self._tokens = Ring(limit) self._lock = Lock()
def _push_one(self): window_start = perf_counter() - self._time_window tokens = self._tokens for _ in range(len(tokens)): if tokens.head() < window_start: tokens.pop() else: break if not tokens.full(): tokens.push(perf_counter()) return True return False
[docs] def wait(self) -> None: """ Once the user is ready to do "the thing" (such as calling an HTTP service) that's subject to this rate limit, call this method. This method may block. Once this method returns, user can go ahead to do the thing. The time of this method's return is recorded internally as one occurrence of that thing. """ with self._lock: if self._push_one(): return tokens = self._tokens sleep(max(0, tokens.head() + self._time_window - perf_counter())) tokens.pop() tokens.push(perf_counter())
[docs] def nowait(self) -> bool: """ In contrast to :meth:`wait`, this method does not wait. If user can go ahead "do it" right now because there's still quota within the rate limit, `True` is returned, and user should go ahead "do it". The current time is internally recorded as an action (to be taken by the user as expected). If the system is currently out of quota (i.e. has reached the limit), `False` is returned, and user should not proceed. Example, :: if my_ratelimiter.nowait(): do_it() else: raise Exception("You have reached rate limit. Please try again later!") .. note:: If there are multiple users of the same `RateLimiter` object, they must all use :meth:`wait` or all use :meth:`nowait`; they can not mix-use the two methods. Otherwise, consider this scenario: say user "A" calls `nowait` while user "B" has called `wait` and is currently holding the lock and sleeping; then "A" has to wait on the lock, which defeats the meaning of "nowait". """ with self._lock: return self._push_one()
[docs] class AsyncRateLimiter: """ An ``AsyncRateLimiter`` is used by one or more async workers in the same thread. See :class:`RateLimiter` for doc. """
[docs] def __init__(self, limit: int, time_window_in_seconds: float | int = 1): self.limit = limit self._time_window = time_window_in_seconds self._tokens = Ring(limit) self._lock = asyncio.Lock()
def _push_one(self): return RateLimiter._push_one(self)
[docs] async def wait(self) -> None: async with self._lock: if self._push_one(): return tokens = self._tokens await asyncio.sleep(tokens.head() + self._time_window - perf_counter()) tokens.pop() tokens.push(perf_counter())
[docs] async def nowait(self) -> bool: async with self._lock: return self._push_one()