|
|
@ -25,12 +25,19 @@ DEALINGS IN THE SOFTWARE. |
|
|
|
from __future__ import annotations |
|
|
|
|
|
|
|
from typing import ( |
|
|
|
Any, |
|
|
|
Coroutine, |
|
|
|
Dict, |
|
|
|
Hashable, |
|
|
|
Union, |
|
|
|
Callable, |
|
|
|
TypeVar, |
|
|
|
Optional, |
|
|
|
TYPE_CHECKING, |
|
|
|
) |
|
|
|
|
|
|
|
import time |
|
|
|
|
|
|
|
from .commands import check |
|
|
|
from .errors import ( |
|
|
|
NoPrivateMessage, |
|
|
@ -38,25 +45,150 @@ from .errors import ( |
|
|
|
MissingAnyRole, |
|
|
|
MissingPermissions, |
|
|
|
BotMissingPermissions, |
|
|
|
CommandOnCooldown, |
|
|
|
) |
|
|
|
|
|
|
|
from ..user import User |
|
|
|
from ..permissions import Permissions |
|
|
|
from ..utils import get as utils_get |
|
|
|
from ..utils import get as utils_get, MISSING, maybe_coroutine |
|
|
|
|
|
|
|
T = TypeVar('T') |
|
|
|
|
|
|
|
if TYPE_CHECKING: |
|
|
|
from typing_extensions import Self |
|
|
|
from ..interactions import Interaction |
|
|
|
|
|
|
|
CooldownFunction = Union[ |
|
|
|
Callable[[Interaction], Coroutine[Any, Any, T]], |
|
|
|
Callable[[Interaction], T], |
|
|
|
] |
|
|
|
|
|
|
|
__all__ = ( |
|
|
|
'has_role', |
|
|
|
'has_any_role', |
|
|
|
'has_permissions', |
|
|
|
'bot_has_permissions', |
|
|
|
'cooldown', |
|
|
|
'dynamic_cooldown', |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class Cooldown: |
|
|
|
"""Represents a cooldown for a command. |
|
|
|
|
|
|
|
.. versionadded:: 2.0 |
|
|
|
|
|
|
|
Attributes |
|
|
|
----------- |
|
|
|
rate: :class:`float` |
|
|
|
The total number of tokens available per :attr:`per` seconds. |
|
|
|
per: :class:`float` |
|
|
|
The length of the cooldown period in seconds. |
|
|
|
""" |
|
|
|
|
|
|
|
__slots__ = ('rate', 'per', '_window', '_tokens', '_last') |
|
|
|
|
|
|
|
def __init__(self, rate: float, per: float) -> None: |
|
|
|
self.rate: int = int(rate) |
|
|
|
self.per: float = float(per) |
|
|
|
self._window: float = 0.0 |
|
|
|
self._tokens: int = self.rate |
|
|
|
self._last: float = 0.0 |
|
|
|
|
|
|
|
def get_tokens(self, current: Optional[float] = None) -> int: |
|
|
|
"""Returns the number of available tokens before rate limiting is applied. |
|
|
|
|
|
|
|
Parameters |
|
|
|
------------ |
|
|
|
current: Optional[:class:`float`] |
|
|
|
The time in seconds since Unix epoch to calculate tokens at. |
|
|
|
If not supplied then :func:`time.time()` is used. |
|
|
|
|
|
|
|
Returns |
|
|
|
-------- |
|
|
|
:class:`int` |
|
|
|
The number of tokens available before the cooldown is to be applied. |
|
|
|
""" |
|
|
|
if not current: |
|
|
|
current = time.time() |
|
|
|
|
|
|
|
tokens = self._tokens |
|
|
|
|
|
|
|
if current > self._window + self.per: |
|
|
|
tokens = self.rate |
|
|
|
return tokens |
|
|
|
|
|
|
|
def get_retry_after(self, current: Optional[float] = None) -> float: |
|
|
|
"""Returns the time in seconds until the cooldown will be reset. |
|
|
|
|
|
|
|
Parameters |
|
|
|
------------- |
|
|
|
current: Optional[:class:`float`] |
|
|
|
The current time in seconds since Unix epoch. |
|
|
|
If not supplied, then :func:`time.time()` is used. |
|
|
|
|
|
|
|
Returns |
|
|
|
------- |
|
|
|
:class:`float` |
|
|
|
The number of seconds to wait before this cooldown will be reset. |
|
|
|
""" |
|
|
|
current = current or time.time() |
|
|
|
tokens = self.get_tokens(current) |
|
|
|
|
|
|
|
if tokens == 0: |
|
|
|
return self.per - (current - self._window) |
|
|
|
|
|
|
|
return 0.0 |
|
|
|
|
|
|
|
def update_rate_limit(self, current: Optional[float] = None) -> Optional[float]: |
|
|
|
"""Updates the cooldown rate limit. |
|
|
|
|
|
|
|
Parameters |
|
|
|
------------- |
|
|
|
current: Optional[:class:`float`] |
|
|
|
The time in seconds since Unix epoch to update the rate limit at. |
|
|
|
If not supplied, then :func:`time.time()` is used. |
|
|
|
|
|
|
|
Returns |
|
|
|
------- |
|
|
|
Optional[:class:`float`] |
|
|
|
The retry-after time in seconds if rate limited. |
|
|
|
""" |
|
|
|
current = current or time.time() |
|
|
|
self._last = current |
|
|
|
|
|
|
|
self._tokens = self.get_tokens(current) |
|
|
|
|
|
|
|
# first token used means that we start a new rate limit window |
|
|
|
if self._tokens == self.rate: |
|
|
|
self._window = current |
|
|
|
|
|
|
|
# check if we are rate limited |
|
|
|
if self._tokens == 0: |
|
|
|
return self.per - (current - self._window) |
|
|
|
|
|
|
|
# we're not so decrement our tokens |
|
|
|
self._tokens -= 1 |
|
|
|
|
|
|
|
def reset(self) -> None: |
|
|
|
"""Reset the cooldown to its initial state.""" |
|
|
|
self._tokens = self.rate |
|
|
|
self._last = 0.0 |
|
|
|
|
|
|
|
def copy(self) -> Self: |
|
|
|
"""Creates a copy of this cooldown. |
|
|
|
|
|
|
|
Returns |
|
|
|
-------- |
|
|
|
:class:`Cooldown` |
|
|
|
A new instance of this cooldown. |
|
|
|
""" |
|
|
|
return Cooldown(self.rate, self.per) |
|
|
|
|
|
|
|
def __repr__(self) -> str: |
|
|
|
return f'<Cooldown rate: {self.rate} per: {self.per} window: {self._window} tokens: {self._tokens}>' |
|
|
|
|
|
|
|
|
|
|
|
def has_role(item: Union[int, str], /) -> Callable[[T], T]: |
|
|
|
"""A :func:`~discord.app_commands.check` that is added that checks if the member invoking the |
|
|
|
command has the role specified via the name or ID specified. |
|
|
@ -236,3 +368,134 @@ def bot_has_permissions(**perms: bool) -> Callable[[T], T]: |
|
|
|
raise BotMissingPermissions(missing) |
|
|
|
|
|
|
|
return check(predicate) |
|
|
|
|
|
|
|
|
|
|
|
def _create_cooldown_decorator( |
|
|
|
key: CooldownFunction[Hashable], factory: CooldownFunction[Optional[Cooldown]] |
|
|
|
) -> Callable[[T], T]: |
|
|
|
|
|
|
|
mapping: Dict[Any, Cooldown] = {} |
|
|
|
|
|
|
|
async def get_bucket( |
|
|
|
interaction: Interaction, |
|
|
|
*, |
|
|
|
mapping: Dict[Any, Cooldown] = mapping, |
|
|
|
key: CooldownFunction[Hashable] = key, |
|
|
|
factory: CooldownFunction[Optional[Cooldown]] = factory, |
|
|
|
) -> Optional[Cooldown]: |
|
|
|
current = interaction.created_at.timestamp() |
|
|
|
dead_keys = [k for k, v in mapping.items() if current > v._last + v.per] |
|
|
|
for k in dead_keys: |
|
|
|
del mapping[k] |
|
|
|
|
|
|
|
k = await maybe_coroutine(key, interaction) |
|
|
|
if k not in mapping: |
|
|
|
bucket: Optional[Cooldown] = await maybe_coroutine(factory, interaction) |
|
|
|
if bucket is not None: |
|
|
|
mapping[k] = bucket |
|
|
|
else: |
|
|
|
bucket = mapping[k] |
|
|
|
|
|
|
|
return bucket |
|
|
|
|
|
|
|
async def predicate(interaction: Interaction) -> bool: |
|
|
|
bucket = await get_bucket(interaction) |
|
|
|
if bucket is None: |
|
|
|
return True |
|
|
|
|
|
|
|
retry_after = bucket.update_rate_limit(interaction.created_at.timestamp()) |
|
|
|
if retry_after is None: |
|
|
|
return True |
|
|
|
|
|
|
|
raise CommandOnCooldown(bucket, retry_after) |
|
|
|
|
|
|
|
return check(predicate) |
|
|
|
|
|
|
|
|
|
|
|
def cooldown( |
|
|
|
rate: float, |
|
|
|
per: float, |
|
|
|
*, |
|
|
|
key: Optional[CooldownFunction[Hashable]] = MISSING, |
|
|
|
) -> Callable[[T], T]: |
|
|
|
"""A decorator that adds a cooldown to a command. |
|
|
|
|
|
|
|
A cooldown allows a command to only be used a specific amount |
|
|
|
of times in a specific time frame. These cooldowns are based off |
|
|
|
of the ``key`` function provided. If a ``key`` is not provided |
|
|
|
then it defaults to a user-level cooldown. The ``key`` function |
|
|
|
must take a single parameter, the :class:`discord.Interaction` and |
|
|
|
return a value that is used as a key to the internal cooldown mapping. |
|
|
|
|
|
|
|
The ``key`` function can optionally be a coroutine. |
|
|
|
|
|
|
|
If a cooldown is triggered, then :exc:`~discord.app_commands.CommandOnCooldown` is |
|
|
|
raised to the error handlers. |
|
|
|
|
|
|
|
Parameters |
|
|
|
------------ |
|
|
|
rate: :class:`int` |
|
|
|
The number of times a command can be used before triggering a cooldown. |
|
|
|
per: :class:`float` |
|
|
|
The amount of seconds to wait for a cooldown when it's been triggered. |
|
|
|
key: Optional[Callable[[:class:`discord.Interaction`], :class:`collections.abc.Hashable`]] |
|
|
|
A function that returns a key to the mapping denoting the type of cooldown. |
|
|
|
Can optionally be a coroutine. If not given then defaults to a user-level |
|
|
|
cooldown. If ``None`` is passed then it is interpreted as a "global" cooldown. |
|
|
|
""" |
|
|
|
|
|
|
|
if key is MISSING: |
|
|
|
key_func = lambda interaction: interaction.user.id |
|
|
|
elif key is None: |
|
|
|
key_func = lambda i: None |
|
|
|
else: |
|
|
|
key_func = key |
|
|
|
|
|
|
|
factory = lambda interaction: Cooldown(rate, per) |
|
|
|
|
|
|
|
return _create_cooldown_decorator(key_func, factory) |
|
|
|
|
|
|
|
|
|
|
|
def dynamic_cooldown( |
|
|
|
factory: CooldownFunction[Optional[Cooldown]], |
|
|
|
*, |
|
|
|
key: Optional[CooldownFunction[Hashable]] = MISSING, |
|
|
|
) -> Callable[[T], T]: |
|
|
|
"""A decorator that adds a dynamic cooldown to a command. |
|
|
|
|
|
|
|
A cooldown allows a command to only be used a specific amount |
|
|
|
of times in a specific time frame. These cooldowns are based off |
|
|
|
of the ``key`` function provided. If a ``key`` is not provided |
|
|
|
then it defaults to a user-level cooldown. The ``key`` function |
|
|
|
must take a single parameter, the :class:`discord.Interaction` and |
|
|
|
return a value that is used as a key to the internal cooldown mapping. |
|
|
|
|
|
|
|
If a ``factory`` function is given, it must be a function that |
|
|
|
accepts a single parameter of type :class:`discord.Interaction` and must |
|
|
|
return a :class:`~discord.app_commands.Cooldown` or ``None``. |
|
|
|
If ``None`` is returned then that cooldown is effectively bypassed. |
|
|
|
|
|
|
|
Both ``key`` and ``factory`` can optionally be coroutines. |
|
|
|
|
|
|
|
If a cooldown is triggered, then :exc:`~discord.app_commands.CommandOnCooldown` is |
|
|
|
raised to the error handlers. |
|
|
|
|
|
|
|
Parameters |
|
|
|
------------ |
|
|
|
factory: Optional[Callable[[:class:`discord.Interaction`], Optional[:class:`~discord.app_commands.Cooldown`]]] |
|
|
|
A function that takes an interaction and returns a cooldown that will apply to that interaction |
|
|
|
or ``None`` if the interaction should not have a cooldown. |
|
|
|
key: Optional[Callable[[:class:`discord.Interaction`], :class:`collections.abc.Hashable`]] |
|
|
|
A function that returns a key to the mapping denoting the type of cooldown. |
|
|
|
Can optionally be a coroutine. If not given then defaults to a user-level |
|
|
|
cooldown. If ``None`` is passed then it is interpreted as a "global" cooldown. |
|
|
|
""" |
|
|
|
|
|
|
|
if key is MISSING: |
|
|
|
key_func = lambda interaction: interaction.user.id |
|
|
|
elif key is None: |
|
|
|
key_func = lambda i: None |
|
|
|
else: |
|
|
|
key_func = key |
|
|
|
|
|
|
|
return _create_cooldown_decorator(key_func, factory) |
|
|
|