""" The MIT License (MIT) Copyright (c) 2015-present Rapptz Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from __future__ import annotations from typing import ( Any, Coroutine, Dict, Hashable, Union, Callable, TypeVar, Optional, TYPE_CHECKING, ) import time from .commands import check from .errors import ( NoPrivateMessage, MissingRole, MissingAnyRole, MissingPermissions, BotMissingPermissions, CommandOnCooldown, ) from ..user import User from ..permissions import Permissions from ..utils import get as utils_get, MISSING, maybe_coroutine T = TypeVar('T') if TYPE_CHECKING: from typing_extensions import Self from ..interactions import Interaction CooldownFunction = Union[ Callable[[Interaction[Any]], Coroutine[Any, Any, T]], Callable[[Interaction[Any]], T], ] __all__ = ( 'has_role', 'has_any_role', 'has_permissions', 'bot_has_permissions', 'cooldown', 'dynamic_cooldown', ) class Cooldown: """Represents a cooldown for a command. .. versionadded:: 2.0 Attributes ----------- rate: :class:`float` The total number of tokens available per :attr:`per` seconds. per: :class:`float` The length of the cooldown period in seconds. """ __slots__ = ('rate', 'per', '_window', '_tokens', '_last') def __init__(self, rate: float, per: float) -> None: self.rate: int = int(rate) self.per: float = float(per) self._window: float = 0.0 self._tokens: int = self.rate self._last: float = 0.0 def get_tokens(self, current: Optional[float] = None) -> int: """Returns the number of available tokens before rate limiting is applied. Parameters ------------ current: Optional[:class:`float`] The time in seconds since Unix epoch to calculate tokens at. If not supplied then :func:`time.time()` is used. Returns -------- :class:`int` The number of tokens available before the cooldown is to be applied. """ if not current: current = time.time() # the calculated tokens should be non-negative tokens = max(self._tokens, 0) if current > self._window + self.per: tokens = self.rate return tokens def get_retry_after(self, current: Optional[float] = None) -> float: """Returns the time in seconds until the cooldown will be reset. Parameters ------------- current: Optional[:class:`float`] The current time in seconds since Unix epoch. If not supplied, then :func:`time.time()` is used. Returns ------- :class:`float` The number of seconds to wait before this cooldown will be reset. """ current = current or time.time() tokens = self.get_tokens(current) if tokens == 0: return self.per - (current - self._window) return 0.0 def update_rate_limit(self, current: Optional[float] = None, *, tokens: int = 1) -> Optional[float]: """Updates the cooldown rate limit. Parameters ------------- current: Optional[:class:`float`] The time in seconds since Unix epoch to update the rate limit at. If not supplied, then :func:`time.time()` is used. tokens: :class:`int` The amount of tokens to deduct from the rate limit. Returns ------- Optional[:class:`float`] The retry-after time in seconds if rate limited. """ current = current or time.time() self._last = current self._tokens = self.get_tokens(current) # first token used means that we start a new rate limit window if self._tokens == self.rate: self._window = current # decrement tokens by specified number self._tokens -= tokens # check if we are rate limited and return retry-after if self._tokens < 0: return self.per - (current - self._window) def reset(self) -> None: """Reset the cooldown to its initial state.""" self._tokens = self.rate self._last = 0.0 def copy(self) -> Self: """Creates a copy of this cooldown. Returns -------- :class:`Cooldown` A new instance of this cooldown. """ return Cooldown(self.rate, self.per) def __repr__(self) -> str: return f'' def has_role(item: Union[int, str], /) -> Callable[[T], T]: """A :func:`~discord.app_commands.check` that is added that checks if the member invoking the command has the role specified via the name or ID specified. If a string is specified, you must give the exact name of the role, including caps and spelling. If an integer is specified, you must give the exact snowflake ID of the role. This check raises one of two special exceptions, :exc:`~discord.app_commands.MissingRole` if the user is missing a role, or :exc:`~discord.app_commands.NoPrivateMessage` if it is used in a private message. Both inherit from :exc:`~discord.app_commands.CheckFailure`. .. versionadded:: 2.0 .. note:: This is different from the permission system that Discord provides for application commands. This is done entirely locally in the program rather than being handled by Discord. Parameters ----------- item: Union[:class:`int`, :class:`str`] The name or ID of the role to check. """ def predicate(interaction: Interaction) -> bool: if isinstance(interaction.user, User): raise NoPrivateMessage() if isinstance(item, int): role = interaction.user.get_role(item) else: role = utils_get(interaction.user.roles, name=item) if role is None: raise MissingRole(item) return True return check(predicate) def has_any_role(*items: Union[int, str]) -> Callable[[T], T]: r"""A :func:`~discord.app_commands.check` that is added that checks if the member invoking the command has **any** of the roles specified. This means that if they have one out of the three roles specified, then this check will return ``True``. Similar to :func:`has_role`\, the names or IDs passed in must be exact. This check raises one of two special exceptions, :exc:`~discord.app_commands.MissingAnyRole` if the user is missing all roles, or :exc:`~discord.app_commands.NoPrivateMessage` if it is used in a private message. Both inherit from :exc:`~discord.app_commands.CheckFailure`. .. versionadded:: 2.0 .. note:: This is different from the permission system that Discord provides for application commands. This is done entirely locally in the program rather than being handled by Discord. Parameters ----------- items: List[Union[:class:`str`, :class:`int`]] An argument list of names or IDs to check that the member has roles wise. Example -------- .. code-block:: python3 @tree.command() @app_commands.checks.has_any_role('Library Devs', 'Moderators', 492212595072434186) async def cool(interaction: discord.Interaction): await interaction.response.send_message('You are cool indeed') """ def predicate(interaction: Interaction) -> bool: if isinstance(interaction.user, User): raise NoPrivateMessage() if any( interaction.user.get_role(item) is not None if isinstance(item, int) else utils_get(interaction.user.roles, name=item) is not None for item in items ): return True raise MissingAnyRole(list(items)) return check(predicate) def has_permissions(**perms: bool) -> Callable[[T], T]: r"""A :func:`~discord.app_commands.check` that is added that checks if the member has all of the permissions necessary. Note that this check operates on the permissions given by :attr:`discord.Interaction.permissions`. The permissions passed in must be exactly like the properties shown under :class:`discord.Permissions`. This check raises a special exception, :exc:`~discord.app_commands.MissingPermissions` that is inherited from :exc:`~discord.app_commands.CheckFailure`. .. versionadded:: 2.0 .. note:: This is different from the permission system that Discord provides for application commands. This is done entirely locally in the program rather than being handled by Discord. Parameters ------------ \*\*perms: :class:`bool` Keyword arguments denoting the permissions to check for. Example --------- .. code-block:: python3 @tree.command() @app_commands.checks.has_permissions(manage_messages=True) async def test(interaction: discord.Interaction): await interaction.response.send_message('You can manage messages.') """ invalid = perms.keys() - Permissions.VALID_FLAGS.keys() if invalid: raise TypeError(f"Invalid permission(s): {', '.join(invalid)}") def predicate(interaction: Interaction) -> bool: permissions = interaction.permissions missing = [perm for perm, value in perms.items() if getattr(permissions, perm) != value] if not missing: return True raise MissingPermissions(missing) return check(predicate) def bot_has_permissions(**perms: bool) -> Callable[[T], T]: """Similar to :func:`has_permissions` except checks if the bot itself has the permissions listed. This relies on :attr:`discord.Interaction.app_permissions`. This check raises a special exception, :exc:`~discord.app_commands.BotMissingPermissions` that is inherited from :exc:`~discord.app_commands.CheckFailure`. .. versionadded:: 2.0 """ invalid = set(perms) - set(Permissions.VALID_FLAGS) if invalid: raise TypeError(f"Invalid permission(s): {', '.join(invalid)}") def predicate(interaction: Interaction) -> bool: permissions = interaction.app_permissions missing = [perm for perm, value in perms.items() if getattr(permissions, perm) != value] if not missing: return True raise BotMissingPermissions(missing) return check(predicate) def _create_cooldown_decorator( key: CooldownFunction[Hashable], factory: CooldownFunction[Optional[Cooldown]] ) -> Callable[[T], T]: mapping: Dict[Any, Cooldown] = {} async def get_bucket( interaction: Interaction, *, mapping: Dict[Any, Cooldown] = mapping, key: CooldownFunction[Hashable] = key, factory: CooldownFunction[Optional[Cooldown]] = factory, ) -> Optional[Cooldown]: current = interaction.created_at.timestamp() dead_keys = [k for k, v in mapping.items() if current > v._last + v.per] for k in dead_keys: del mapping[k] k = await maybe_coroutine(key, interaction) if k not in mapping: bucket: Optional[Cooldown] = await maybe_coroutine(factory, interaction) if bucket is not None: mapping[k] = bucket else: bucket = mapping[k] return bucket async def predicate(interaction: Interaction) -> bool: bucket = await get_bucket(interaction) if bucket is None: return True retry_after = bucket.update_rate_limit(interaction.created_at.timestamp()) if retry_after is None: return True raise CommandOnCooldown(bucket, retry_after) return check(predicate) def cooldown( rate: float, per: float, *, key: Optional[CooldownFunction[Hashable]] = MISSING, ) -> Callable[[T], T]: """A decorator that adds a cooldown to a command. A cooldown allows a command to only be used a specific amount of times in a specific time frame. These cooldowns are based off of the ``key`` function provided. If a ``key`` is not provided then it defaults to a user-level cooldown. The ``key`` function must take a single parameter, the :class:`discord.Interaction` and return a value that is used as a key to the internal cooldown mapping. The ``key`` function can optionally be a coroutine. If a cooldown is triggered, then :exc:`~discord.app_commands.CommandOnCooldown` is raised to the error handlers. Examples --------- Setting a one per 5 seconds per member cooldown on a command: .. code-block:: python3 @tree.command() @app_commands.checks.cooldown(1, 5.0, key=lambda i: (i.guild_id, i.user.id)) async def test(interaction: discord.Interaction): await interaction.response.send_message('Hello') @test.error async def on_test_error(interaction: discord.Interaction, error: app_commands.AppCommandError): if isinstance(error, app_commands.CommandOnCooldown): await interaction.response.send_message(str(error), ephemeral=True) Parameters ------------ rate: :class:`int` The number of times a command can be used before triggering a cooldown. per: :class:`float` The amount of seconds to wait for a cooldown when it's been triggered. key: Optional[Callable[[:class:`discord.Interaction`], :class:`collections.abc.Hashable`]] A function that returns a key to the mapping denoting the type of cooldown. Can optionally be a coroutine. If not given then defaults to a user-level cooldown. If ``None`` is passed then it is interpreted as a "global" cooldown. """ if key is MISSING: key_func = lambda interaction: interaction.user.id elif key is None: key_func = lambda i: None else: key_func = key factory = lambda interaction: Cooldown(rate, per) return _create_cooldown_decorator(key_func, factory) def dynamic_cooldown( factory: CooldownFunction[Optional[Cooldown]], *, key: Optional[CooldownFunction[Hashable]] = MISSING, ) -> Callable[[T], T]: """A decorator that adds a dynamic cooldown to a command. A cooldown allows a command to only be used a specific amount of times in a specific time frame. These cooldowns are based off of the ``key`` function provided. If a ``key`` is not provided then it defaults to a user-level cooldown. The ``key`` function must take a single parameter, the :class:`discord.Interaction` and return a value that is used as a key to the internal cooldown mapping. If a ``factory`` function is given, it must be a function that accepts a single parameter of type :class:`discord.Interaction` and must return a :class:`~discord.app_commands.Cooldown` or ``None``. If ``None`` is returned then that cooldown is effectively bypassed. Both ``key`` and ``factory`` can optionally be coroutines. If a cooldown is triggered, then :exc:`~discord.app_commands.CommandOnCooldown` is raised to the error handlers. Examples --------- Setting a cooldown for everyone but the owner. .. code-block:: python3 def cooldown_for_everyone_but_me(interaction: discord.Interaction) -> Optional[app_commands.Cooldown]: if interaction.user.id == 80088516616269824: return None return app_commands.Cooldown(1, 10.0) @tree.command() @app_commands.checks.dynamic_cooldown(cooldown_for_everyone_but_me) async def test(interaction: discord.Interaction): await interaction.response.send_message('Hello') @test.error async def on_test_error(interaction: discord.Interaction, error: app_commands.AppCommandError): if isinstance(error, app_commands.CommandOnCooldown): await interaction.response.send_message(str(error), ephemeral=True) Parameters ------------ factory: Optional[Callable[[:class:`discord.Interaction`], Optional[:class:`~discord.app_commands.Cooldown`]]] A function that takes an interaction and returns a cooldown that will apply to that interaction or ``None`` if the interaction should not have a cooldown. key: Optional[Callable[[:class:`discord.Interaction`], :class:`collections.abc.Hashable`]] A function that returns a key to the mapping denoting the type of cooldown. Can optionally be a coroutine. If not given then defaults to a user-level cooldown. If ``None`` is passed then it is interpreted as a "global" cooldown. """ if key is MISSING: key_func = lambda interaction: interaction.user.id elif key is None: key_func = lambda i: None else: key_func = key return _create_cooldown_decorator(key_func, factory)