You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

537 lines
18 KiB

"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
from typing import (
Any,
Coroutine,
Dict,
Hashable,
Union,
Callable,
TypeVar,
Optional,
TYPE_CHECKING,
)
import time
from .commands import check
from .errors import (
NoPrivateMessage,
MissingRole,
MissingAnyRole,
MissingPermissions,
BotMissingPermissions,
CommandOnCooldown,
)
from ..user import User
from ..permissions import Permissions
from ..utils import get as utils_get, MISSING, maybe_coroutine
# Generic type variable: used both for the object a check decorator wraps
# (``Callable[[T], T]``) and for the value a cooldown callback produces.
T = TypeVar('T')

if TYPE_CHECKING:
    # These names are needed for annotations only.
    from typing_extensions import Self

    from ..interactions import Interaction

    # A callback that receives the interaction and produces a ``T`` either
    # directly or through a coroutine.
    CooldownFunction = Union[
        Callable[[Interaction[Any]], Coroutine[Any, Any, T]],
        Callable[[Interaction[Any]], T],
    ]
# Names exported by a star-import of this module.
__all__ = (
    'has_role',
    'has_any_role',
    'has_permissions',
    'bot_has_permissions',
    'cooldown',
    'dynamic_cooldown',
)
class Cooldown:
    """Represents a cooldown for a command.

    The cooldown hands out :attr:`rate` tokens per window of :attr:`per`
    seconds. A window opens when the first token of a full bucket is used
    and the bucket refills once the window has elapsed.

    .. versionadded:: 2.0

    Attributes
    -----------
    rate: :class:`int`
        The total number of tokens available per :attr:`per` seconds.
        The value passed to the constructor is truncated with :class:`int`.
    per: :class:`float`
        The length of the cooldown period in seconds.
    """

    __slots__ = ('rate', 'per', '_window', '_tokens', '_last')

    def __init__(self, rate: float, per: float) -> None:
        self.rate: int = int(rate)
        self.per: float = float(per)
        # Timestamp at which the current rate limit window was opened.
        self._window: float = 0.0
        # Tokens left in the current window; drops below zero once rate limited.
        self._tokens: int = self.rate
        # Timestamp of the most recent update_rate_limit() call.
        self._last: float = 0.0

    def get_tokens(self, current: Optional[float] = None) -> int:
        """Returns the number of available tokens before rate limiting is applied.

        Parameters
        ------------
        current: Optional[:class:`float`]
            The time in seconds since Unix epoch to calculate tokens at.
            If ``None`` then :func:`time.time()` is used.

        Returns
        --------
        :class:`int`
            The number of tokens available before the cooldown is to be applied.
        """
        # Compare against None explicitly: a timestamp of 0.0 is a valid,
        # caller-supplied value and must not be replaced by the wall clock.
        if current is None:
            current = time.time()

        # The window has fully elapsed, so the bucket is full again.
        if current > self._window + self.per:
            return self.rate

        # The stored count may be negative while rate limited; never report that.
        return max(self._tokens, 0)

    def get_retry_after(self, current: Optional[float] = None) -> float:
        """Returns the time in seconds until the cooldown will be reset.

        Parameters
        -------------
        current: Optional[:class:`float`]
            The current time in seconds since Unix epoch.
            If ``None``, then :func:`time.time()` is used.

        Returns
        -------
        :class:`float`
            The number of seconds to wait before this cooldown will be reset,
            or ``0.0`` if tokens are still available.
        """
        if current is None:
            current = time.time()

        if self.get_tokens(current) == 0:
            return self.per - (current - self._window)
        return 0.0

    def update_rate_limit(self, current: Optional[float] = None, *, tokens: int = 1) -> Optional[float]:
        """Updates the cooldown rate limit.

        Parameters
        -------------
        current: Optional[:class:`float`]
            The time in seconds since Unix epoch to update the rate limit at.
            If ``None``, then :func:`time.time()` is used.
        tokens: :class:`int`
            The amount of tokens to deduct from the rate limit.

        Returns
        -------
        Optional[:class:`float`]
            The retry-after time in seconds if rate limited, otherwise ``None``.
        """
        if current is None:
            current = time.time()

        self._last = current
        self._tokens = self.get_tokens(current)

        # first token used means that we start a new rate limit window
        if self._tokens == self.rate:
            self._window = current

        # decrement tokens by specified number
        self._tokens -= tokens

        # check if we are rate limited and return retry-after
        if self._tokens < 0:
            return self.per - (current - self._window)
        return None

    def reset(self) -> None:
        """Reset the cooldown to its initial state."""
        self._tokens = self.rate
        self._last = 0.0
        # A full bucket reopens the window on its next use, so clearing this
        # does not change token accounting; it only drops the stale timestamp.
        self._window = 0.0

    def copy(self) -> Self:
        """Creates a copy of this cooldown.

        Only :attr:`rate` and :attr:`per` are carried over; the copy starts
        with a full bucket.

        Returns
        --------
        :class:`Cooldown`
            A new instance of this cooldown.
        """
        return self.__class__(self.rate, self.per)

    def __repr__(self) -> str:
        return f'<Cooldown rate: {self.rate} per: {self.per} window: {self._window} tokens: {self._tokens}>'
def has_role(item: Union[int, str], /) -> Callable[[T], T]:
    """A :func:`~discord.app_commands.check` that verifies the invoking member
    holds the role identified by ``item``.

    A string is matched against the role name exactly (case and spelling
    included). An integer is matched against the role's snowflake ID.

    The check raises :exc:`~discord.app_commands.NoPrivateMessage` when used
    in a private message and :exc:`~discord.app_commands.MissingRole` when the
    member lacks the role. Both inherit from
    :exc:`~discord.app_commands.CheckFailure`.

    .. versionadded:: 2.0

    .. note::

        This is different from the permission system that Discord provides for application
        commands. This is done entirely locally in the program rather than being handled
        by Discord.

    Parameters
    -----------
    item: Union[:class:`int`, :class:`str`]
        The name or ID of the role to check.
    """

    def predicate(interaction: Interaction) -> bool:
        member = interaction.user
        # A plain User (rather than a guild member) carries no roles.
        if isinstance(member, User):
            raise NoPrivateMessage()

        # Integers are looked up by ID, anything else by exact role name.
        found = member.get_role(item) if isinstance(item, int) else utils_get(member.roles, name=item)
        if found is not None:
            return True
        raise MissingRole(item)

    return check(predicate)
def has_any_role(*items: Union[int, str]) -> Callable[[T], T]:
    """A :func:`~discord.app_commands.check` that passes when the invoking member
    holds **at least one** of the given roles.

    As with :func:`has_role`, names and IDs must match exactly.

    The check raises :exc:`~discord.app_commands.NoPrivateMessage` when used
    in a private message and :exc:`~discord.app_commands.MissingAnyRole` when
    the member has none of the roles. Both inherit from
    :exc:`~discord.app_commands.CheckFailure`.

    .. versionadded:: 2.0

    .. note::

        This is different from the permission system that Discord provides for application
        commands. This is done entirely locally in the program rather than being handled
        by Discord.

    Parameters
    -----------
    items: List[Union[:class:`str`, :class:`int`]]
        An argument list of role names or IDs, any one of which satisfies the check.

    Example
    --------

    .. code-block:: python3

        @tree.command()
        @app_commands.checks.has_any_role('Library Devs', 'Moderators', 492212595072434186)
        async def cool(interaction: discord.Interaction):
            await interaction.response.send_message('You are cool indeed')
    """

    def predicate(interaction: Interaction) -> bool:
        member = interaction.user
        # A plain User (rather than a guild member) carries no roles.
        if isinstance(member, User):
            raise NoPrivateMessage()

        # Stop at the first role the member holds.
        for wanted in items:
            if isinstance(wanted, int):
                role = member.get_role(wanted)
            else:
                role = utils_get(member.roles, name=wanted)
            if role is not None:
                return True

        raise MissingAnyRole(list(items))

    return check(predicate)
def has_permissions(**perms: bool) -> Callable[[T], T]:
    r"""A :func:`~discord.app_commands.check` that verifies the invoking member
    has every permission listed.

    The comparison is made against :attr:`discord.Interaction.permissions`.
    Keyword names must be the same as the properties of
    :class:`discord.Permissions`; unknown names are rejected immediately
    with :exc:`TypeError` when the decorator is created.

    On failure the check raises :exc:`~discord.app_commands.MissingPermissions`,
    which inherits from :exc:`~discord.app_commands.CheckFailure`.

    .. versionadded:: 2.0

    .. note::

        This is different from the permission system that Discord provides for application
        commands. This is done entirely locally in the program rather than being handled
        by Discord.

    Parameters
    ------------
    \*\*perms: :class:`bool`
        Keyword arguments denoting the permissions to check for.

    Example
    ---------

    .. code-block:: python3

        @tree.command()
        @app_commands.checks.has_permissions(manage_messages=True)
        async def test(interaction: discord.Interaction):
            await interaction.response.send_message('You can manage messages.')
    """

    # Validate the keyword names up front, at decoration time.
    unknown = perms.keys() - Permissions.VALID_FLAGS.keys()
    if unknown:
        raise TypeError(f"Invalid permission(s): {', '.join(unknown)}")

    def predicate(interaction: Interaction) -> bool:
        granted = interaction.permissions

        # Collect every permission whose actual value differs from the requested one.
        missing = []
        for name, expected in perms.items():
            if getattr(granted, name) != expected:
                missing.append(name)

        if missing:
            raise MissingPermissions(missing)
        return True

    return check(predicate)
def bot_has_permissions(**perms: bool) -> Callable[[T], T]:
    """The counterpart of :func:`has_permissions` for the bot itself.

    The comparison is made against :attr:`discord.Interaction.app_permissions`.
    Unknown permission names are rejected with :exc:`TypeError` when the
    decorator is created.

    On failure the check raises :exc:`~discord.app_commands.BotMissingPermissions`,
    which inherits from :exc:`~discord.app_commands.CheckFailure`.

    .. versionadded:: 2.0
    """

    # Validate the keyword names up front, at decoration time.
    unknown = set(perms) - set(Permissions.VALID_FLAGS)
    if unknown:
        raise TypeError(f"Invalid permission(s): {', '.join(unknown)}")

    def predicate(interaction: Interaction) -> bool:
        granted = interaction.app_permissions

        # Collect every permission whose actual value differs from the requested one.
        missing = []
        for name, expected in perms.items():
            if getattr(granted, name) != expected:
                missing.append(name)

        if missing:
            raise BotMissingPermissions(missing)
        return True

    return check(predicate)
def _create_cooldown_decorator(
    key: CooldownFunction[Hashable], factory: CooldownFunction[Optional[Cooldown]]
) -> Callable[[T], T]:
    # Build a check that rate limits a command with one Cooldown per key.
    #
    # ``key`` maps an interaction to the bucket it belongs to; ``factory``
    # creates the Cooldown for a bucket that does not exist yet (or returns
    # None to bypass the cooldown). Both may be plain functions or coroutines.

    # Buckets live for as long as the decorator does, shared by every invocation.
    mapping: Dict[Any, Cooldown] = {}

    async def get_bucket(
        interaction: Interaction,
        *,
        mapping: Dict[Any, Cooldown] = mapping,
        key: CooldownFunction[Hashable] = key,
        factory: CooldownFunction[Optional[Cooldown]] = factory,
    ) -> Optional[Cooldown]:
        now = interaction.created_at.timestamp()

        # Drop buckets whose last use is more than one full period in the past.
        # The stale keys are gathered first so the dict is not mutated while iterating.
        expired = [name for name, entry in mapping.items() if now > entry._last + entry.per]
        for name in expired:
            del mapping[name]

        bucket_key = await maybe_coroutine(key, interaction)
        if bucket_key in mapping:
            return mapping[bucket_key]

        # No bucket yet: ask the factory, and remember the result unless it opted out.
        bucket: Optional[Cooldown] = await maybe_coroutine(factory, interaction)
        if bucket is not None:
            mapping[bucket_key] = bucket
        return bucket

    async def predicate(interaction: Interaction) -> bool:
        bucket = await get_bucket(interaction)
        if bucket is not None:
            retry_after = bucket.update_rate_limit(interaction.created_at.timestamp())
            if retry_after is not None:
                raise CommandOnCooldown(bucket, retry_after)
        return True

    return check(predicate)
def cooldown(
    rate: float,
    per: float,
    *,
    key: Optional[CooldownFunction[Hashable]] = MISSING,
) -> Callable[[T], T]:
    """A decorator that adds a cooldown to a command.

    A cooldown limits a command to a fixed number of uses within a fixed
    time frame. Uses are grouped by the value the ``key`` function returns
    for each interaction; when no ``key`` is given the grouping is per user.
    The ``key`` function takes a single parameter, the
    :class:`discord.Interaction`, and returns a value used as a key to the
    internal cooldown mapping. It can optionally be a coroutine.

    If a cooldown is triggered, then :exc:`~discord.app_commands.CommandOnCooldown` is
    raised to the error handlers.

    Examples
    ---------

    Setting a one per 5 seconds per member cooldown on a command:

    .. code-block:: python3

        @tree.command()
        @app_commands.checks.cooldown(1, 5.0, key=lambda i: (i.guild_id, i.user.id))
        async def test(interaction: discord.Interaction):
            await interaction.response.send_message('Hello')

        @test.error
        async def on_test_error(interaction: discord.Interaction, error: app_commands.AppCommandError):
            if isinstance(error, app_commands.CommandOnCooldown):
                await interaction.response.send_message(str(error), ephemeral=True)

    Parameters
    ------------
    rate: :class:`int`
        The number of times a command can be used before triggering a cooldown.
    per: :class:`float`
        The amount of seconds to wait for a cooldown when it's been triggered.
    key: Optional[Callable[[:class:`discord.Interaction`], :class:`collections.abc.Hashable`]]
        A function that returns a key to the mapping denoting the type of cooldown.
        Can optionally be a coroutine. If not given then defaults to a user-level
        cooldown. If ``None`` is passed then it is interpreted as a "global" cooldown.
    """

    def per_user(interaction: Interaction) -> Hashable:
        return interaction.user.id

    def shared(interaction: Interaction) -> Hashable:
        # Every interaction maps to the same key, i.e. one global bucket.
        return None

    def factory(interaction: Interaction) -> Cooldown:
        # Each new bucket gets its own Cooldown with the fixed rate and period.
        return Cooldown(rate, per)

    if key is MISSING:
        key_func = per_user
    elif key is None:
        key_func = shared
    else:
        key_func = key

    return _create_cooldown_decorator(key_func, factory)
def dynamic_cooldown(
    factory: CooldownFunction[Optional[Cooldown]],
    *,
    key: Optional[CooldownFunction[Hashable]] = MISSING,
) -> Callable[[T], T]:
    """A decorator that adds a dynamic cooldown to a command.

    A cooldown limits a command to a fixed number of uses within a fixed
    time frame. Uses are grouped by the value the ``key`` function returns
    for each interaction; when no ``key`` is given the grouping is per user.
    The ``key`` function takes a single parameter, the
    :class:`discord.Interaction`, and returns a value used as a key to the
    internal cooldown mapping.

    The ``factory`` function accepts a single parameter of type
    :class:`discord.Interaction` and returns a
    :class:`~discord.app_commands.Cooldown` or ``None``. Returning ``None``
    effectively bypasses the cooldown.

    Both ``key`` and ``factory`` can optionally be coroutines.

    If a cooldown is triggered, then :exc:`~discord.app_commands.CommandOnCooldown` is
    raised to the error handlers.

    Examples
    ---------

    Setting a cooldown for everyone but the owner.

    .. code-block:: python3

        def cooldown_for_everyone_but_me(interaction: discord.Interaction) -> Optional[app_commands.Cooldown]:
            if interaction.user.id == 80088516616269824:
                return None
            return app_commands.Cooldown(1, 10.0)

        @tree.command()
        @app_commands.checks.dynamic_cooldown(cooldown_for_everyone_but_me)
        async def test(interaction: discord.Interaction):
            await interaction.response.send_message('Hello')

        @test.error
        async def on_test_error(interaction: discord.Interaction, error: app_commands.AppCommandError):
            if isinstance(error, app_commands.CommandOnCooldown):
                await interaction.response.send_message(str(error), ephemeral=True)

    Parameters
    ------------
    factory: Optional[Callable[[:class:`discord.Interaction`], Optional[:class:`~discord.app_commands.Cooldown`]]]
        A function that takes an interaction and returns a cooldown that will apply to that interaction
        or ``None`` if the interaction should not have a cooldown.
    key: Optional[Callable[[:class:`discord.Interaction`], :class:`collections.abc.Hashable`]]
        A function that returns a key to the mapping denoting the type of cooldown.
        Can optionally be a coroutine. If not given then defaults to a user-level
        cooldown. If ``None`` is passed then it is interpreted as a "global" cooldown.
    """

    def per_user(interaction: Interaction) -> Hashable:
        return interaction.user.id

    def shared(interaction: Interaction) -> Hashable:
        # Every interaction maps to the same key, i.e. one global bucket.
        return None

    if key is MISSING:
        key_func = per_user
    elif key is None:
        key_func = shared
    else:
        key_func = key

    return _create_cooldown_decorator(key_func, factory)