You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2798 lines
101 KiB

"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
import asyncio
import logging
import time
import sys
from typing import (
Any,
ClassVar,
Coroutine,
Dict,
Iterable,
List,
Literal,
NamedTuple,
Optional,
overload,
Sequence,
Tuple,
TYPE_CHECKING,
Type,
TypeVar,
Union,
)
from urllib.parse import quote as _uriquote
import datetime
import aiohttp
from .errors import HTTPException, RateLimited, Forbidden, NotFound, LoginFailure, DiscordServerError, GatewayNotFound
from .gateway import DiscordClientWebSocketResponse
from .file import File
from .mentions import AllowedMentions
from . import __version__, utils
from .utils import MISSING
from .flags import MessageFlags
_log = logging.getLogger(__name__)
if TYPE_CHECKING:
from typing_extensions import Self
from .ui.view import BaseView
from .embeds import Embed
from .message import Attachment
from .poll import Poll
from .types import (
appinfo,
audit_log,
automod,
channel,
command,
emoji,
guild,
integration,
invite,
member,
message,
onboarding,
template,
role,
user,
webhook,
widget,
threads,
scheduled_event,
sticker,
welcome_screen,
sku,
poll,
voice,
soundboard,
subscription,
)
from .types.snowflake import Snowflake, SnowflakeList
from .types.gateway import SessionStartLimit
from types import TracebackType
T = TypeVar('T')
BE = TypeVar('BE', bound=BaseException)
Response = Coroutine[Any, Any, T]
async def json_or_text(response: aiohttp.ClientResponse) -> Union[Dict[str, Any], str]:
text = await response.text(encoding='utf-8')
try:
if response.headers['content-type'] == 'application/json':
return utils._from_json(text)
except KeyError:
# Thanks Cloudflare
pass
return text
class MultipartParameters(NamedTuple):
payload: Optional[Dict[str, Any]]
multipart: Optional[List[Dict[str, Any]]]
files: Optional[Sequence[File]]
def __enter__(self) -> Self:
return self
def __exit__(
self,
exc_type: Optional[Type[BE]],
exc: Optional[BE],
traceback: Optional[TracebackType],
) -> None:
if self.files:
for file in self.files:
file.close()
def handle_message_parameters(
content: Optional[str] = MISSING,
*,
username: str = MISSING,
avatar_url: Any = MISSING,
tts: bool = False,
nonce: Optional[Union[int, str]] = None,
flags: MessageFlags = MISSING,
file: File = MISSING,
files: Sequence[File] = MISSING,
embed: Optional[Embed] = MISSING,
embeds: Sequence[Embed] = MISSING,
attachments: Sequence[Union[Attachment, File]] = MISSING,
view: Optional[BaseView] = MISSING,
allowed_mentions: Optional[AllowedMentions] = MISSING,
message_reference: Optional[message.MessageReference] = MISSING,
stickers: Optional[SnowflakeList] = MISSING,
previous_allowed_mentions: Optional[AllowedMentions] = None,
mention_author: Optional[bool] = None,
thread_name: str = MISSING,
channel_payload: Dict[str, Any] = MISSING,
applied_tags: Optional[SnowflakeList] = MISSING,
poll: Optional[Poll] = MISSING,
) -> MultipartParameters:
if files is not MISSING and file is not MISSING:
raise TypeError('Cannot mix file and files keyword arguments.')
if embeds is not MISSING and embed is not MISSING:
raise TypeError('Cannot mix embed and embeds keyword arguments.')
if file is not MISSING:
files = [file]
if attachments is not MISSING and files is not MISSING:
raise TypeError('Cannot mix attachments and files keyword arguments.')
payload = {}
if embeds is not MISSING:
if len(embeds) > 10:
raise ValueError('embeds has a maximum of 10 elements.')
payload['embeds'] = [e.to_dict() for e in embeds]
if embed is not MISSING:
if embed is None:
payload['embeds'] = []
else:
payload['embeds'] = [embed.to_dict()]
if content is not MISSING:
if content is not None:
payload['content'] = str(content)
else:
payload['content'] = None
if view is not MISSING:
if view is not None:
payload['components'] = view.to_components()
if view.has_components_v2():
if flags is not MISSING:
flags.components_v2 = True
else:
flags = MessageFlags(components_v2=True)
else:
payload['components'] = []
if nonce is not None:
payload['nonce'] = str(nonce)
payload['enforce_nonce'] = True
if message_reference is not MISSING:
payload['message_reference'] = message_reference
if stickers is not MISSING:
if stickers is not None:
payload['sticker_ids'] = stickers
else:
payload['sticker_ids'] = []
payload['tts'] = tts
if avatar_url:
payload['avatar_url'] = str(avatar_url)
if username:
payload['username'] = username
if flags is not MISSING:
payload['flags'] = flags.value
if thread_name is not MISSING:
payload['thread_name'] = thread_name
if allowed_mentions:
if previous_allowed_mentions is not None:
payload['allowed_mentions'] = previous_allowed_mentions.merge(allowed_mentions).to_dict()
else:
payload['allowed_mentions'] = allowed_mentions.to_dict()
elif previous_allowed_mentions is not None:
payload['allowed_mentions'] = previous_allowed_mentions.to_dict()
if mention_author is not None:
if 'allowed_mentions' not in payload:
payload['allowed_mentions'] = AllowedMentions().to_dict()
payload['allowed_mentions']['replied_user'] = mention_author
if attachments is MISSING:
attachments = files
else:
files = [a for a in attachments if isinstance(a, File)]
if attachments is not MISSING:
file_index = 0
attachments_payload = []
for attachment in attachments:
if isinstance(attachment, File):
attachments_payload.append(attachment.to_dict(file_index))
file_index += 1
else:
attachments_payload.append(attachment.to_dict())
payload['attachments'] = attachments_payload
if applied_tags is not MISSING:
if applied_tags is not None:
payload['applied_tags'] = applied_tags
else:
payload['applied_tags'] = []
if channel_payload is not MISSING:
payload = {
'message': payload,
}
payload.update(channel_payload)
if poll not in (MISSING, None):
payload['poll'] = poll._to_dict() # type: ignore
multipart = []
if files:
multipart.append({'name': 'payload_json', 'value': utils._to_json(payload)})
payload = None
for index, file in enumerate(files):
multipart.append(
{
'name': f'files[{index}]',
'value': file.fp,
'filename': file.filename,
'content_type': 'application/octet-stream',
}
)
return MultipartParameters(payload=payload, multipart=multipart, files=files)
INTERNAL_API_VERSION: int = 10
def _set_api_version(value: int):
global INTERNAL_API_VERSION
if not isinstance(value, int):
raise TypeError(f'expected int not {value.__class__.__name__}')
if value not in (9, 10):
raise ValueError(f'expected either 9 or 10 not {value}')
INTERNAL_API_VERSION = value
Route.BASE = f'https://discord.com/api/v{value}'
class Route:
BASE: ClassVar[str] = 'https://discord.com/api/v10'
def __init__(self, method: str, path: str, *, metadata: Optional[str] = None, **parameters: Any) -> None:
self.path: str = path
self.method: str = method
# Metadata is a special string used to differentiate between known sub rate limits
# Since these can't be handled generically, this is the next best way to do so.
self.metadata: Optional[str] = metadata
url = self.BASE + self.path
if parameters:
url = url.format_map({k: _uriquote(v, safe='') if isinstance(v, str) else v for k, v in parameters.items()})
self.url: str = url
# major parameters:
self.channel_id: Optional[Snowflake] = parameters.get('channel_id')
self.guild_id: Optional[Snowflake] = parameters.get('guild_id')
self.webhook_id: Optional[Snowflake] = parameters.get('webhook_id')
self.webhook_token: Optional[str] = parameters.get('webhook_token')
@property
def key(self) -> str:
"""The bucket key is used to represent the route in various mappings."""
if self.metadata:
return f'{self.method} {self.path}:{self.metadata}'
return f'{self.method} {self.path}'
@property
def major_parameters(self) -> str:
"""Returns the major parameters formatted a string.
This needs to be appended to a bucket hash to constitute as a full rate limit key.
"""
return '+'.join(
str(k) for k in (self.channel_id, self.guild_id, self.webhook_id, self.webhook_token) if k is not None
)
class Ratelimit:
"""Represents a Discord rate limit.
This is similar to a semaphore except tailored to Discord's rate limits. This is aware of
the expiry of a token window, along with the number of tokens available. The goal of this
design is to increase throughput of requests being sent concurrently by forcing everything
into a single lock queue per route.
"""
__slots__ = (
'key',
'limit',
'remaining',
'reset_at',
'pending',
'outgoing',
'one_shot',
'http',
'_last_request',
'_lock',
'_event',
)
def __init__(self, http: HTTPClient, key: str) -> None:
self.key = key
self.limit: int = 1
self.remaining: int = self.limit
self.reset_at: float = 0.0
self.pending: int = 0
self.outgoing: int = 0
self.one_shot: bool = False
self.http: HTTPClient = http
self._last_request: float = 0.0
self._lock: asyncio.Lock = asyncio.Lock()
self._event: asyncio.Event = asyncio.Event()
def __repr__(self) -> str:
return (
f'<RateLimitBucket limit={self.limit} remaining={self.remaining} pending={self.pending}>'
)
def no_headers(self) -> None:
self.one_shot = True
self._event.set()
self._event.clear()
def reset(self) -> None:
self.remaining = self.limit - self.outgoing
self.reset_at = 0.0
def update(self, response: aiohttp.ClientResponse) -> bool:
# Shared scope 429 has longer "reset_at", determined using the retry-after field
limit = int(response.headers['X-Ratelimit-Limit'])
if response.headers.get('X-RateLimit-Scope') == 'shared':
reset_at = self.http.loop.time() + float(response.headers['Retry-After'])
remaining = 0
else:
# Consider a lower remaining value because updates can be out of order, so self.outgoing is used
reset_at = self.http.loop.time() + (float(response.headers['X-Ratelimit-Reset']) - time.time())
remaining = min(int(response.headers['X-Ratelimit-Remaining']), limit - self.outgoing)
# The checks below combats out-of-order responses and alternating sub ratelimits.
# As a result, there will be lower throughput for routes with subratelimits.
# Unless we document and maintain the undocumented and unstable subratelimits ourselves.
# 1. Completely ignore if the ratelimit window has expired; that data is useless
if self.http.loop.time() >= reset_at:
return False
# 2. Always use the longest reset_at value
update = False
if reset_at > self.reset_at:
self.reset_at = reset_at
update = True
# 3. Always use the lowest remaining value
if remaining < self.remaining:
self.remaining = remaining
update = True
self.limit = limit
self.one_shot = False
# Return whether this update was relevant or not.
return update
def is_inactive(self) -> bool:
delta = self.http.loop.time() - self._last_request
return delta >= 300 and (self.one_shot or (self.outgoing == 0 and self.pending == 0))
async def _wait_global(self, start_time: float):
# Sleep up to 3 times, to account for global reset at overwriting during sleeps
for i in range(3):
seconds = self.http.global_reset_at - start_time
if seconds > 0:
await asyncio.sleep(seconds)
continue
break
else:
raise ValueError("Global reset at changed more than 3 times")
async def _wait(self):
# Consider waiting if none is remaining
if not self.remaining:
# If reset_at is not set yet, wait for the last request, if outgoing, to finish first
# for up to 3 seconds instead of using aiohttp's default 5 min timeout.
if not self.reset_at and (not self._last_request or self.http.loop.time() - self._last_request < 3):
try:
await asyncio.wait_for(self._event.wait(), 3)
except asyncio.TimeoutError:
fmt = 'Initial request for rate limit bucket (%s) never finished. Skipping.'
_log.warning(fmt, self.key)
pass
# If none are still remaining then start sleeping
if not self.remaining and not self.one_shot:
# Sleep up to 3 times, giving room for a bucket update and a bucket change
# or 2 sub-ratelimit bucket changes, prioritizing is handled in update()
for i in range(3):
seconds = self.reset_at - self.http.loop.time()
copy = self.reset_at
if seconds > 0:
if i:
fmt = 'A rate limit bucket (%s) has changed reset_at during sleep. Sleeping again for %.2f seconds.'
else:
fmt = 'A rate limit bucket (%s) has been exhausted. Sleeping for %.2f seconds.'
_log.warning(fmt, self.key, seconds)
await asyncio.sleep(seconds)
if copy == self.reset_at:
self.reset()
elif not self.remaining and not self.one_shot:
continue # sleep again
break
else:
raise ValueError("Reset at changed more than 3 times")
async def acquire(self):
start_time: float = self.http.loop.time()
# Resources confirmed to be "one shots" after the first request like interaction response
# don't have ratelimits and can skip the entire queue logic except global wait
if self.one_shot:
await self._wait_global(start_time)
else:
# Ensure only 1 request goes through the inner acquire logic at a time
self.pending += 1
async with self._lock:
await self._wait()
await self._wait_global(start_time)
self.remaining -= 1 # one shot changing this doesn't matter
self.pending -= 1
self.outgoing += 1
self._last_request = self.http.loop.time()
return self
async def release(self):
if not self.one_shot:
self.outgoing -= 1
self._event.set()
self._event.clear()
async def __aenter__(self) -> Self:
await self.acquire()
return self
async def __aexit__(self, type: Type[BE], value: BE, traceback: TracebackType) -> None:
await self.release()
# For some reason, the Discord voice websocket expects this header to be
# completely lowercase while aiohttp respects spec and does it as case-insensitive
aiohttp.hdrs.WEBSOCKET = 'websocket' # type: ignore
class HTTPClient:
"""Represents an HTTP client sending HTTP requests to the Discord API."""
def __init__(
self,
loop: asyncio.AbstractEventLoop,
connector: Optional[aiohttp.BaseConnector] = None,
*,
proxy: Optional[str] = None,
proxy_auth: Optional[aiohttp.BasicAuth] = None,
unsync_clock: bool = True,
http_trace: Optional[aiohttp.TraceConfig] = None,
max_ratelimit_timeout: Optional[float] = None,
) -> None:
self.loop: asyncio.AbstractEventLoop = loop
self.connector: aiohttp.BaseConnector = connector or MISSING
self.__session: aiohttp.ClientSession = MISSING # filled in static_login
# Route key -> Bucket hash
self._bucket_hashes: Dict[str, str] = {}
# Bucket Hash + Major Parameters -> Rate limit
# or
# Route key + Major Parameters -> Rate limit
# When the key is the latter, it is used for temporary
# one shot requests that don't have a bucket hash
# When this reaches 256 elements, it will try to evict based off of expiry
self._buckets: Dict[str, Ratelimit] = {}
self.global_reset_at: float = 0.0
self.token: Optional[str] = None
self.proxy: Optional[str] = proxy
self.proxy_auth: Optional[aiohttp.BasicAuth] = proxy_auth
self.http_trace: Optional[aiohttp.TraceConfig] = http_trace
self.use_clock: bool = not unsync_clock
self.max_ratelimit_timeout: Optional[float] = max(30.0, max_ratelimit_timeout) if max_ratelimit_timeout else None
user_agent = 'DiscordBot (https://github.com/Rapptz/discord.py {0}) Python/{1[0]}.{1[1]} aiohttp/{2}'
self.user_agent: str = user_agent.format(__version__, sys.version_info, aiohttp.__version__)
def clear(self) -> None:
if self.__session and self.__session.closed:
self.__session = MISSING
async def ws_connect(self, url: str, *, compress: int = 0) -> aiohttp.ClientWebSocketResponse:
kwargs = {
'proxy_auth': self.proxy_auth,
'proxy': self.proxy,
'max_msg_size': 0,
'timeout': 30.0,
'autoclose': False,
'headers': {
'User-Agent': self.user_agent,
},
'compress': compress,
}
return await self.__session.ws_connect(url, **kwargs)
def _try_clear_expired_ratelimits(self) -> None:
if len(self._buckets) < 256:
return
keys = [key for key, bucket in self._buckets.items() if bucket.is_inactive()]
for key in keys:
del self._buckets[key]
def get_ratelimit(self, key: str) -> Ratelimit:
try:
value = self._buckets[key]
except KeyError:
self._buckets[key] = value = Ratelimit(self, key)
self._try_clear_expired_ratelimits()
return value
async def request(
self,
route: Route,
*,
files: Optional[Sequence[File]] = None,
form: Optional[Iterable[Dict[str, Any]]] = None,
**kwargs: Any,
) -> Any:
method = route.method
url = route.url
route_key = route.key
bucket_hash = None
try:
bucket_hash = self._bucket_hashes[route_key]
except KeyError:
key = f'{route_key}:{route.major_parameters}'
else:
key = f'{bucket_hash}:{route.major_parameters}'
ratelimit = self.get_ratelimit(key)
# header creation
headers: Dict[str, str] = {
'User-Agent': self.user_agent,
}
if self.token is not None:
headers['Authorization'] = 'Bot ' + self.token
# some checking if it's a JSON request
if 'json' in kwargs:
headers['Content-Type'] = 'application/json'
kwargs['data'] = utils._to_json(kwargs.pop('json'))
try:
reason = kwargs.pop('reason')
except KeyError:
pass
else:
if reason:
headers['X-Audit-Log-Reason'] = _uriquote(reason, safe='/ ')
kwargs['headers'] = headers
# Proxy support
if self.proxy is not None:
kwargs['proxy'] = self.proxy
if self.proxy_auth is not None:
kwargs['proxy_auth'] = self.proxy_auth
response: Optional[aiohttp.ClientResponse] = None
data: Optional[Union[Dict[str, Any], str]] = None
for tries in range(5):
async with ratelimit:
if files:
for f in files:
f.reset(seek=tries)
if form:
# with quote_fields=True '[' and ']' in file field names are escaped, which discord does not support
form_data = aiohttp.FormData(quote_fields=False)
for params in form:
form_data.add_field(**params)
kwargs['data'] = form_data
try:
async with self.__session.request(method, url, **kwargs) as response:
_log.debug('%s %s with %s has returned %s', method, url, kwargs.get('data'), response.status)
# Endpoint has ratelimit headers
new_bucket_hash = response.headers.get('X-Ratelimit-Bucket')
if new_bucket_hash:
# Ratelimit headers are up to date and relevant
if ratelimit.update(response):
# Adjust key if the bucket has changed. Either encountered a sub-ratelimit
# or Discord just wants to change ratelimit values for an update probably.
if new_bucket_hash != bucket_hash:
if bucket_hash is not None:
fmt = 'A route (%s) has changed hashes: %s -> %s.'
_log.debug(fmt, route_key, bucket_hash, new_bucket_hash)
self._buckets.pop(key, None)
elif route_key not in self._bucket_hashes:
fmt = '%s has found its initial rate limit bucket hash (%s).'
_log.debug(fmt, route_key, new_bucket_hash)
self._bucket_hashes[route_key] = new_bucket_hash
ratelimit.key = new_bucket_hash + route.major_parameters
self._buckets[ratelimit.key] = ratelimit
# Global rate limit 429 wont have ratelimit headers (also can't tell if it's one-shot)
elif response.headers.get('X-RateLimit-Global'):
retry_after: float = float(response.headers['Retry-After'])
_log.warning('Global rate limit has been hit. Retrying in %.2f seconds.', retry_after)
self.global_reset_at = self.loop.time() + retry_after
# Endpoint does not have ratelimit headers; it's one-shot.
elif not ratelimit.one_shot:
ratelimit.no_headers()
# Errors have text involved in them so this is safe to call
data = await json_or_text(response)
# The request was successful so just return the text/json
if 300 > response.status >= 200:
_log.debug('%s %s has received %s', method, url, data)
return data
# We are being ratelimited
elif response.status == 429:
retry_after: float = float(response.headers['Retry-After']) # only in headers for cf ban
# Hit Cloudflare ban for too many invalid requests (10,000 per 10 minutes)
# An invalid HTTP request is 401, 403, or 429 (excluding "shared" scope).
# This ban is the only one that is IP based, and not just token based.
if not response.headers.get('Via'):
fmt = 'We are Cloudflare banned for %.2f seconds.'
_log.warning(fmt, method, url, retry_after)
raise HTTPException(response, data)
elif self.max_ratelimit_timeout and retry_after > self.max_ratelimit_timeout:
fmt = 'We are being rate limited. %s %s responded with 429. Timeout of %.2f was too long, erroring instead.'
_log.warning(fmt, method, url, retry_after)
raise RateLimited(retry_after)
elif ratelimit.remaining > 0:
# According to night, remaining > 0 and 429 means that a sub ratelimit was hit.
# https://github.com/discord/discord-api-docs/issues/2190#issuecomment-816363129
fmt = '%s %s received a 429 despite having %s remaining requests. This is a sub-ratelimit.'
_log.debug(fmt, method, url, ratelimit.remaining)
fmt = 'We are being rate limited. %s %s responded with 429. Retrying in %.2f seconds.'
_log.warning(fmt, method, url, retry_after)
# We've received a 500, 502, 504, or 524, unconditional retry
elif response.status in {500, 502, 504, 524}:
await asyncio.sleep(1 + tries * 2)
continue
# The usual error cases
elif response.status == 403:
raise Forbidden(response, data)
elif response.status == 404:
raise NotFound(response, data)
elif response.status >= 500:
raise DiscordServerError(response, data)
else:
raise HTTPException(response, data)
# This is handling exceptions from the request
except OSError as e:
if tries == 4 or e.errno not in (54, 10054):
raise ValueError("Connection reset by peer")
retry_seconds: int = 1 + tries * 2
fmt = 'OS error for %s %s. Retrying in %d seconds.'
_log.warning(fmt, method, url, retry_seconds)
await asyncio.sleep(retry_seconds)
# We've run out of retries, raise
if response is not None:
raise HTTPException(response, data)
else:
raise RuntimeError('Unreachable code in HTTP handling')
async def get_from_cdn(self, url: str) -> bytes:
kwargs = {}
# Proxy support
if self.proxy is not None:
kwargs['proxy'] = self.proxy
if self.proxy_auth is not None:
kwargs['proxy_auth'] = self.proxy_auth
async with self.__session.get(url, **kwargs) as resp:
if resp.status == 200:
return await resp.read()
elif resp.status == 404:
raise NotFound(resp, 'asset not found')
elif resp.status == 403:
raise Forbidden(resp, 'cannot retrieve asset')
else:
raise HTTPException(resp, 'failed to get asset')
raise RuntimeError('Unreachable')
# state management
async def close(self) -> None:
if self.__session:
await self.__session.close()
# login management
async def static_login(self, token: str) -> user.User:
# Necessary to get aiohttp to stop complaining about session creation
if self.connector is MISSING:
self.connector = aiohttp.TCPConnector(limit=0)
self.__session = aiohttp.ClientSession(
connector=self.connector,
ws_response_class=DiscordClientWebSocketResponse,
trace_configs=None if self.http_trace is None else [self.http_trace],
cookie_jar=aiohttp.DummyCookieJar(),
)
self._global_over = asyncio.Event()
self._global_over.set()
old_token = self.token
self.token = token
try:
data = await self.request(Route('GET', '/users/@me'))
except HTTPException as exc:
self.token = old_token
if exc.status == 401:
raise LoginFailure('Improper token has been passed.') from exc
raise
return data
def logout(self) -> Response[None]:
return self.request(Route('POST', '/auth/logout'))
# Group functionality
def start_group(self, user_id: Snowflake, recipients: List[int]) -> Response[channel.GroupDMChannel]:
payload = {
'recipients': recipients,
}
return self.request(Route('POST', '/users/{user_id}/channels', user_id=user_id), json=payload)
def leave_group(self, channel_id: Snowflake) -> Response[None]:
return self.request(Route('DELETE', '/channels/{channel_id}', channel_id=channel_id))
# Message management
def start_private_message(self, user_id: Snowflake) -> Response[channel.DMChannel]:
payload = {
'recipient_id': user_id,
}
return self.request(Route('POST', '/users/@me/channels'), json=payload)
def send_message(
self,
channel_id: Snowflake,
*,
params: MultipartParameters,
) -> Response[message.Message]:
r = Route('POST', '/channels/{channel_id}/messages', channel_id=channel_id)
if params.files:
return self.request(r, files=params.files, form=params.multipart)
else:
return self.request(r, json=params.payload)
def send_typing(self, channel_id: Snowflake) -> Response[None]:
return self.request(Route('POST', '/channels/{channel_id}/typing', channel_id=channel_id))
def delete_message(
self, channel_id: Snowflake, message_id: Snowflake, *, reason: Optional[str] = None
) -> Response[None]:
# Special case certain sub-rate limits
# https://github.com/discord/discord-api-docs/issues/1092
# https://github.com/discord/discord-api-docs/issues/1295
difference = utils.utcnow() - utils.snowflake_time(int(message_id))
metadata: Optional[str] = None
if difference <= datetime.timedelta(seconds=10):
metadata = 'sub-10-seconds'
elif difference >= datetime.timedelta(days=14):
metadata = 'older-than-two-weeks'
r = Route(
'DELETE',
'/channels/{channel_id}/messages/{message_id}',
channel_id=channel_id,
message_id=message_id,
metadata=metadata,
)
return self.request(r, reason=reason)
def delete_messages(
self, channel_id: Snowflake, message_ids: SnowflakeList, *, reason: Optional[str] = None
) -> Response[None]:
r = Route('POST', '/channels/{channel_id}/messages/bulk-delete', channel_id=channel_id)
payload = {
'messages': message_ids,
}
return self.request(r, json=payload, reason=reason)
def edit_message(
self, channel_id: Snowflake, message_id: Snowflake, *, params: MultipartParameters
) -> Response[message.Message]:
r = Route('PATCH', '/channels/{channel_id}/messages/{message_id}', channel_id=channel_id, message_id=message_id)
if params.files:
return self.request(r, files=params.files, form=params.multipart)
else:
return self.request(r, json=params.payload)
def add_reaction(self, channel_id: Snowflake, message_id: Snowflake, emoji: str) -> Response[None]:
r = Route(
'PUT',
'/channels/{channel_id}/messages/{message_id}/reactions/{emoji}/@me',
channel_id=channel_id,
message_id=message_id,
emoji=emoji,
)
return self.request(r)
def remove_reaction(
self, channel_id: Snowflake, message_id: Snowflake, emoji: str, member_id: Snowflake
) -> Response[None]:
r = Route(
'DELETE',
'/channels/{channel_id}/messages/{message_id}/reactions/{emoji}/{member_id}',
channel_id=channel_id,
message_id=message_id,
member_id=member_id,
emoji=emoji,
)
return self.request(r)
def remove_own_reaction(self, channel_id: Snowflake, message_id: Snowflake, emoji: str) -> Response[None]:
r = Route(
'DELETE',
'/channels/{channel_id}/messages/{message_id}/reactions/{emoji}/@me',
channel_id=channel_id,
message_id=message_id,
emoji=emoji,
)
return self.request(r)
def get_reaction_users(
self,
channel_id: Snowflake,
message_id: Snowflake,
emoji: str,
limit: int,
after: Optional[Snowflake] = None,
type: Optional[message.ReactionType] = None,
) -> Response[List[user.User]]:
r = Route(
'GET',
'/channels/{channel_id}/messages/{message_id}/reactions/{emoji}',
channel_id=channel_id,
message_id=message_id,
emoji=emoji,
)
params: Dict[str, Any] = {
'limit': limit,
}
if after:
params['after'] = after
if type is not None:
params['type'] = type
return self.request(r, params=params)
def clear_reactions(self, channel_id: Snowflake, message_id: Snowflake) -> Response[None]:
r = Route(
'DELETE',
'/channels/{channel_id}/messages/{message_id}/reactions',
channel_id=channel_id,
message_id=message_id,
)
return self.request(r)
def clear_single_reaction(self, channel_id: Snowflake, message_id: Snowflake, emoji: str) -> Response[None]:
r = Route(
'DELETE',
'/channels/{channel_id}/messages/{message_id}/reactions/{emoji}',
channel_id=channel_id,
message_id=message_id,
emoji=emoji,
)
return self.request(r)
def get_message(self, channel_id: Snowflake, message_id: Snowflake) -> Response[message.Message]:
r = Route('GET', '/channels/{channel_id}/messages/{message_id}', channel_id=channel_id, message_id=message_id)
return self.request(r)
def get_channel(self, channel_id: Snowflake) -> Response[channel.Channel]:
r = Route('GET', '/channels/{channel_id}', channel_id=channel_id)
return self.request(r)
def logs_from(
self,
channel_id: Snowflake,
limit: int,
before: Optional[Snowflake] = None,
after: Optional[Snowflake] = None,
around: Optional[Snowflake] = None,
) -> Response[List[message.Message]]:
params: Dict[str, Any] = {
'limit': limit,
}
if before is not None:
params['before'] = before
if after is not None:
params['after'] = after
if around is not None:
params['around'] = around
return self.request(Route('GET', '/channels/{channel_id}/messages', channel_id=channel_id), params=params)
def publish_message(self, channel_id: Snowflake, message_id: Snowflake) -> Response[message.Message]:
return self.request(
Route(
'POST',
'/channels/{channel_id}/messages/{message_id}/crosspost',
channel_id=channel_id,
message_id=message_id,
)
)
def pin_message(self, channel_id: Snowflake, message_id: Snowflake, reason: Optional[str] = None) -> Response[None]:
r = Route(
'PUT',
'/channels/{channel_id}/messages/pins/{message_id}',
channel_id=channel_id,
message_id=message_id,
)
return self.request(r, reason=reason)
def unpin_message(self, channel_id: Snowflake, message_id: Snowflake, reason: Optional[str] = None) -> Response[None]:
r = Route(
'DELETE',
'/channels/{channel_id}/messages/pins/{message_id}',
channel_id=channel_id,
message_id=message_id,
)
return self.request(r, reason=reason)
def pins_from(
self,
channel_id: Snowflake,
limit: Optional[int] = None,
before: Optional[str] = None,
) -> Response[message.ChannelPins]:
params = {}
if before is not None:
params['before'] = before
if limit is not None:
params['limit'] = limit
return self.request(Route('GET', '/channels/{channel_id}/messages/pins', channel_id=channel_id), params=params)
# Member management
def kick(self, user_id: Snowflake, guild_id: Snowflake, reason: Optional[str] = None) -> Response[None]:
r = Route('DELETE', '/guilds/{guild_id}/members/{user_id}', guild_id=guild_id, user_id=user_id)
return self.request(r, reason=reason)
def ban(
self,
user_id: Snowflake,
guild_id: Snowflake,
delete_message_seconds: int = 86400, # one day
reason: Optional[str] = None,
) -> Response[None]:
r = Route('PUT', '/guilds/{guild_id}/bans/{user_id}', guild_id=guild_id, user_id=user_id)
params = {
'delete_message_seconds': delete_message_seconds,
}
return self.request(r, params=params, reason=reason)
def unban(self, user_id: Snowflake, guild_id: Snowflake, *, reason: Optional[str] = None) -> Response[None]:
r = Route('DELETE', '/guilds/{guild_id}/bans/{user_id}', guild_id=guild_id, user_id=user_id)
return self.request(r, reason=reason)
def bulk_ban(
self,
guild_id: Snowflake,
user_ids: List[Snowflake],
delete_message_seconds: int = 86400,
reason: Optional[str] = None,
) -> Response[guild.BulkBanUserResponse]:
r = Route('POST', '/guilds/{guild_id}/bulk-ban', guild_id=guild_id)
payload = {
'user_ids': user_ids,
'delete_message_seconds': delete_message_seconds,
}
return self.request(r, json=payload, reason=reason)
def guild_voice_state(
self,
user_id: Snowflake,
guild_id: Snowflake,
*,
mute: Optional[bool] = None,
deafen: Optional[bool] = None,
reason: Optional[str] = None,
) -> Response[member.Member]:
r = Route('PATCH', '/guilds/{guild_id}/members/{user_id}', guild_id=guild_id, user_id=user_id)
payload = {}
if mute is not None:
payload['mute'] = mute
if deafen is not None:
payload['deaf'] = deafen
return self.request(r, json=payload, reason=reason)
def edit_profile(self, payload: Dict[str, Any]) -> Response[user.User]:
return self.request(Route('PATCH', '/users/@me'), json=payload)
def change_my_nickname(
self,
guild_id: Snowflake,
nickname: str,
*,
reason: Optional[str] = None,
) -> Response[member.Nickname]:
r = Route('PATCH', '/guilds/{guild_id}/members/@me/nick', guild_id=guild_id)
payload = {
'nick': nickname,
}
return self.request(r, json=payload, reason=reason)
def change_nickname(
self,
guild_id: Snowflake,
user_id: Snowflake,
nickname: str,
*,
reason: Optional[str] = None,
) -> Response[member.Member]:
r = Route('PATCH', '/guilds/{guild_id}/members/{user_id}', guild_id=guild_id, user_id=user_id)
payload = {
'nick': nickname,
}
return self.request(r, json=payload, reason=reason)
def edit_my_voice_state(self, guild_id: Snowflake, payload: Dict[str, Any]) -> Response[None]:
r = Route('PATCH', '/guilds/{guild_id}/voice-states/@me', guild_id=guild_id)
return self.request(r, json=payload)
def edit_voice_state(self, guild_id: Snowflake, user_id: Snowflake, payload: Dict[str, Any]) -> Response[None]:
r = Route('PATCH', '/guilds/{guild_id}/voice-states/{user_id}', guild_id=guild_id, user_id=user_id)
return self.request(r, json=payload)
def edit_member(
self,
guild_id: Snowflake,
user_id: Snowflake,
*,
reason: Optional[str] = None,
**fields: Any,
) -> Response[member.MemberWithUser]:
r = Route('PATCH', '/guilds/{guild_id}/members/{user_id}', guild_id=guild_id, user_id=user_id)
return self.request(r, json=fields, reason=reason)
def get_my_voice_state(self, guild_id: Snowflake) -> Response[voice.GuildVoiceState]:
return self.request(Route('GET', '/guilds/{guild_id}/voice-states/@me', guild_id=guild_id))
def get_voice_state(self, guild_id: Snowflake, user_id: Snowflake) -> Response[voice.GuildVoiceState]:
return self.request(Route('GET', '/guilds/{guild_id}/voice-states/{user_id}', guild_id=guild_id, user_id=user_id))
# Channel management
def edit_channel(
self,
channel_id: Snowflake,
*,
reason: Optional[str] = None,
**options: Any,
) -> Response[channel.Channel]:
r = Route('PATCH', '/channels/{channel_id}', channel_id=channel_id)
valid_keys = (
'name',
'parent_id',
'topic',
'bitrate',
'nsfw',
'user_limit',
'position',
'permission_overwrites',
'rate_limit_per_user',
'type',
'rtc_region',
'video_quality_mode',
'archived',
'auto_archive_duration',
'locked',
'invitable',
'default_auto_archive_duration',
'flags',
'default_thread_rate_limit_per_user',
'default_reaction_emoji',
'available_tags',
'applied_tags',
'default_forum_layout',
'default_sort_order',
)
payload = {k: v for k, v in options.items() if k in valid_keys}
return self.request(r, reason=reason, json=payload)
def edit_voice_channel_status(
self, status: Optional[str], *, channel_id: int, reason: Optional[str] = None
) -> Response[None]:
r = Route('PUT', '/channels/{channel_id}/voice-status', channel_id=channel_id)
payload = {'status': status}
return self.request(r, reason=reason, json=payload)
def bulk_channel_update(
self,
guild_id: Snowflake,
data: List[guild.ChannelPositionUpdate],
*,
reason: Optional[str] = None,
) -> Response[None]:
r = Route('PATCH', '/guilds/{guild_id}/channels', guild_id=guild_id)
return self.request(r, json=data, reason=reason)
def create_channel(
self,
guild_id: Snowflake,
channel_type: channel.ChannelType,
*,
reason: Optional[str] = None,
**options: Any,
) -> Response[channel.GuildChannel]:
payload = {
'type': channel_type,
}
valid_keys = (
'name',
'parent_id',
'topic',
'bitrate',
'nsfw',
'user_limit',
'position',
'permission_overwrites',
'rate_limit_per_user',
'rtc_region',
'video_quality_mode',
'default_auto_archive_duration',
'default_thread_rate_limit_per_user',
'default_sort_order',
'default_reaction_emoji',
'default_forum_layout',
'available_tags',
)
payload.update({k: v for k, v in options.items() if k in valid_keys and v is not None})
return self.request(Route('POST', '/guilds/{guild_id}/channels', guild_id=guild_id), json=payload, reason=reason)
def delete_channel(
self,
channel_id: Snowflake,
*,
reason: Optional[str] = None,
) -> Response[None]:
return self.request(Route('DELETE', '/channels/{channel_id}', channel_id=channel_id), reason=reason)
# Thread management
def start_thread_with_message(
self,
channel_id: Snowflake,
message_id: Snowflake,
*,
name: str,
auto_archive_duration: threads.ThreadArchiveDuration,
rate_limit_per_user: Optional[int] = None,
reason: Optional[str] = None,
) -> Response[threads.Thread]:
payload = {
'name': name,
'auto_archive_duration': auto_archive_duration,
'rate_limit_per_user': rate_limit_per_user,
}
route = Route(
'POST', '/channels/{channel_id}/messages/{message_id}/threads', channel_id=channel_id, message_id=message_id
)
return self.request(route, json=payload, reason=reason)
def start_thread_without_message(
self,
channel_id: Snowflake,
*,
name: str,
auto_archive_duration: threads.ThreadArchiveDuration,
type: threads.ThreadType,
invitable: bool = True,
rate_limit_per_user: Optional[int] = None,
reason: Optional[str] = None,
) -> Response[threads.Thread]:
payload = {
'name': name,
'auto_archive_duration': auto_archive_duration,
'type': type,
'invitable': invitable,
'rate_limit_per_user': rate_limit_per_user,
}
route = Route('POST', '/channels/{channel_id}/threads', channel_id=channel_id)
return self.request(route, json=payload, reason=reason)
def start_thread_in_forum(
self,
channel_id: Snowflake,
*,
params: MultipartParameters,
reason: Optional[str] = None,
) -> Response[threads.ForumThread]:
query = {'use_nested_fields': 1}
r = Route('POST', '/channels/{channel_id}/threads', channel_id=channel_id)
if params.files:
return self.request(r, files=params.files, form=params.multipart, params=query, reason=reason)
else:
return self.request(r, json=params.payload, params=query, reason=reason)
def join_thread(self, channel_id: Snowflake) -> Response[None]:
return self.request(Route('POST', '/channels/{channel_id}/thread-members/@me', channel_id=channel_id))
def add_user_to_thread(self, channel_id: Snowflake, user_id: Snowflake) -> Response[None]:
return self.request(
Route('PUT', '/channels/{channel_id}/thread-members/{user_id}', channel_id=channel_id, user_id=user_id)
)
def leave_thread(self, channel_id: Snowflake) -> Response[None]:
return self.request(Route('DELETE', '/channels/{channel_id}/thread-members/@me', channel_id=channel_id))
def remove_user_from_thread(self, channel_id: Snowflake, user_id: Snowflake) -> Response[None]:
route = Route('DELETE', '/channels/{channel_id}/thread-members/{user_id}', channel_id=channel_id, user_id=user_id)
return self.request(route)
def get_public_archived_threads(
self, channel_id: Snowflake, before: Optional[Snowflake] = None, limit: int = 50
) -> Response[threads.ThreadPaginationPayload]:
route = Route('GET', '/channels/{channel_id}/threads/archived/public', channel_id=channel_id)
params = {}
if before:
params['before'] = before
params['limit'] = limit
return self.request(route, params=params)
def get_private_archived_threads(
self, channel_id: Snowflake, before: Optional[Snowflake] = None, limit: int = 50
) -> Response[threads.ThreadPaginationPayload]:
route = Route('GET', '/channels/{channel_id}/threads/archived/private', channel_id=channel_id)
params = {}
if before:
params['before'] = before
params['limit'] = limit
return self.request(route, params=params)
def get_joined_private_archived_threads(
self, channel_id: Snowflake, before: Optional[Snowflake] = None, limit: int = 50
) -> Response[threads.ThreadPaginationPayload]:
route = Route('GET', '/channels/{channel_id}/users/@me/threads/archived/private', channel_id=channel_id)
params = {}
if before:
params['before'] = before
params['limit'] = limit
return self.request(route, params=params)
def get_active_threads(self, guild_id: Snowflake) -> Response[threads.ThreadPaginationPayload]:
route = Route('GET', '/guilds/{guild_id}/threads/active', guild_id=guild_id)
return self.request(route)
def get_thread_member(self, channel_id: Snowflake, user_id: Snowflake) -> Response[threads.ThreadMember]:
route = Route('GET', '/channels/{channel_id}/thread-members/{user_id}', channel_id=channel_id, user_id=user_id)
return self.request(route)
def get_thread_members(self, channel_id: Snowflake) -> Response[List[threads.ThreadMember]]:
route = Route('GET', '/channels/{channel_id}/thread-members', channel_id=channel_id)
return self.request(route)
# Webhook management
def create_webhook(
self,
channel_id: Snowflake,
*,
name: str,
avatar: Optional[bytes] = None,
reason: Optional[str] = None,
) -> Response[webhook.Webhook]:
payload: Dict[str, Any] = {
'name': name,
}
if avatar is not None:
payload['avatar'] = avatar
r = Route('POST', '/channels/{channel_id}/webhooks', channel_id=channel_id)
return self.request(r, json=payload, reason=reason)
def channel_webhooks(self, channel_id: Snowflake) -> Response[List[webhook.Webhook]]:
return self.request(Route('GET', '/channels/{channel_id}/webhooks', channel_id=channel_id))
def guild_webhooks(self, guild_id: Snowflake) -> Response[List[webhook.Webhook]]:
return self.request(Route('GET', '/guilds/{guild_id}/webhooks', guild_id=guild_id))
def get_webhook(self, webhook_id: Snowflake) -> Response[webhook.Webhook]:
return self.request(Route('GET', '/webhooks/{webhook_id}', webhook_id=webhook_id))
def follow_webhook(
self,
channel_id: Snowflake,
webhook_channel_id: Snowflake,
reason: Optional[str] = None,
) -> Response[None]:
payload = {
'webhook_channel_id': str(webhook_channel_id),
}
return self.request(
Route('POST', '/channels/{channel_id}/followers', channel_id=channel_id), json=payload, reason=reason
)
# Guild management
def get_guilds(
self,
limit: int,
before: Optional[Snowflake] = None,
after: Optional[Snowflake] = None,
with_counts: bool = True,
) -> Response[List[guild.Guild]]:
params: Dict[str, Any] = {
'limit': limit,
'with_counts': int(with_counts),
}
if before:
params['before'] = before
if after:
params['after'] = after
return self.request(Route('GET', '/users/@me/guilds'), params=params)
def leave_guild(self, guild_id: Snowflake) -> Response[None]:
return self.request(Route('DELETE', '/users/@me/guilds/{guild_id}', guild_id=guild_id))
def get_guild(self, guild_id: Snowflake, *, with_counts: bool = True) -> Response[guild.Guild]:
params = {'with_counts': int(with_counts)}
return self.request(Route('GET', '/guilds/{guild_id}', guild_id=guild_id), params=params)
def get_guild_preview(self, guild_id: Snowflake) -> Response[guild.GuildPreview]:
return self.request(Route('GET', '/guilds/{guild_id}/preview', guild_id=guild_id))
def delete_guild(self, guild_id: Snowflake) -> Response[None]:
return self.request(Route('DELETE', '/guilds/{guild_id}', guild_id=guild_id))
def create_guild(self, name: str, icon: Optional[str]) -> Response[guild.Guild]:
payload = {
'name': name,
}
if icon:
payload['icon'] = icon
return self.request(Route('POST', '/guilds'), json=payload)
def edit_guild(self, guild_id: Snowflake, *, reason: Optional[str] = None, **fields: Any) -> Response[guild.Guild]:
valid_keys = (
'name',
'region',
'icon',
'afk_timeout',
'owner_id',
'afk_channel_id',
'splash',
'discovery_splash',
'features',
'verification_level',
'system_channel_id',
'default_message_notifications',
'description',
'explicit_content_filter',
'banner',
'system_channel_flags',
'rules_channel_id',
'public_updates_channel_id',
'preferred_locale',
'premium_progress_bar_enabled',
'safety_alerts_channel_id',
)
payload = {k: v for k, v in fields.items() if k in valid_keys}
return self.request(Route('PATCH', '/guilds/{guild_id}', guild_id=guild_id), json=payload, reason=reason)
def edit_guild_mfa_level(
self, guild_id: Snowflake, *, mfa_level: int, reason: Optional[str] = None
) -> Response[guild.GuildMFALevel]:
payload = {'level': mfa_level}
return self.request(Route('POST', '/guilds/{guild_id}/mfa', guild_id=guild_id), json=payload, reason=reason)
def get_template(self, code: str) -> Response[template.Template]:
return self.request(Route('GET', '/guilds/templates/{code}', code=code))
def guild_templates(self, guild_id: Snowflake) -> Response[List[template.Template]]:
return self.request(Route('GET', '/guilds/{guild_id}/templates', guild_id=guild_id))
def create_template(self, guild_id: Snowflake, payload: Dict[str, Any]) -> Response[template.Template]:
return self.request(Route('POST', '/guilds/{guild_id}/templates', guild_id=guild_id), json=payload)
def sync_template(self, guild_id: Snowflake, code: str) -> Response[template.Template]:
return self.request(Route('PUT', '/guilds/{guild_id}/templates/{code}', guild_id=guild_id, code=code))
def edit_template(self, guild_id: Snowflake, code: str, payload: Dict[str, Any]) -> Response[template.Template]:
valid_keys = (
'name',
'description',
)
payload = {k: v for k, v in payload.items() if k in valid_keys}
return self.request(
Route('PATCH', '/guilds/{guild_id}/templates/{code}', guild_id=guild_id, code=code), json=payload
)
def delete_template(self, guild_id: Snowflake, code: str) -> Response[None]:
return self.request(Route('DELETE', '/guilds/{guild_id}/templates/{code}', guild_id=guild_id, code=code))
def create_from_template(self, code: str, name: str, icon: Optional[str]) -> Response[guild.Guild]:
payload = {
'name': name,
}
if icon:
payload['icon'] = icon
return self.request(Route('POST', '/guilds/templates/{code}', code=code), json=payload)
def get_bans(
self,
guild_id: Snowflake,
limit: int,
before: Optional[Snowflake] = None,
after: Optional[Snowflake] = None,
) -> Response[List[guild.Ban]]:
params: Dict[str, Any] = {
'limit': limit,
}
if before is not None:
params['before'] = before
if after is not None:
params['after'] = after
return self.request(Route('GET', '/guilds/{guild_id}/bans', guild_id=guild_id), params=params)
def get_welcome_screen(self, guild_id: Snowflake) -> Response[welcome_screen.WelcomeScreen]:
return self.request(Route('GET', '/guilds/{guild_id}/welcome-screen', guild_id=guild_id))
def edit_welcome_screen(
self, guild_id: Snowflake, *, reason: Optional[str] = None, **fields: Any
) -> Response[welcome_screen.WelcomeScreen]:
valid_keys = (
'description',
'welcome_channels',
'enabled',
)
payload = {k: v for k, v in fields.items() if k in valid_keys}
return self.request(
Route('PATCH', '/guilds/{guild_id}/welcome-screen', guild_id=guild_id), json=payload, reason=reason
)
def get_ban(self, user_id: Snowflake, guild_id: Snowflake) -> Response[guild.Ban]:
return self.request(Route('GET', '/guilds/{guild_id}/bans/{user_id}', guild_id=guild_id, user_id=user_id))
def get_vanity_code(self, guild_id: Snowflake) -> Response[invite.VanityInvite]:
return self.request(Route('GET', '/guilds/{guild_id}/vanity-url', guild_id=guild_id))
def change_vanity_code(self, guild_id: Snowflake, code: str, *, reason: Optional[str] = None) -> Response[None]:
payload: Dict[str, Any] = {'code': code}
return self.request(Route('PATCH', '/guilds/{guild_id}/vanity-url', guild_id=guild_id), json=payload, reason=reason)
def get_all_guild_channels(self, guild_id: Snowflake) -> Response[List[guild.GuildChannel]]:
return self.request(Route('GET', '/guilds/{guild_id}/channels', guild_id=guild_id))
def get_members(
self, guild_id: Snowflake, limit: int, after: Optional[Snowflake]
) -> Response[List[member.MemberWithUser]]:
params: Dict[str, Any] = {
'limit': limit,
}
if after:
params['after'] = after
r = Route('GET', '/guilds/{guild_id}/members', guild_id=guild_id)
return self.request(r, params=params)
def get_member(self, guild_id: Snowflake, member_id: Snowflake) -> Response[member.MemberWithUser]:
return self.request(Route('GET', '/guilds/{guild_id}/members/{member_id}', guild_id=guild_id, member_id=member_id))
def prune_members(
self,
guild_id: Snowflake,
days: int,
compute_prune_count: bool,
roles: Iterable[str],
*,
reason: Optional[str] = None,
) -> Response[guild.GuildPrune]:
payload: Dict[str, Any] = {
'days': days,
'compute_prune_count': 'true' if compute_prune_count else 'false',
}
if roles:
payload['include_roles'] = ', '.join(roles)
return self.request(Route('POST', '/guilds/{guild_id}/prune', guild_id=guild_id), json=payload, reason=reason)
def estimate_pruned_members(
self,
guild_id: Snowflake,
days: int,
roles: Iterable[str],
) -> Response[guild.GuildPrune]:
params: Dict[str, Any] = {
'days': days,
}
if roles:
params['include_roles'] = ', '.join(roles)
return self.request(Route('GET', '/guilds/{guild_id}/prune', guild_id=guild_id), params=params)
def get_sticker(self, sticker_id: Snowflake) -> Response[sticker.Sticker]:
return self.request(Route('GET', '/stickers/{sticker_id}', sticker_id=sticker_id))
def get_sticker_pack(self, sticker_pack_id: Snowflake) -> Response[sticker.StickerPack]:
return self.request(Route('GET', '/sticker-packs/{sticker_pack_id}', sticker_pack_id=sticker_pack_id))
def list_premium_sticker_packs(self) -> Response[sticker.ListPremiumStickerPacks]:
return self.request(Route('GET', '/sticker-packs'))
def get_all_guild_stickers(self, guild_id: Snowflake) -> Response[List[sticker.GuildSticker]]:
return self.request(Route('GET', '/guilds/{guild_id}/stickers', guild_id=guild_id))
def get_guild_sticker(self, guild_id: Snowflake, sticker_id: Snowflake) -> Response[sticker.GuildSticker]:
return self.request(
Route('GET', '/guilds/{guild_id}/stickers/{sticker_id}', guild_id=guild_id, sticker_id=sticker_id)
)
def create_guild_sticker(
self, guild_id: Snowflake, payload: Dict[str, Any], file: File, reason: Optional[str]
) -> Response[sticker.GuildSticker]:
initial_bytes = file.fp.read(16)
try:
mime_type = utils._get_mime_type_for_image(initial_bytes)
except ValueError:
if initial_bytes.startswith(b'{'):
mime_type = 'application/json'
else:
mime_type = 'application/octet-stream'
finally:
file.reset()
form: List[Dict[str, Any]] = [
{
'name': 'file',
'value': file.fp,
'filename': file.filename,
'content_type': mime_type,
}
]
for k, v in payload.items():
form.append(
{
'name': k,
'value': v,
}
)
return self.request(
Route('POST', '/guilds/{guild_id}/stickers', guild_id=guild_id), form=form, files=[file], reason=reason
)
def modify_guild_sticker(
self,
guild_id: Snowflake,
sticker_id: Snowflake,
payload: Dict[str, Any],
reason: Optional[str],
) -> Response[sticker.GuildSticker]:
return self.request(
Route('PATCH', '/guilds/{guild_id}/stickers/{sticker_id}', guild_id=guild_id, sticker_id=sticker_id),
json=payload,
reason=reason,
)
def delete_guild_sticker(self, guild_id: Snowflake, sticker_id: Snowflake, reason: Optional[str]) -> Response[None]:
return self.request(
Route('DELETE', '/guilds/{guild_id}/stickers/{sticker_id}', guild_id=guild_id, sticker_id=sticker_id),
reason=reason,
)
def get_all_custom_emojis(self, guild_id: Snowflake) -> Response[List[emoji.Emoji]]:
return self.request(Route('GET', '/guilds/{guild_id}/emojis', guild_id=guild_id))
def get_custom_emoji(self, guild_id: Snowflake, emoji_id: Snowflake) -> Response[emoji.Emoji]:
return self.request(Route('GET', '/guilds/{guild_id}/emojis/{emoji_id}', guild_id=guild_id, emoji_id=emoji_id))
def create_custom_emoji(
self,
guild_id: Snowflake,
name: str,
image: str,
*,
roles: Optional[SnowflakeList] = None,
reason: Optional[str] = None,
) -> Response[emoji.Emoji]:
payload = {
'name': name,
'image': image,
'roles': roles or [],
}
r = Route('POST', '/guilds/{guild_id}/emojis', guild_id=guild_id)
return self.request(r, json=payload, reason=reason)
def delete_custom_emoji(
self,
guild_id: Snowflake,
emoji_id: Snowflake,
*,
reason: Optional[str] = None,
) -> Response[None]:
r = Route('DELETE', '/guilds/{guild_id}/emojis/{emoji_id}', guild_id=guild_id, emoji_id=emoji_id)
return self.request(r, reason=reason)
def edit_custom_emoji(
self,
guild_id: Snowflake,
emoji_id: Snowflake,
*,
payload: Dict[str, Any],
reason: Optional[str] = None,
) -> Response[emoji.Emoji]:
r = Route('PATCH', '/guilds/{guild_id}/emojis/{emoji_id}', guild_id=guild_id, emoji_id=emoji_id)
return self.request(r, json=payload, reason=reason)
def get_all_integrations(self, guild_id: Snowflake) -> Response[List[integration.Integration]]:
r = Route('GET', '/guilds/{guild_id}/integrations', guild_id=guild_id)
return self.request(r)
def create_integration(self, guild_id: Snowflake, type: integration.IntegrationType, id: int) -> Response[None]:
payload = {
'type': type,
'id': id,
}
r = Route('POST', '/guilds/{guild_id}/integrations', guild_id=guild_id)
return self.request(r, json=payload)
def edit_integration(self, guild_id: Snowflake, integration_id: Snowflake, **payload: Any) -> Response[None]:
r = Route(
'PATCH', '/guilds/{guild_id}/integrations/{integration_id}', guild_id=guild_id, integration_id=integration_id
)
return self.request(r, json=payload)
def sync_integration(self, guild_id: Snowflake, integration_id: Snowflake) -> Response[None]:
r = Route(
'POST', '/guilds/{guild_id}/integrations/{integration_id}/sync', guild_id=guild_id, integration_id=integration_id
)
return self.request(r)
def delete_integration(
self, guild_id: Snowflake, integration_id: Snowflake, *, reason: Optional[str] = None
) -> Response[None]:
r = Route(
'DELETE', '/guilds/{guild_id}/integrations/{integration_id}', guild_id=guild_id, integration_id=integration_id
)
return self.request(r, reason=reason)
def get_audit_logs(
self,
guild_id: Snowflake,
limit: int = 100,
before: Optional[Snowflake] = None,
after: Optional[Snowflake] = None,
user_id: Optional[Snowflake] = None,
action_type: Optional[audit_log.AuditLogEvent] = None,
) -> Response[audit_log.AuditLog]:
params: Dict[str, Any] = {'limit': limit}
if before:
params['before'] = before
if after is not None:
params['after'] = after
if user_id:
params['user_id'] = user_id
if action_type:
params['action_type'] = action_type
r = Route('GET', '/guilds/{guild_id}/audit-logs', guild_id=guild_id)
return self.request(r, params=params)
def get_widget(self, guild_id: Snowflake) -> Response[widget.Widget]:
return self.request(Route('GET', '/guilds/{guild_id}/widget.json', guild_id=guild_id))
def edit_widget(
self, guild_id: Snowflake, payload: widget.EditWidgetSettings, reason: Optional[str] = None
) -> Response[widget.WidgetSettings]:
return self.request(Route('PATCH', '/guilds/{guild_id}/widget', guild_id=guild_id), json=payload, reason=reason)
def edit_incident_actions(self, guild_id: Snowflake, payload: guild.IncidentData) -> Response[guild.IncidentData]:
return self.request(Route('PUT', '/guilds/{guild_id}/incident-actions', guild_id=guild_id), json=payload)
# Invite management
def create_invite(
self,
channel_id: Snowflake,
*,
reason: Optional[str] = None,
max_age: int = 0,
max_uses: int = 0,
temporary: bool = False,
unique: bool = True,
target_type: Optional[invite.InviteTargetType] = None,
target_user_id: Optional[Snowflake] = None,
target_application_id: Optional[Snowflake] = None,
flags: Optional[int] = None,
) -> Response[invite.Invite]:
r = Route('POST', '/channels/{channel_id}/invites', channel_id=channel_id)
payload = {
'max_age': max_age,
'max_uses': max_uses,
'temporary': temporary,
'unique': unique,
}
if target_type:
payload['target_type'] = target_type
if target_user_id:
payload['target_user_id'] = target_user_id
if target_application_id:
payload['target_application_id'] = str(target_application_id)
if flags:
payload['flags'] = flags
return self.request(r, reason=reason, json=payload)
def get_invite(
self,
invite_id: str,
*,
with_counts: bool = True,
guild_scheduled_event_id: Optional[Snowflake] = None,
) -> Response[invite.Invite]:
params: Dict[str, Any] = {
'with_counts': int(with_counts),
}
if guild_scheduled_event_id:
params['guild_scheduled_event_id'] = guild_scheduled_event_id
return self.request(Route('GET', '/invites/{invite_id}', invite_id=invite_id), params=params)
def invites_from(self, guild_id: Snowflake) -> Response[List[invite.Invite]]:
return self.request(Route('GET', '/guilds/{guild_id}/invites', guild_id=guild_id))
def invites_from_channel(self, channel_id: Snowflake) -> Response[List[invite.Invite]]:
return self.request(Route('GET', '/channels/{channel_id}/invites', channel_id=channel_id))
def delete_invite(self, invite_id: str, *, reason: Optional[str] = None) -> Response[invite.Invite]:
return self.request(Route('DELETE', '/invites/{invite_id}', invite_id=invite_id), reason=reason)
# Role management
def get_roles(self, guild_id: Snowflake) -> Response[List[role.Role]]:
return self.request(Route('GET', '/guilds/{guild_id}/roles', guild_id=guild_id))
def get_role(self, guild_id: Snowflake, role_id: Snowflake) -> Response[role.Role]:
return self.request(Route('GET', '/guilds/{guild_id}/roles/{role_id}', guild_id=guild_id, role_id=role_id))
def edit_role(
self, guild_id: Snowflake, role_id: Snowflake, *, reason: Optional[str] = None, **fields: Any
) -> Response[role.Role]:
r = Route('PATCH', '/guilds/{guild_id}/roles/{role_id}', guild_id=guild_id, role_id=role_id)
valid_keys = ('name', 'permissions', 'color', 'hoist', 'icon', 'unicode_emoji', 'mentionable', 'colors')
payload = {k: v for k, v in fields.items() if k in valid_keys}
return self.request(r, json=payload, reason=reason)
def delete_role(self, guild_id: Snowflake, role_id: Snowflake, *, reason: Optional[str] = None) -> Response[None]:
r = Route('DELETE', '/guilds/{guild_id}/roles/{role_id}', guild_id=guild_id, role_id=role_id)
return self.request(r, reason=reason)
def replace_roles(
self,
user_id: Snowflake,
guild_id: Snowflake,
role_ids: List[int],
*,
reason: Optional[str] = None,
) -> Response[member.MemberWithUser]:
return self.edit_member(guild_id=guild_id, user_id=user_id, roles=role_ids, reason=reason)
def create_role(self, guild_id: Snowflake, *, reason: Optional[str] = None, **fields: Any) -> Response[role.Role]:
r = Route('POST', '/guilds/{guild_id}/roles', guild_id=guild_id)
return self.request(r, json=fields, reason=reason)
def move_role_position(
self,
guild_id: Snowflake,
positions: List[guild.RolePositionUpdate],
*,
reason: Optional[str] = None,
) -> Response[List[role.Role]]:
r = Route('PATCH', '/guilds/{guild_id}/roles', guild_id=guild_id)
return self.request(r, json=positions, reason=reason)
def add_role(
self, guild_id: Snowflake, user_id: Snowflake, role_id: Snowflake, *, reason: Optional[str] = None
) -> Response[None]:
r = Route(
'PUT',
'/guilds/{guild_id}/members/{user_id}/roles/{role_id}',
guild_id=guild_id,
user_id=user_id,
role_id=role_id,
)
return self.request(r, reason=reason)
def remove_role(
self, guild_id: Snowflake, user_id: Snowflake, role_id: Snowflake, *, reason: Optional[str] = None
) -> Response[None]:
r = Route(
'DELETE',
'/guilds/{guild_id}/members/{user_id}/roles/{role_id}',
guild_id=guild_id,
user_id=user_id,
role_id=role_id,
)
return self.request(r, reason=reason)
def edit_channel_permissions(
self,
channel_id: Snowflake,
target: Snowflake,
allow: str,
deny: str,
type: channel.OverwriteType,
*,
reason: Optional[str] = None,
) -> Response[None]:
payload = {'id': target, 'allow': allow, 'deny': deny, 'type': type}
r = Route('PUT', '/channels/{channel_id}/permissions/{target}', channel_id=channel_id, target=target)
return self.request(r, json=payload, reason=reason)
def delete_channel_permissions(
self, channel_id: Snowflake, target: Snowflake, *, reason: Optional[str] = None
) -> Response[None]:
r = Route('DELETE', '/channels/{channel_id}/permissions/{target}', channel_id=channel_id, target=target)
return self.request(r, reason=reason)
# Voice management
def move_member(
self,
user_id: Snowflake,
guild_id: Snowflake,
channel_id: Snowflake,
*,
reason: Optional[str] = None,
) -> Response[member.MemberWithUser]:
return self.edit_member(guild_id=guild_id, user_id=user_id, channel_id=channel_id, reason=reason)
# Stage instance management
def get_stage_instance(self, channel_id: Snowflake) -> Response[channel.StageInstance]:
return self.request(Route('GET', '/stage-instances/{channel_id}', channel_id=channel_id))
def create_stage_instance(self, *, reason: Optional[str], **payload: Any) -> Response[channel.StageInstance]:
valid_keys = (
'channel_id',
'topic',
'privacy_level',
'send_start_notification',
'guild_scheduled_event_id',
)
payload = {k: v for k, v in payload.items() if k in valid_keys}
return self.request(Route('POST', '/stage-instances'), json=payload, reason=reason)
def edit_stage_instance(self, channel_id: Snowflake, *, reason: Optional[str] = None, **payload: Any) -> Response[None]:
valid_keys = (
'topic',
'privacy_level',
)
payload = {k: v for k, v in payload.items() if k in valid_keys}
return self.request(
Route('PATCH', '/stage-instances/{channel_id}', channel_id=channel_id), json=payload, reason=reason
)
def delete_stage_instance(self, channel_id: Snowflake, *, reason: Optional[str] = None) -> Response[None]:
return self.request(Route('DELETE', '/stage-instances/{channel_id}', channel_id=channel_id), reason=reason)
# Guild scheduled event management
@overload
def get_scheduled_events(
self, guild_id: Snowflake, with_user_count: Literal[True]
) -> Response[List[scheduled_event.GuildScheduledEventWithUserCount]]: ...
@overload
def get_scheduled_events(
self, guild_id: Snowflake, with_user_count: Literal[False]
) -> Response[List[scheduled_event.GuildScheduledEvent]]: ...
@overload
def get_scheduled_events(
self, guild_id: Snowflake, with_user_count: bool
) -> Union[
Response[List[scheduled_event.GuildScheduledEventWithUserCount]], Response[List[scheduled_event.GuildScheduledEvent]]
]: ...
def get_scheduled_events(self, guild_id: Snowflake, with_user_count: bool) -> Response[Any]:
params = {'with_user_count': int(with_user_count)}
return self.request(Route('GET', '/guilds/{guild_id}/scheduled-events', guild_id=guild_id), params=params)
def create_guild_scheduled_event(
self, guild_id: Snowflake, *, reason: Optional[str] = None, **payload: Any
) -> Response[scheduled_event.GuildScheduledEvent]:
valid_keys = (
'channel_id',
'entity_metadata',
'name',
'privacy_level',
'scheduled_start_time',
'scheduled_end_time',
'description',
'entity_type',
'image',
)
payload = {k: v for k, v in payload.items() if k in valid_keys}
return self.request(
Route('POST', '/guilds/{guild_id}/scheduled-events', guild_id=guild_id), json=payload, reason=reason
)
@overload
def get_scheduled_event(
self, guild_id: Snowflake, guild_scheduled_event_id: Snowflake, with_user_count: Literal[True]
) -> Response[scheduled_event.GuildScheduledEventWithUserCount]: ...
@overload
def get_scheduled_event(
self, guild_id: Snowflake, guild_scheduled_event_id: Snowflake, with_user_count: Literal[False]
) -> Response[scheduled_event.GuildScheduledEvent]: ...
@overload
def get_scheduled_event(
self, guild_id: Snowflake, guild_scheduled_event_id: Snowflake, with_user_count: bool
) -> Union[
Response[scheduled_event.GuildScheduledEventWithUserCount], Response[scheduled_event.GuildScheduledEvent]
]: ...
def get_scheduled_event(
self, guild_id: Snowflake, guild_scheduled_event_id: Snowflake, with_user_count: bool
) -> Response[Any]:
params = {'with_user_count': int(with_user_count)}
return self.request(
Route(
'GET',
'/guilds/{guild_id}/scheduled-events/{guild_scheduled_event_id}',
guild_id=guild_id,
guild_scheduled_event_id=guild_scheduled_event_id,
),
params=params,
)
def edit_scheduled_event(
self, guild_id: Snowflake, guild_scheduled_event_id: Snowflake, *, reason: Optional[str] = None, **payload: Any
) -> Response[scheduled_event.GuildScheduledEvent]:
valid_keys = (
'channel_id',
'entity_metadata',
'name',
'privacy_level',
'scheduled_start_time',
'scheduled_end_time',
'status',
'description',
'entity_type',
'image',
)
payload = {k: v for k, v in payload.items() if k in valid_keys}
return self.request(
Route(
'PATCH',
'/guilds/{guild_id}/scheduled-events/{guild_scheduled_event_id}',
guild_id=guild_id,
guild_scheduled_event_id=guild_scheduled_event_id,
),
json=payload,
reason=reason,
)
def delete_scheduled_event(
self,
guild_id: Snowflake,
guild_scheduled_event_id: Snowflake,
*,
reason: Optional[str] = None,
) -> Response[None]:
return self.request(
Route(
'DELETE',
'/guilds/{guild_id}/scheduled-events/{guild_scheduled_event_id}',
guild_id=guild_id,
guild_scheduled_event_id=guild_scheduled_event_id,
),
reason=reason,
)
@overload
def get_scheduled_event_users(
self,
guild_id: Snowflake,
guild_scheduled_event_id: Snowflake,
limit: int,
with_member: Literal[True],
before: Optional[Snowflake] = ...,
after: Optional[Snowflake] = ...,
) -> Response[scheduled_event.ScheduledEventUsersWithMember]: ...
@overload
def get_scheduled_event_users(
self,
guild_id: Snowflake,
guild_scheduled_event_id: Snowflake,
limit: int,
with_member: Literal[False],
before: Optional[Snowflake] = ...,
after: Optional[Snowflake] = ...,
) -> Response[scheduled_event.ScheduledEventUsers]: ...
@overload
def get_scheduled_event_users(
self,
guild_id: Snowflake,
guild_scheduled_event_id: Snowflake,
limit: int,
with_member: bool,
before: Optional[Snowflake] = ...,
after: Optional[Snowflake] = ...,
) -> Union[Response[scheduled_event.ScheduledEventUsersWithMember], Response[scheduled_event.ScheduledEventUsers]]: ...
def get_scheduled_event_users(
self,
guild_id: Snowflake,
guild_scheduled_event_id: Snowflake,
limit: int,
with_member: bool,
before: Optional[Snowflake] = None,
after: Optional[Snowflake] = None,
) -> Response[Any]:
params: Dict[str, Any] = {
'limit': limit,
'with_member': int(with_member),
}
if before is not None:
params['before'] = before
if after is not None:
params['after'] = after
return self.request(
Route(
'GET',
'/guilds/{guild_id}/scheduled-events/{guild_scheduled_event_id}/users',
guild_id=guild_id,
guild_scheduled_event_id=guild_scheduled_event_id,
),
params=params,
)
# Application commands (global)
def get_global_commands(self, application_id: Snowflake) -> Response[List[command.ApplicationCommand]]:
return self.request(Route('GET', '/applications/{application_id}/commands', application_id=application_id))
def get_global_command(self, application_id: Snowflake, command_id: Snowflake) -> Response[command.ApplicationCommand]:
r = Route(
'GET',
'/applications/{application_id}/commands/{command_id}',
application_id=application_id,
command_id=command_id,
)
return self.request(r)
def upsert_global_command(
self, application_id: Snowflake, payload: command.ApplicationCommand
) -> Response[command.ApplicationCommand]:
r = Route('POST', '/applications/{application_id}/commands', application_id=application_id)
return self.request(r, json=payload)
def edit_global_command(
self,
application_id: Snowflake,
command_id: Snowflake,
payload: Dict[str, Any],
) -> Response[command.ApplicationCommand]:
valid_keys = (
'name',
'description',
'options',
)
payload = {k: v for k, v in payload.items() if k in valid_keys}
r = Route(
'PATCH',
'/applications/{application_id}/commands/{command_id}',
application_id=application_id,
command_id=command_id,
)
return self.request(r, json=payload)
def delete_global_command(self, application_id: Snowflake, command_id: Snowflake) -> Response[None]:
r = Route(
'DELETE',
'/applications/{application_id}/commands/{command_id}',
application_id=application_id,
command_id=command_id,
)
return self.request(r)
def bulk_upsert_global_commands(
self, application_id: Snowflake, payload: List[Dict[str, Any]]
) -> Response[List[command.ApplicationCommand]]:
r = Route('PUT', '/applications/{application_id}/commands', application_id=application_id)
return self.request(r, json=payload)
# Application commands (guild)
def get_guild_commands(
self, application_id: Snowflake, guild_id: Snowflake
) -> Response[List[command.ApplicationCommand]]:
r = Route(
'GET',
'/applications/{application_id}/guilds/{guild_id}/commands',
application_id=application_id,
guild_id=guild_id,
)
return self.request(r)
def get_guild_command(
self,
application_id: Snowflake,
guild_id: Snowflake,
command_id: Snowflake,
) -> Response[command.ApplicationCommand]:
r = Route(
'GET',
'/applications/{application_id}/guilds/{guild_id}/commands/{command_id}',
application_id=application_id,
guild_id=guild_id,
command_id=command_id,
)
return self.request(r)
def upsert_guild_command(
self,
application_id: Snowflake,
guild_id: Snowflake,
payload: Dict[str, Any],
) -> Response[command.ApplicationCommand]:
r = Route(
'POST',
'/applications/{application_id}/guilds/{guild_id}/commands',
application_id=application_id,
guild_id=guild_id,
)
return self.request(r, json=payload)
def edit_guild_command(
self,
application_id: Snowflake,
guild_id: Snowflake,
command_id: Snowflake,
payload: Dict[str, Any],
) -> Response[command.ApplicationCommand]:
valid_keys = (
'name',
'description',
'options',
)
payload = {k: v for k, v in payload.items() if k in valid_keys}
r = Route(
'PATCH',
'/applications/{application_id}/guilds/{guild_id}/commands/{command_id}',
application_id=application_id,
guild_id=guild_id,
command_id=command_id,
)
return self.request(r, json=payload)
def delete_guild_command(
self,
application_id: Snowflake,
guild_id: Snowflake,
command_id: Snowflake,
) -> Response[None]:
r = Route(
'DELETE',
'/applications/{application_id}/guilds/{guild_id}/commands/{command_id}',
application_id=application_id,
guild_id=guild_id,
command_id=command_id,
)
return self.request(r)
def bulk_upsert_guild_commands(
self,
application_id: Snowflake,
guild_id: Snowflake,
payload: List[Dict[str, Any]],
) -> Response[List[command.ApplicationCommand]]:
r = Route(
'PUT',
'/applications/{application_id}/guilds/{guild_id}/commands',
application_id=application_id,
guild_id=guild_id,
)
return self.request(r, json=payload)
def get_guild_application_command_permissions(
self,
application_id: Snowflake,
guild_id: Snowflake,
) -> Response[List[command.GuildApplicationCommandPermissions]]:
r = Route(
'GET',
'/applications/{application_id}/guilds/{guild_id}/commands/permissions',
application_id=application_id,
guild_id=guild_id,
)
return self.request(r)
def get_application_command_permissions(
self,
application_id: Snowflake,
guild_id: Snowflake,
command_id: Snowflake,
) -> Response[command.GuildApplicationCommandPermissions]:
r = Route(
'GET',
'/applications/{application_id}/guilds/{guild_id}/commands/{command_id}/permissions',
application_id=application_id,
guild_id=guild_id,
command_id=command_id,
)
return self.request(r)
def edit_application_command_permissions(
self,
application_id: Snowflake,
guild_id: Snowflake,
command_id: Snowflake,
payload: Dict[str, Any],
) -> Response[None]:
r = Route(
'PUT',
'/applications/{application_id}/guilds/{guild_id}/commands/{command_id}/permissions',
application_id=application_id,
guild_id=guild_id,
command_id=command_id,
)
return self.request(r, json=payload)
def get_auto_moderation_rules(self, guild_id: Snowflake) -> Response[List[automod.AutoModerationRule]]:
return self.request(Route('GET', '/guilds/{guild_id}/auto-moderation/rules', guild_id=guild_id))
def get_auto_moderation_rule(self, guild_id: Snowflake, rule_id: Snowflake) -> Response[automod.AutoModerationRule]:
return self.request(
Route('GET', '/guilds/{guild_id}/auto-moderation/rules/{rule_id}', guild_id=guild_id, rule_id=rule_id)
)
def create_auto_moderation_rule(
self, guild_id: Snowflake, *, reason: Optional[str], **payload: Any
) -> Response[automod.AutoModerationRule]:
valid_keys = (
'name',
'event_type',
'trigger_type',
'trigger_metadata',
'actions',
'enabled',
'exempt_roles',
'exempt_channels',
)
payload = {k: v for k, v in payload.items() if k in valid_keys and v is not None}
return self.request(
Route('POST', '/guilds/{guild_id}/auto-moderation/rules', guild_id=guild_id), json=payload, reason=reason
)
def edit_auto_moderation_rule(
self, guild_id: Snowflake, rule_id: Snowflake, *, reason: Optional[str], **payload: Any
) -> Response[automod.AutoModerationRule]:
valid_keys = (
'name',
'event_type',
'trigger_metadata',
'actions',
'enabled',
'exempt_roles',
'exempt_channels',
)
payload = {k: v for k, v in payload.items() if k in valid_keys and v is not None}
return self.request(
Route('PATCH', '/guilds/{guild_id}/auto-moderation/rules/{rule_id}', guild_id=guild_id, rule_id=rule_id),
json=payload,
reason=reason,
)
def delete_auto_moderation_rule(
self, guild_id: Snowflake, rule_id: Snowflake, *, reason: Optional[str]
) -> Response[None]:
return self.request(
Route('DELETE', '/guilds/{guild_id}/auto-moderation/rules/{rule_id}', guild_id=guild_id, rule_id=rule_id),
reason=reason,
)
# SKU
def get_skus(self, application_id: Snowflake) -> Response[List[sku.SKU]]:
return self.request(Route('GET', '/applications/{application_id}/skus', application_id=application_id))
def get_entitlements(
self,
application_id: Snowflake,
user_id: Optional[Snowflake] = None,
sku_ids: Optional[SnowflakeList] = None,
before: Optional[Snowflake] = None,
after: Optional[Snowflake] = None,
limit: Optional[int] = None,
guild_id: Optional[Snowflake] = None,
exclude_ended: Optional[bool] = None,
exclude_deleted: Optional[bool] = None,
) -> Response[List[sku.Entitlement]]:
params: Dict[str, Any] = {}
if user_id is not None:
params['user_id'] = user_id
if sku_ids is not None:
params['sku_ids'] = ','.join(map(str, sku_ids))
if before is not None:
params['before'] = before
if after is not None:
params['after'] = after
if limit is not None:
params['limit'] = limit
if guild_id is not None:
params['guild_id'] = guild_id
if exclude_ended is not None:
params['exclude_ended'] = int(exclude_ended)
if exclude_deleted is not None:
params['exclude_deleted'] = int(exclude_deleted)
return self.request(
Route('GET', '/applications/{application_id}/entitlements', application_id=application_id), params=params
)
def get_entitlement(self, application_id: Snowflake, entitlement_id: Snowflake) -> Response[sku.Entitlement]:
return self.request(
Route(
'GET',
'/applications/{application_id}/entitlements/{entitlement_id}',
application_id=application_id,
entitlement_id=entitlement_id,
),
)
def consume_entitlement(self, application_id: Snowflake, entitlement_id: Snowflake) -> Response[None]:
return self.request(
Route(
'POST',
'/applications/{application_id}/entitlements/{entitlement_id}/consume',
application_id=application_id,
entitlement_id=entitlement_id,
),
)
def create_entitlement(
self, application_id: Snowflake, sku_id: Snowflake, owner_id: Snowflake, owner_type: sku.EntitlementOwnerType
) -> Response[sku.Entitlement]:
payload = {
'sku_id': sku_id,
'owner_id': owner_id,
'owner_type': owner_type,
}
return self.request(
Route(
'POST',
'/applications/{application_id}/entitlements',
application_id=application_id,
),
json=payload,
)
def delete_entitlement(self, application_id: Snowflake, entitlement_id: Snowflake) -> Response[None]:
return self.request(
Route(
'DELETE',
'/applications/{application_id}/entitlements/{entitlement_id}',
application_id=application_id,
entitlement_id=entitlement_id,
),
)
# Guild Onboarding
def get_guild_onboarding(self, guild_id: Snowflake) -> Response[onboarding.Onboarding]:
return self.request(Route('GET', '/guilds/{guild_id}/onboarding', guild_id=guild_id))
def edit_guild_onboarding(
self,
guild_id: Snowflake,
*,
prompts: Optional[List[onboarding.Prompt]] = None,
default_channel_ids: Optional[List[Snowflake]] = None,
enabled: Optional[bool] = None,
mode: Optional[onboarding.OnboardingMode] = None,
reason: Optional[str],
) -> Response[onboarding.Onboarding]:
payload = {}
if prompts is not None:
payload['prompts'] = prompts
if default_channel_ids is not None:
payload['default_channel_ids'] = default_channel_ids
if enabled is not None:
payload['enabled'] = enabled
if mode is not None:
payload['mode'] = mode
return self.request(
Route('PUT', f'/guilds/{guild_id}/onboarding', guild_id=guild_id),
json=payload,
reason=reason,
)
# Soundboard
def get_soundboard_default_sounds(self) -> Response[List[soundboard.SoundboardDefaultSound]]:
return self.request(Route('GET', '/soundboard-default-sounds'))
def get_soundboard_sound(self, guild_id: Snowflake, sound_id: Snowflake) -> Response[soundboard.SoundboardSound]:
return self.request(
Route('GET', '/guilds/{guild_id}/soundboard-sounds/{sound_id}', guild_id=guild_id, sound_id=sound_id)
)
def get_soundboard_sounds(self, guild_id: Snowflake) -> Response[Dict[str, List[soundboard.SoundboardSound]]]:
return self.request(Route('GET', '/guilds/{guild_id}/soundboard-sounds', guild_id=guild_id))
def create_soundboard_sound(
self, guild_id: Snowflake, *, reason: Optional[str], **payload: Any
) -> Response[soundboard.SoundboardSound]:
valid_keys = (
'name',
'sound',
'volume',
'emoji_id',
'emoji_name',
)
payload = {k: v for k, v in payload.items() if k in valid_keys and v is not None}
return self.request(
Route('POST', '/guilds/{guild_id}/soundboard-sounds', guild_id=guild_id), json=payload, reason=reason
)
def edit_soundboard_sound(
self, guild_id: Snowflake, sound_id: Snowflake, *, reason: Optional[str], **payload: Any
) -> Response[soundboard.SoundboardSound]:
valid_keys = (
'name',
'volume',
'emoji_id',
'emoji_name',
)
payload = {k: v for k, v in payload.items() if k in valid_keys}
return self.request(
Route(
'PATCH',
'/guilds/{guild_id}/soundboard-sounds/{sound_id}',
guild_id=guild_id,
sound_id=sound_id,
),
json=payload,
reason=reason,
)
def delete_soundboard_sound(self, guild_id: Snowflake, sound_id: Snowflake, *, reason: Optional[str]) -> Response[None]:
return self.request(
Route(
'DELETE',
'/guilds/{guild_id}/soundboard-sounds/{sound_id}',
guild_id=guild_id,
sound_id=sound_id,
),
reason=reason,
)
def send_soundboard_sound(self, channel_id: Snowflake, **payload: Any) -> Response[None]:
valid_keys = ('sound_id', 'source_guild_id')
payload = {k: v for k, v in payload.items() if k in valid_keys}
return self.request(
(Route('POST', '/channels/{channel_id}/send-soundboard-sound', channel_id=channel_id)), json=payload
)
# Application
def application_info(self) -> Response[appinfo.AppInfo]:
return self.request(Route('GET', '/oauth2/applications/@me'))
def edit_application_info(self, *, reason: Optional[str], payload: Any) -> Response[appinfo.AppInfo]:
valid_keys = (
'custom_install_url',
'description',
'role_connections_verification_url',
'install_params',
'flags',
'icon',
'cover_image',
'interactions_endpoint_url ',
'tags',
'integration_types_config',
)
payload = {k: v for k, v in payload.items() if k in valid_keys}
return self.request(Route('PATCH', '/applications/@me'), json=payload, reason=reason)
def get_application_emojis(self, application_id: Snowflake) -> Response[appinfo.ListAppEmojis]:
return self.request(Route('GET', '/applications/{application_id}/emojis', application_id=application_id))
def get_application_emoji(self, application_id: Snowflake, emoji_id: Snowflake) -> Response[emoji.Emoji]:
return self.request(
Route(
'GET', '/applications/{application_id}/emojis/{emoji_id}', application_id=application_id, emoji_id=emoji_id
)
)
def create_application_emoji(
self,
application_id: Snowflake,
name: str,
image: str,
) -> Response[emoji.Emoji]:
payload = {
'name': name,
'image': image,
}
return self.request(
Route('POST', '/applications/{application_id}/emojis', application_id=application_id), json=payload
)
def edit_application_emoji(
self,
application_id: Snowflake,
emoji_id: Snowflake,
*,
payload: Dict[str, Any],
) -> Response[emoji.Emoji]:
r = Route(
'PATCH', '/applications/{application_id}/emojis/{emoji_id}', application_id=application_id, emoji_id=emoji_id
)
return self.request(r, json=payload)
def delete_application_emoji(
self,
application_id: Snowflake,
emoji_id: Snowflake,
) -> Response[None]:
return self.request(
Route(
'DELETE',
'/applications/{application_id}/emojis/{emoji_id}',
application_id=application_id,
emoji_id=emoji_id,
)
)
# Poll
def get_poll_answer_voters(
self,
channel_id: Snowflake,
message_id: Snowflake,
answer_id: Snowflake,
after: Optional[Snowflake] = None,
limit: Optional[int] = None,
) -> Response[poll.PollAnswerVoters]:
params = {}
if after:
params['after'] = int(after)
if limit is not None:
params['limit'] = limit
return self.request(
Route(
'GET',
'/channels/{channel_id}/polls/{message_id}/answers/{answer_id}',
channel_id=channel_id,
message_id=message_id,
answer_id=answer_id,
),
params=params,
)
def end_poll(self, channel_id: Snowflake, message_id: Snowflake) -> Response[message.Message]:
return self.request(
Route(
'POST',
'/channels/{channel_id}/polls/{message_id}/expire',
channel_id=channel_id,
message_id=message_id,
)
)
# Subscriptions
def list_sku_subscriptions(
self,
sku_id: Snowflake,
before: Optional[Snowflake] = None,
after: Optional[Snowflake] = None,
limit: Optional[int] = None,
user_id: Optional[Snowflake] = None,
) -> Response[List[subscription.Subscription]]:
params = {}
if before is not None:
params['before'] = before
if after is not None:
params['after'] = after
if limit is not None:
params['limit'] = limit
if user_id is not None:
params['user_id'] = user_id
return self.request(
Route(
'GET',
'/skus/{sku_id}/subscriptions',
sku_id=sku_id,
),
params=params,
)
def get_sku_subscription(self, sku_id: Snowflake, subscription_id: Snowflake) -> Response[subscription.Subscription]:
return self.request(
Route(
'GET',
'/skus/{sku_id}/subscriptions/{subscription_id}',
sku_id=sku_id,
subscription_id=subscription_id,
)
)
# Misc
async def get_bot_gateway(self) -> Tuple[int, str, SessionStartLimit]:
try:
data = await self.request(Route('GET', '/gateway/bot'))
except HTTPException as exc:
raise GatewayNotFound() from exc
return data['shards'], data['url'], data['session_start_limit']
def get_user(self, user_id: Snowflake) -> Response[user.User]:
return self.request(Route('GET', '/users/{user_id}', user_id=user_id))