You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

2277 lines
91 KiB

"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
import asyncio
from collections import deque, OrderedDict
import copy
import datetime
import logging
from typing import (
Dict,
Optional,
TYPE_CHECKING,
Union,
Callable,
Any,
List,
TypeVar,
Coroutine,
Tuple,
Deque,
Literal,
overload,
)
import weakref
import inspect
import time
import random
from math import ceil
from .errors import NotFound
from .guild import CommandCounts, Guild
from .activity import BaseActivity
from .user import User, ClientUser
from .emoji import Emoji
from .mentions import AllowedMentions
from .partial_emoji import PartialEmoji
from .message import Message
from .channel import *
from .channel import _channel_factory, _private_channel_factory
from .raw_models import *
from .member import Member
from .relationship import Relationship
from .role import Role
from .enums import ChannelType, RequiredActionType, Status, try_enum, UnavailableGuildType
from . import utils
from .flags import MemberCacheFlags
from .invite import Invite
from .integrations import _integration_factory
from .scheduled_event import ScheduledEvent
from .stage_instance import StageInstance
from .threads import Thread, ThreadMember
from .sticker import GuildSticker
from .settings import UserSettings, GuildSettings
from .tracking import Tracking
from .interactions import Interaction
from .permissions import Permissions, PermissionOverwrite
from .member import _ClientStatus
from .modal import Modal
if TYPE_CHECKING:
from .abc import PrivateChannel, Snowflake as abcSnowflake
from .message import MessageableChannel
from .guild import GuildChannel, VocalGuildChannel
from .http import HTTPClient
from .voice_client import VoiceProtocol
from .client import Client
from .gateway import DiscordWebSocket
from .calls import Call
from .member import VoiceState
from .types.snowflake import Snowflake
from .types.activity import Activity as ActivityPayload
from .types.channel import DMChannel as DMChannelPayload
from .types.user import User as UserPayload, PartialUser as PartialUserPayload
from .types.emoji import Emoji as EmojiPayload, PartialEmoji as PartialEmojiPayload
from .types.sticker import GuildSticker as GuildStickerPayload
from .types.guild import Guild as GuildPayload
from .types.message import Message as MessagePayload, PartialMessage as PartialMessagePayload
from .types import gateway as gw
from .types.voice import GuildVoiceState
# Generic type variable used in this module's annotations (see ``logging_coroutine``).
T = TypeVar('T')
# Union of the channel kinds referenced by annotations in this module.
# NOTE(review): some member names appear to be imported only under TYPE_CHECKING — confirm this alias is never evaluated at runtime.
Channel = Union[GuildChannel, VocalGuildChannel, PrivateChannel, PartialMessageable, ForumChannel]
# Alias of ``utils.MISSING``; used below as a placeholder for values that are assigned later.
MISSING = utils.MISSING
# Module-level logger named after this module.
_log = logging.getLogger(__name__)
class ChunkRequest:
    """Accumulates the members delivered for one member-chunk request.

    Members are collected in ``buffer``. Futures registered through
    :meth:`wait` or :meth:`get_future` are resolved with that buffer once
    :meth:`done` is called.
    """

    def __init__(
        self,
        guild_id: int,
        loop: asyncio.AbstractEventLoop,
        resolver: Callable[[int], Any],
        *,
        cache: bool = True,
    ) -> None:
        self.guild_id: int = guild_id
        self.loop: asyncio.AbstractEventLoop = loop
        # Called with ``guild_id`` to look the guild up when members must be cached.
        self.resolver: Callable[[int], Any] = resolver
        self.cache: bool = cache
        # The nonce is derived from a snowflake of the current time.
        self.nonce: str = str(utils.time_snowflake(utils.utcnow()))
        self.buffer: List[Member] = []
        self.waiters: List[asyncio.Future[List[Member]]] = []

    def add_members(self, members: List[Member]) -> None:
        """Append ``members`` to the buffer and, if caching, add them to the guild."""
        self.buffer.extend(members)
        if not self.cache:
            return
        guild = self.resolver(self.guild_id)
        if guild is not None:
            for member in members:
                guild._add_member(member)

    async def wait(self) -> List[Member]:
        """Wait until :meth:`done` is called and return the buffered members."""
        pending = self.loop.create_future()
        self.waiters.append(pending)
        try:
            return await pending
        finally:
            # Always unregister, even if the wait was cancelled or timed out.
            self.waiters.remove(pending)

    def get_future(self) -> asyncio.Future[List[Member]]:
        """Register and return a future that :meth:`done` will resolve."""
        pending = self.loop.create_future()
        self.waiters.append(pending)
        return pending

    def done(self) -> None:
        """Resolve every still-pending waiter with the member buffer."""
        for waiter in self.waiters:
            if waiter.done():
                continue
            waiter.set_result(self.buffer)
class MemberSidebar:
def __init__(
self,
guild: Guild,
channels: List[abcSnowflake],
*,
chunk: bool,
delay: Union[int, float],
cache: bool,
loop: asyncio.AbstractEventLoop,
) -> None:
self.guild = guild
self.cache = cache
self.chunk = chunk
self.delay = delay
self.loop = loop
self.channels = [str(channel.id) for channel in (channels or self.get_channels(1 if chunk else 5))]
self.ranges = self.get_ranges()
self.subscribing: bool = False
self.buffer: Optional[List[Member]] = []
self.waiters: List[asyncio.Future[Optional[List[Member]]]] = []
def __bool__(self) -> bool:
return self.subscribing
@property
def limit(self) -> int:
guild = self.guild
return guild._presence_count if guild._offline_members_hidden else guild._member_count # type: ignore
@property
def state(self) -> ConnectionState:
return self.guild._state
@property
def ws(self):
return self.guild._state.ws
@property
def safe(self):
return self.guild._member_count >= 75000
@staticmethod
def amalgamate(original: Tuple[int, int], value: Tuple[int, int]) -> Tuple[int, int]:
return original[0], value[1] - 99
def get_ranges(self) -> List[Tuple[int, int]]:
chunk = 100
end = 99
amount = self.limit
if amount is None:
raise RuntimeError('cannot get ranges for a guild with no member/presence count')
ceiling = ceil(amount / chunk) * chunk
ranges = []
for i in range(0, int(ceiling / chunk)):
min = i * chunk
max = min + end
ranges.append((min, max))
return ranges
def get_current_ranges(self) -> List[Tuple[int, int]]:
ranges = self.ranges
ret = []
for _ in range(3):
if self.safe:
try:
ret.append(ranges.pop(0))
except IndexError:
break
else:
try:
current = ranges.pop(0)
except IndexError:
break
for _ in range(3):
try:
current = self.amalgamate(current, ranges.pop(0))
except IndexError:
break
ret.append(current)
return ret
def get_channels(self, amount: int) -> List[abcSnowflake]:
guild = self.guild
ret = set()
channels = [
channel
for channel in self.guild.channels
if channel.type != ChannelType.stage_voice and channel.permissions_for(guild.me).read_messages
]
if guild.rules_channel is not None:
channels.insert(0, guild.rules_channel)
while len(ret) < amount and channels:
channel = channels.pop()
for role in guild.roles:
if not channel.permissions_for(role).read_messages:
break
else:
for ow in channel._overwrites:
if ow.is_member():
allow = Permissions(ow.allow)
deny = Permissions(ow.deny)
overwrite = PermissionOverwrite.from_pair(allow, deny)
if not overwrite.read_messages:
break
ret.add(channel)
return list(ret)
def add_members(self, members: List[Member]) -> None:
if self.buffer is None:
return
self.buffer.extend(members)
if self.cache:
guild = self.guild
for member in members:
guild._add_member(member)
async def wait(self) -> Optional[List[Member]]:
future = self.loop.create_future()
self.waiters.append(future)
try:
return await future
finally:
self.waiters.remove(future)
def get_future(self) -> asyncio.Future[Optional[List[Member]]]:
future = self.loop.create_future()
self.waiters.append(future)
return future
def done(self) -> None:
for future in self.waiters:
if not future.done():
future.set_result(self.buffer)
try:
del self.state._scrape_requests[self.guild.id]
except KeyError:
pass
def start(self):
self.loop.create_task(self.wrapper())
async def wrapper(self):
try:
await self.scrape()
except RuntimeError as exc:
_log.warning('Member list scraping failed for %s (%s).', self.guild.id, exc)
self.buffer = None
except asyncio.CancelledError:
pass
finally:
self.done()
async def scrape(self):
self.subscribing = True
delay = self.delay
channels = self.channels
guild = self.guild
ws = self.ws
while self.subscribing:
requests = {}
for channel in channels:
ranges = self.get_current_ranges()
if not ranges:
self.subscribing = False
break
requests[channel] = ranges
if not self.subscribing and not requests:
break
if not requests:
raise RuntimeError('failed to choose channels or ranges')
def predicate(data):
return int(data['guild_id']) == guild.id and any(op['op'] == 'SYNC' for op in data['ops'])
await ws.request_lazy_guild(guild.id, channels=requests)
try:
await asyncio.wait_for(ws.wait_for('GUILD_MEMBER_LIST_UPDATE', predicate), timeout=15)
except asyncio.TimeoutError:
r = tuple(requests.values())[-1][-1]
if self.limit in range(r[0], r[1]) or self.limit < r[1]:
self.subscribing = False
break
else:
raise RuntimeError('timeout: no response from gateway')
await asyncio.sleep(delay)
r = tuple(requests.values())[-1][-1]
if self.limit in range(r[0], r[1]) or self.limit < r[1]:
self.subscribing = False
break
if not self.chunk: # Freeze cache
await ws.request_lazy_guild(guild.id, channels={})
else:
self.guild._chunked = True
async def logging_coroutine(coroutine: Coroutine[Any, Any, T], *, info: str) -> Optional[T]:
    """Await ``coroutine``, logging any exception instead of propagating it.

    Parameters
    ----------
    coroutine:
        The coroutine to await.
    info:
        Short description of the operation, interpolated into the log message.

    Returns
    -------
    The coroutine's result, or ``None`` if it raised an :class:`Exception`.
    """
    try:
        # Hand the result back so the declared ``Optional[T]`` return type holds.
        return await coroutine
    except Exception:
        _log.exception('Exception occurred during %s.', info)
        return None
class ConnectionState:
def __init__(
    self,
    *,
    dispatch: Callable[..., Any],
    handlers: Dict[str, Callable[..., Any]],
    hooks: Dict[str, Callable[..., Coroutine[Any, Any, Any]]],
    http: HTTPClient,
    client: Client,
    **options: Any,
) -> None:
    """Store the collaborators, validate ``options`` and build the parser table.

    Options read here: ``max_messages``, ``heartbeat_timeout``,
    ``allowed_mentions``, ``activities``/``activity``, ``status``,
    ``chunk_guilds_at_startup``, ``request_guilds`` and ``member_cache_flags``.

    Raises
    ------
    TypeError
        If ``allowed_mentions``, an activity or ``member_cache_flags`` has the wrong type.
    """
    # Set later, after Client.login
    self.loop: asyncio.AbstractEventLoop = utils.MISSING
    self.http: HTTPClient = http
    self.client = client
    self.max_messages: Optional[int] = options.get('max_messages', 1000)
    # A non-positive size falls back to the default of 1000; ``None`` disables the cache (see ``clear``).
    if self.max_messages is not None and self.max_messages <= 0:
        self.max_messages = 1000
    self.dispatch: Callable[..., Any] = dispatch
    self.handlers: Dict[str, Callable[..., Any]] = handlers
    self.hooks: Dict[str, Callable[..., Coroutine[Any, Any, Any]]] = hooks
    self._ready_task: Optional[asyncio.Task] = None
    self.heartbeat_timeout: float = options.get('heartbeat_timeout', 60.0)
    allowed_mentions = options.get('allowed_mentions')
    if allowed_mentions is not None and not isinstance(allowed_mentions, AllowedMentions):
        raise TypeError('allowed_mentions parameter must be AllowedMentions')
    self.allowed_mentions: Optional[AllowedMentions] = allowed_mentions
    self._chunk_requests: Dict[Union[str, int], ChunkRequest] = {}
    self._scrape_requests: Dict[Union[str, int], MemberSidebar] = {}
    # ``activities`` takes precedence; a single ``activity`` is only used when it is empty.
    activities = options.get('activities', [])
    if not activities:
        activity = options.get('activity')
        if activity is not None:
            activities = [activity]
    if not all(isinstance(activity, BaseActivity) for activity in activities):
        raise TypeError('activity parameter must derive from BaseActivity.')
    activities = [activity.to_dict() for activity in activities]
    status = options.get('status', None)
    if status:
        # An offline status is stored as the string 'invisible'.
        if status is Status.offline:
            status = 'invisible'
        else:
            status = str(status)
    self._chunk_guilds: bool = options.get('chunk_guilds_at_startup', True)
    self._request_guilds = options.get('request_guilds', True)
    cache_flags = options.get('member_cache_flags', None)
    if cache_flags is None:
        cache_flags = MemberCacheFlags.all()
    else:
        if not isinstance(cache_flags, MemberCacheFlags):
            raise TypeError(f'member_cache_flags parameter must be MemberCacheFlags not {type(cache_flags)!r}')
    self.member_cache_flags: MemberCacheFlags = cache_flags
    self._activities: List[ActivityPayload] = activities
    self._status: Optional[str] = status
    # With empty cache flags, users are created without being stored.
    if cache_flags._empty:
        self.store_user = self.create_user  # type: ignore # This reassignment is on purpose
    # Map every ``parse_<event>`` method to the upper-cased event name.
    self.parsers: Dict[str, Callable[[Any], None]]
    self.parsers = parsers = {}
    for attr, func in inspect.getmembers(self):
        if attr.startswith('parse_'):
            parsers[attr[6:].upper()] = func
    self.clear()
def clear(self) -> None:
self.user: Optional[ClientUser] = None
self._users: weakref.WeakValueDictionary[int, User] = weakref.WeakValueDictionary()
self.settings: Optional[UserSettings] = None
self.consents: Optional[Tracking] = None
self.analytics_token: Optional[str] = None
self.preferred_region: Optional[str] = None
self._emojis: Dict[int, Emoji] = {}
self._stickers: Dict[int, GuildSticker] = {}
self._guilds: Dict[int, Guild] = {}
self._queued_guilds: Dict[int, Guild] = {}
self._unavailable_guilds: Dict[int, UnavailableGuildType] = {}
self._calls: Dict[int, Call] = {}
self._call_message_cache: Dict[int, Message] = {} # Hopefully this won't be a memory leak
self._voice_clients: Dict[int, VoiceProtocol] = {}
self._voice_states: Dict[int, VoiceState] = {}
self._interaction_cache: Dict[Union[int, str], Tuple[int, Optional[str], MessageableChannel]] = {}
self._interactions: OrderedDict[Union[int, str], Interaction] = OrderedDict() # LRU of max size 15
self._relationships: Dict[int, Relationship] = {}
self._private_channels: Dict[int, PrivateChannel] = {}
self._private_channels_by_user: Dict[int, DMChannel] = {}
self._last_private_channel: tuple = (None, None)
if self.max_messages is not None:
self._messages: Optional[Deque[Message]] = deque(maxlen=self.max_messages)
else:
self._messages: Optional[Deque[Message]] = None
def process_chunk_requests(self, guild_id: int, nonce: Optional[str], members: List[Member], complete: bool) -> None:
removed = []
for key, request in self._chunk_requests.items():
if request.guild_id == guild_id and request.nonce == nonce:
request.add_members(members)
if complete:
request.done()
removed.append(key)
for key in removed:
del self._chunk_requests[key]
def call_handlers(self, key: str, *args: Any, **kwargs: Any) -> None:
try:
func = self.handlers[key]
except KeyError:
pass
else:
func(*args, **kwargs)
async def call_hooks(self, key: str, *args: Any, **kwargs: Any) -> None:
try:
coro = self.hooks[key]
except KeyError:
pass
else:
await coro(*args, **kwargs)
async def async_setup(self) -> None:
pass
@property
def session_id(self) -> Optional[str]:
return self.ws.session_id
@property
def ws(self):
return self.client.ws
@property
def self_id(self) -> Optional[int]:
u = self.user
return u.id if u else None
@property
def voice_clients(self) -> List[VoiceProtocol]:
return list(self._voice_clients.values())
def _update_voice_state(
    self, data: GuildVoiceState, channel_id: Optional[int]
) -> Tuple[Optional[User], VoiceState, VoiceState]:
    """Apply a voice state payload to the voice state cache.

    Returns a ``(user, before, after)`` tuple: the cached user for
    ``data['user_id']`` (``None`` if not cached) and the voice state prior to
    and following the update.
    """
    user_id = int(data['user_id'])
    user = self.get_user(user_id)
    # The channel is looked up in the private channel cache only.
    channel: Optional[Union[DMChannel, GroupChannel]] = self._get_private_channel(channel_id)  # type: ignore
    try:
        # Check if we should remove the voice state from cache
        if channel is None:
            after = self._voice_states.pop(user_id)
        else:
            after = self._voice_states[user_id]
        # Snapshot the state before mutating it in place.
        before = copy.copy(after)
        after._update(data, channel)
    except KeyError:
        # if we're here then add it into the cache
        # NOTE(review): a KeyError raised inside ``after._update`` would also land here — confirm that is acceptable.
        after = VoiceState(data=data, channel=channel)
        before = VoiceState(data=data, channel=None)
        self._voice_states[user_id] = after
    return user, before, after
def _voice_state_for(self, user_id: int) -> Optional[VoiceState]:
return self._voice_states.get(user_id)
def _get_voice_client(self, guild_id: Optional[int]) -> Optional[VoiceProtocol]:
# The keys of self._voice_clients are ints
return self._voice_clients.get(guild_id) # type: ignore
def _add_voice_client(self, guild_id: int, voice: VoiceProtocol) -> None:
self._voice_clients[guild_id] = voice
def _remove_voice_client(self, guild_id: int) -> None:
self._voice_clients.pop(guild_id, None)
def _update_references(self, ws: DiscordWebSocket) -> None:
for vc in self.voice_clients:
vc.main_ws = ws # type: ignore # Silencing the unknown attribute (ok at runtime).
def _add_interaction(self, interaction: Interaction) -> None:
self._interactions[interaction.id] = interaction
if len(self._interactions) > 15:
self._interactions.popitem(last=False)
def store_user(self, data: Union[UserPayload, PartialUserPayload]) -> User:
# this way is 300% faster than `dict.setdefault`.
user_id = int(data['id'])
try:
user = self._users[user_id]
# We use the data available to us since we
# might not have events for that user
# However, the data may only have an ID
try:
user._update(data)
except KeyError:
pass
return user
except KeyError:
user = User(state=self, data=data)
if user.discriminator != '0000':
self._users[user_id] = user
return user
def create_user(self, data: Union[UserPayload, PartialUserPayload]) -> User:
    """Build a ``User`` from ``data`` without touching the user cache."""
    user = User(data=data, state=self)
    return user
def get_user(self, id: int) -> Optional[User]:
return self._users.get(id)
def store_emoji(self, guild: Guild, data: EmojiPayload) -> Emoji:
    """Build an ``Emoji`` from ``data``, cache it by ID and return it."""
    # The id will be present here
    emoji_id = int(data['id'])  # type: ignore
    emoji = Emoji(guild=guild, state=self, data=data)
    self._emojis[emoji_id] = emoji
    return emoji
def store_sticker(self, guild: Guild, data: GuildStickerPayload) -> GuildSticker:
    """Build a ``GuildSticker`` from ``data``, cache it by ID and return it."""
    sticker_id = int(data['id'])
    sticker = GuildSticker(state=self, data=data)
    self._stickers[sticker_id] = sticker
    return sticker
@property
def guilds(self) -> List[Guild]:
return list(self._guilds.values())
def _get_guild(self, guild_id: Optional[int]) -> Optional[Guild]:
# The keys of self._guilds are ints
guild = self._guilds.get(guild_id) # type: ignore
if guild is None:
guild = self._queued_guilds.get(guild_id) # type: ignore
return guild
def _add_guild(self, guild: Guild) -> None:
self._guilds[guild.id] = guild
def _remove_guild(self, guild: Guild) -> None:
self._guilds.pop(guild.id, None)
for emoji in guild.emojis:
self._emojis.pop(emoji.id, None)
for sticker in guild.stickers:
self._stickers.pop(sticker.id, None)
del guild
@property
def emojis(self) -> List[Emoji]:
return list(self._emojis.values())
@property
def stickers(self) -> List[GuildSticker]:
return list(self._stickers.values())
def get_emoji(self, emoji_id: Optional[int]) -> Optional[Emoji]:
# the keys of self._emojis are ints
return self._emojis.get(emoji_id) # type: ignore
def get_sticker(self, sticker_id: Optional[int]) -> Optional[GuildSticker]:
# the keys of self._stickers are ints
return self._stickers.get(sticker_id) # type: ignore
@property
def private_channels(self) -> List[PrivateChannel]:
return list(self._private_channels.values())
async def access_private_channel(self, channel_id: int) -> None:
if not self._get_accessed_private_channel(channel_id):
await self._access_private_channel(channel_id)
self._set_accessed_private_channel(channel_id)
async def _access_private_channel(self, channel_id: int) -> None:
if (ws := self.ws) is None:
return
try:
await ws.access_dm(channel_id)
except Exception as exc:
_log.warning('Sending ACCESS_DM failed for channel %s, (%s).', channel_id, exc)
def _set_accessed_private_channel(self, channel_id):
self._last_private_channel = (channel_id, time.time())
def _get_accessed_private_channel(self, channel_id):
timestamp, existing_id = self._last_private_channel
return existing_id == channel_id and int(time.time() - timestamp) < random.randrange(120000, 420000)
def _get_private_channel(self, channel_id: Optional[int]) -> Optional[PrivateChannel]:
# The keys of self._private_channels are ints
return self._private_channels.get(channel_id) # type: ignore
def _get_private_channel_by_user(self, user_id: Optional[int]) -> Optional[DMChannel]:
# The keys of self._private_channels are ints
return self._private_channels_by_user.get(user_id) # type: ignore
def _add_private_channel(self, channel: PrivateChannel) -> None:
    """Cache ``channel`` by ID and, for a DM with a recipient, by recipient ID."""
    self._private_channels[channel.id] = channel

    if not isinstance(channel, DMChannel):
        return
    if channel.recipient:
        self._private_channels_by_user[channel.recipient.id] = channel
def add_dm_channel(self, data: DMChannelPayload) -> DMChannel:
    """Build a ``DMChannel`` from ``data``, cache it and return it."""
    # self.user is *always* cached when this is called
    me = self.user
    dm = DMChannel(me=me, state=self, data=data)  # type: ignore
    self._add_private_channel(dm)
    return dm
def _remove_private_channel(self, channel: PrivateChannel) -> None:
    """Remove ``channel`` from the private channel caches; missing entries are ignored."""
    self._private_channels.pop(channel.id, None)

    if not isinstance(channel, DMChannel):
        return
    recipient = channel.recipient
    if recipient is None:
        return
    self._private_channels_by_user.pop(recipient.id, None)
def _get_message(self, msg_id: Optional[int]) -> Optional[Message]:
    """Find a cached message by ID, searching newest first.

    The message cache is searched when it is non-empty; otherwise the call
    message cache is searched instead.
    """

    def matches(message) -> bool:
        return message.id == msg_id

    if self._messages:
        candidates = reversed(self._messages)
    else:
        candidates = reversed(self._call_message_cache.values())
    return utils.find(matches, candidates)
def _add_guild_from_data(self, data: GuildPayload, *, from_ready: bool = False) -> Optional[Guild]:
    """Create and cache a guild from ``data``, or request it if it is unavailable.

    Parameters
    ----------
    data:
        The guild payload.
    from_ready:
        Whether the payload came from the ready data; decides which
        ``UnavailableGuildType`` an unavailable guild is recorded with.

    Returns
    -------
    The new ``Guild``, or ``None`` when the payload is marked unavailable.
    """
    guild_id = int(data['id'])

    if not data.get('unavailable', False):
        guild = Guild(data=data, state=self)
        self._add_guild(guild)
        return guild

    self._unavailable_guilds[guild_id] = UnavailableGuildType.existing if from_ready else UnavailableGuildType.joined
    # Pass the argument to the logger instead of pre-formatting with ``%``,
    # matching the other log calls in this file.
    _log.debug('Forcing GUILD_CREATE for unavailable guild %s.', guild_id)
    asyncio.ensure_future(self.request_guild(guild_id), loop=self.loop)
    return None
def _guild_needs_chunking(self, guild: Guild) -> bool:
return self._chunk_guilds and not guild.chunked and not guild._offline_members_hidden
def _get_guild_channel(
    self, data: PartialMessagePayload, guild_id: Optional[int] = None
) -> Tuple[Union[Channel, Thread], Optional[Guild]]:
    """Resolve the channel (and guild, if any) that a message payload refers to.

    Returns a ``(channel, guild)`` tuple. When no channel can be resolved, a
    ``PartialMessageable`` carrying the channel ID is returned in its place.
    """
    channel_id = int(data['channel_id'])
    try:
        # An explicit ``guild_id`` wins; otherwise it must be present in the payload.
        guild_id = guild_id or int(data['guild_id'])
        guild = self._get_guild(guild_id)
    except KeyError:
        # No guild ID in the payload: look the channel up through ``get_channel``.
        channel = self.get_channel(channel_id)
        guild = None
    else:
        # ``guild`` may be ``None`` here, in which case ``channel`` is ``None`` too.
        channel = guild and guild._resolve_channel(channel_id)
    return channel or PartialMessageable(state=self, id=channel_id), guild
async def _delete_messages(self, channel_id, messages, reason: Optional[str] = None) -> None:
delete_message = self.http.delete_message
for msg in messages:
try:
await delete_message(channel_id, msg.id, reason=reason)
except NotFound:
pass
def request_guild(self, guild_id: int, typing: bool = True, activities: bool = True, threads: bool = True) -> Coroutine:
return self.ws.request_lazy_guild(guild_id, typing=typing, activities=activities, threads=threads)
def chunker(
self, guild_id: int, query: str = '', limit: int = 0, presences: bool = True, *, nonce: Optional[str] = None
):
return self.ws.request_chunks([guild_id], query=query, limit=limit, presences=presences, nonce=nonce)
async def query_members(
    self,
    guild: Guild,
    query: Optional[str],
    limit: int,
    user_ids: Optional[List[Snowflake]],
    cache: bool,
    presences: bool,
) -> List[Member]:
    """Request member chunks for ``guild`` and wait up to 30 seconds for them.

    Raises
    ------
    asyncio.TimeoutError
        If the chunks did not arrive in time (a warning is logged first).
    """
    guild_id = guild.id
    request = ChunkRequest(guild_id, self.loop, self._get_guild, cache=cache)
    nonce = request.nonce
    # Registered by nonce so incoming chunks can be routed to this request.
    self._chunk_requests[nonce] = request

    try:
        await self.ws.request_chunks(
            [guild_id], query=query, limit=limit, user_ids=user_ids, presences=presences, nonce=nonce
        )
        members = await asyncio.wait_for(request.wait(), timeout=30.0)
    except asyncio.TimeoutError:
        _log.warning('Timed out waiting for chunks with query %r and limit %d for guild_id %d.', query, limit, guild_id)
        raise
    return members
async def _delay_ready(self) -> None:
    """Request and chunk the cached guilds, then fire the ready handlers and event.

    If the task is cancelled, nothing is dispatched. ``_ready_task`` is reset
    to ``None`` in every case.
    """
    try:
        states = []
        for guild in self._guilds.values():
            if self._request_guilds:
                await self.request_guild(guild.id)
            if self._guild_needs_chunking(guild):
                # ``wait=False``: collect the awaitable now and wait for all of them below.
                future = await self.chunk_guild(guild, wait=False)
                states.append((guild, future))
        for guild, future in states:
            try:
                await asyncio.wait_for(future, timeout=10)
            except asyncio.TimeoutError:
                # A slow guild only produces a warning; the remaining guilds are still awaited.
                _log.warning('Timed out waiting for chunks for guild_id %s.', guild.id)
    except asyncio.CancelledError:
        pass
    else:
        # Dispatch the event
        self.call_handlers('ready')
        self.dispatch('ready')
    finally:
        self._ready_task = None
def parse_ready(self, data: gw.ReadyEvent) -> None:
# Before parsing, we wait for READY_SUPPLEMENTAL
# This has voice state objects, as well as an initial member cache
self._ready_data = data
def parse_ready_supplemental(self, extra_data: gw.ReadySupplementalEvent) -> None:
    """Merge READY_SUPPLEMENTAL into the stored READY payload and build the caches.

    Cancels a pending ready task, clears the state, fills the user, guild,
    relationship and private channel caches, dispatches ``connect`` and then
    schedules ``_delay_ready``.
    """
    if self._ready_task is not None:
        self._ready_task.cancel()
    self.clear()
    # Payload stored earlier by ``parse_ready``.
    data = self._ready_data
    guild_settings = data.get('user_guild_settings', {}).get('entries', [])
    # Temp user parsing
    # Maps user ID -> user payload; used below to fill in payloads that only carry an ID.
    temp_users: Dict[int, UserPayload] = {int(data['user']['id']): data['user']}
    for u in data.get('users', []):
        u_id = int(u['id'])
        temp_users[u_id] = u
    # Discord bad
    # The five lists are consumed in lockstep, one entry per guild; ``zip`` stops at the shortest.
    # NOTE(review): ``merged_members`` comes from ``extra_data`` while ``merged_me`` comes from ``data['merged_members']`` — confirm the naming is intended.
    for guild_data, guild_extra, merged_members, merged_me, merged_presences in zip(
        data.get('guilds', []),
        extra_data.get('guilds', []),
        extra_data.get('merged_members', []),
        data.get('merged_members', []),
        extra_data['merged_presences'].get('guilds', []),
    ):
        guild_data['settings'] = utils.find(  # type: ignore # This key does not actually exist in the payload
            lambda i: i['guild_id'] == guild_data['id'],
            guild_settings,
        ) or {'guild_id': guild_data['id']}
        # Give each presence a minimal ``user`` object built from its ``user_id``.
        for presence in merged_presences:
            presence['user'] = {'id': presence['user_id']}  # type: ignore # :(
        voice_states = guild_data.setdefault('voice_states', [])
        voice_states.extend(guild_extra.get('voice_states', []))
        members = guild_data.setdefault('members', [])
        members.extend(merged_me)
        members.extend(merged_members)
        presences = guild_data.setdefault('presences', [])
        presences.extend(merged_presences)
        for member in members:
            if 'user' not in member:
                # ``temp_users.get`` yields ``None`` for an unknown user ID.
                member['user'] = temp_users.get(int(member.pop('user_id')))
        for voice_state in voice_states:
            if 'member' not in voice_state:
                member = utils.find(lambda m: m['user']['id'] == voice_state['user_id'], members)
                if member:
                    voice_state['member'] = member
    # There's also a friends key that has presence data for your friends
    # Parsing that would require a redesign of the Relationship class ;-;
    # Self parsing
    self.user = user = ClientUser(state=self, data=data['user'])
    self._users[user.id] = user  # type: ignore
    # Guild parsing
    for guild_data in data.get('guilds', []):
        self._add_guild_from_data(guild_data, from_ready=True)
    # Relationship parsing
    for relationship in data.get('relationships', []):
        try:
            r_id = int(relationship['id'])
        except KeyError:
            # Entries without an ID are skipped.
            continue
        else:
            if 'user' not in relationship:
                relationship['user'] = temp_users[int(relationship.pop('user_id'))]
            self._relationships[r_id] = Relationship(state=self, data=relationship)
    # Private channel parsing
    for pm in data.get('private_channels', []):
        factory, _ = _private_channel_factory(pm['type'])
        if 'recipients' not in pm:
            pm['recipients'] = [temp_users[int(u_id)] for u_id in pm.pop('recipient_ids')]  # type: ignore
        self._add_private_channel(factory(me=user, data=pm, state=self))  # type: ignore
    # Extras
    self.analytics_token = data.get('analytics_token')
    # First entry of the ordered region list; 'us-west' when the key is absent.
    region = data.get('geo_ordered_rtc_regions', ['us-west'])[0]
    self.preferred_region = region
    self.settings = UserSettings(data=data.get('user_settings', {}), state=self)
    self.consents = Tracking(data=data.get('consents', {}), state=self)
    if 'required_action' in data:  # Locked more than likely
        self.parse_user_required_action_update(data)
    if 'sessions' in data:
        self.parse_sessions_replace(data['sessions'])
    # We're done
    del self._ready_data
    self.call_handlers('connect')
    self.dispatch('connect')
    self._ready_task = asyncio.create_task(self._delay_ready())
def parse_resumed(self, data: gw.ResumedEvent) -> None:
self.dispatch('resumed')
def parse_message_create(self, data: gw.MessageCreateEvent) -> None:
    """Build a ``Message`` from the payload, dispatch it and update the caches.

    Payloads for guilds currently marked unavailable are dropped.
    """
    guild_id = utils._get_as_snowflake(data, 'guild_id')
    channel, _ = self._get_guild_channel(data)
    if guild_id in self._unavailable_guilds:  # I don't know how I feel about this :(
        return

    # channel will be the correct type here
    message = Message(channel=channel, data=data, state=self)  # type: ignore
    self.dispatch('message', message)

    message_cache = self._messages
    if message_cache is not None:
        message_cache.append(message)
    if message.call is not None:
        # Messages carrying a call are also tracked by ID in their own cache.
        self._call_message_cache[message.id] = message

    if channel:
        channel.last_message_id = message.id  # type: ignore
def parse_message_delete(self, data: gw.MessageDeleteEvent) -> None:
    """Dispatch the raw delete event, then the rich one if the message was cached."""
    raw = RawMessageDeleteEvent(data)
    cached = self._get_message(raw.message_id)
    raw.cached_message = cached
    self.dispatch('raw_message_delete', raw)

    if self._messages is None or cached is None:
        return
    self.dispatch('message_delete', cached)
    self._messages.remove(cached)
def parse_message_delete_bulk(self, data: gw.MessageDeleteBulkEvent) -> None:
    """Dispatch the raw bulk delete event, then the rich one for cached messages."""
    raw = RawBulkMessageDeleteEvent(data)
    cached = [msg for msg in self._messages if msg.id in raw.message_ids] if self._messages else []
    raw.cached_messages = cached
    self.dispatch('raw_bulk_message_delete', raw)

    if not cached:
        return
    self.dispatch('bulk_message_delete', cached)
    for msg in cached:
        # self._messages won't be None here
        self._messages.remove(msg)  # type: ignore
def parse_message_update(self, data: gw.MessageUpdateEvent) -> None:
    """Dispatch ``raw_message_edit`` and, for a cached message, ``message_edit``."""
    raw = RawMessageUpdateEvent(data)
    current = self._get_message(raw.message_id)
    if current is None:
        self.dispatch('raw_message_edit', raw)
        return

    # Keep a copy of the message as it was before the update.
    previous = copy.copy(current)
    raw.cached_message = previous
    self.dispatch('raw_message_edit', raw)
    current._update(data)
    # Coerce the `after` parameter to take the new updated Member
    # ref: #5999
    previous.author = current.author
    self.dispatch('message_edit', previous, current)
def parse_message_reaction_add(self, data: gw.MessageReactionAddEvent) -> None:
    """Dispatch ``raw_reaction_add`` and, for a cached message, ``reaction_add``."""
    emoji_data = data['emoji']
    emoji_id = utils._get_as_snowflake(emoji_data, 'id')
    emoji = PartialEmoji.with_state(self, id=emoji_id, animated=emoji_data.get('animated', False), name=emoji_data['name'])  # type: ignore
    raw = RawReactionActionEvent(data, emoji, 'REACTION_ADD')

    # The member is only attached when the payload carries one and its guild is known.
    member = None
    member_data = data.get('member')
    if member_data:
        guild = self._get_guild(raw.guild_id)
        if guild is not None:
            member = Member(data=member_data, guild=guild, state=self)
    raw.member = member
    self.dispatch('raw_reaction_add', raw)

    # rich interface here
    message = self._get_message(raw.message_id)
    if message is None:
        return
    emoji = self._upgrade_partial_emoji(emoji)
    reaction = message._add_reaction(data, emoji, raw.user_id)
    user = raw.member or self._get_reaction_user(message.channel, raw.user_id)
    if user:
        self.dispatch('reaction_add', reaction, user)
def parse_message_reaction_remove_all(self, data: gw.MessageReactionRemoveAllEvent) -> None:
    """Dispatch ``raw_reaction_clear`` and, for a cached message, ``reaction_clear``."""
    raw = RawReactionClearEvent(data)
    self.dispatch('raw_reaction_clear', raw)

    message = self._get_message(raw.message_id)
    if message is None:
        return
    # Hand listeners the reactions as they were before clearing.
    previous = message.reactions.copy()
    message.reactions.clear()
    self.dispatch('reaction_clear', message, previous)
def parse_message_reaction_remove(self, data: gw.MessageReactionRemoveEvent) -> None:
    """Dispatch ``raw_reaction_remove`` and, for a cached message, ``reaction_remove``."""
    emoji = PartialEmoji.from_dict(data['emoji'])
    emoji._state = self
    raw = RawReactionActionEvent(data, emoji, 'REACTION_REMOVE')
    self.dispatch('raw_reaction_remove', raw)

    message = self._get_message(raw.message_id)
    if message is None:
        return

    emoji = self._upgrade_partial_emoji(emoji)
    try:
        reaction = message._remove_reaction(data, emoji, raw.user_id)
    except (AttributeError, ValueError):  # eventual consistency lol
        return

    user = self._get_reaction_user(message.channel, raw.user_id)
    if user:
        self.dispatch('reaction_remove', reaction, user)
def parse_message_reaction_remove_emoji(self, data: gw.MessageReactionRemoveEmojiEvent) -> None:
    """Dispatch ``raw_reaction_clear_emoji`` and, for a cached message, ``reaction_clear_emoji``."""
    emoji = PartialEmoji.from_dict(data['emoji'])
    emoji._state = self
    raw = RawReactionClearEmojiEvent(data, emoji)
    self.dispatch('raw_reaction_clear_emoji', raw)

    message = self._get_message(raw.message_id)
    if message is None:
        return

    try:
        reaction = message._clear_emoji(emoji)
    except (AttributeError, ValueError):  # eventual consistency lol
        return

    if reaction:
        self.dispatch('reaction_clear_emoji', reaction)
def parse_presences_replace(self, data: List[gw.PresenceUpdateEvent]) -> None:
for presence in data:
self.parse_presence_update(presence)
def parse_presence_update(self, data: gw.PresenceUpdateEvent) -> None:
    """Handle a PRESENCE_UPDATE gateway event.

    The event is discarded (with a debug log) when the guild or the member is
    not cached. Otherwise the member's presence is updated in place;
    ``user_update`` is dispatched when ``_presence_update`` reports a user
    change, and ``presence_update`` when the client status or activities differ
    from the snapshot taken before the update.
    """
    guild_id = utils._get_as_snowflake(data, 'guild_id')
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('PRESENCE_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    user_payload = data['user']
    member_id = int(user_payload['id'])
    member = guild.get_member(member_id)
    if member is None:
        _log.debug('PRESENCE_UPDATE referencing an unknown member ID: %s. Discarding', member_id)
        return

    before = Member._copy(member)
    changed_user = member._presence_update(data=data, user=user_payload)
    if changed_user:
        self.dispatch('user_update', changed_user[0], changed_user[1])

    presence_changed = (
        before._client_status != member._client_status or before._activities != member._activities
    )
    if presence_changed:
        self.dispatch('presence_update', before, member)
def parse_user_update(self, data: gw.UserUpdateEvent) -> None:
    """Handle USER_UPDATE by applying the payload to ``self.user`` when it is set."""
    if not self.user:
        return
    self.user._update(data)
def parse_user_settings_update(self, data) -> None:
    """Handle USER_SETTINGS_UPDATE.

    Updates ``self.settings`` in place and dispatches ``settings_update``
    followed by ``internal_settings_update``, each with a shallow copy taken
    before the update and the updated settings object.
    """
    current = self.settings
    snapshot = copy.copy(current)
    current._update(data)  # type: ignore
    for event_name in ('settings_update', 'internal_settings_update'):
        self.dispatch(event_name, snapshot, current)
def parse_user_guild_settings_update(self, data) -> None:
    """Handle USER_GUILD_SETTINGS_UPDATE.

    Dispatches ``guild_settings_update`` with the previous settings (a shallow
    copy, or ``None`` when the guild had none) and the current settings. When
    the guild had no settings, a new ``GuildSettings`` is built from the payload.
    """
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('USER_GUILD_SETTINGS_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    current = guild.notification_settings
    if current is None:
        previous = None
        current = GuildSettings(data=data, state=self)
    else:
        previous = copy.copy(current)
        current._update(data)

    self.dispatch('guild_settings_update', previous, current)
def parse_user_required_action_update(self, data) -> None:
    """Handle USER_REQUIRED_ACTION_UPDATE by dispatching ``required_action_update``.

    The payload's ``required_action`` value is passed through ``try_enum``
    with ``RequiredActionType`` before being dispatched.
    """
    self.dispatch('required_action_update', try_enum(RequiredActionType, data['required_action']))
def parse_user_connections_update(self, data: gw.ResumedEvent) -> None:
    """Handle USER_CONNECTIONS_UPDATE; the payload is not read, only ``connections_update`` is dispatched."""
    event_name = 'connections_update'
    self.dispatch(event_name)
def parse_sessions_replace(self, data: List[Dict[str, Any]]) -> None:
    """Handle a SESSIONS_REPLACE gateway event.

    Each entry of ``data`` describes one session. The entry whose
    ``session_id`` is ``'all'`` is treated as the overall presence, and the
    entry matching ``self.session_id`` as this client's own session. Every
    other entry contributes its status and activities keyed by
    ``client_info.client`` (later duplicates overwrite earlier ones).

    The results are stored on ``self.client`` as ``_client_status``,
    ``_client_activities`` and ``_session_count`` (the number of entries
    excluding the ``'all'`` entry).

    ``data`` is no longer mutated. The previous implementation removed the
    ``'all'`` entry from the list while iterating over it, which made the
    loop skip the entry that followed it.
    """
    overall = MISSING
    this = MISSING
    client_status = _ClientStatus()
    client_activities = {}
    statuses = {}

    if len(data) == 1:
        overall = this = data[0]

    session_count = 0
    # Duplicates will be overwritten
    for session in data:
        session_id = session['session_id']
        if session_id == 'all':
            overall = session
            continue

        # Everything except the aggregate 'all' entry counts as a session.
        session_count += 1
        if session_id == self.session_id:
            this = session
            continue

        key = session['client_info']['client']
        statuses[key] = session['status']
        client_activities[key] = tuple(session['activities'])

    if overall is MISSING and this is MISSING:
        _log.debug('SESSIONS_REPLACE has weird data: %s.', data)
        return
    elif overall is MISSING:
        overall = this
    elif this is MISSING:
        this = overall

    client_status._update(overall['status'], statuses)  # type: ignore
    client_status._this = this['status']
    client_activities[None] = tuple(overall['activities'])
    client_activities['this'] = tuple(this['activities'])

    client = self.client
    client._client_status = client_status
    client._client_activities = client_activities
    client._session_count = session_count
def parse_invite_create(self, data: gw.InviteCreateEvent) -> None:
    """Handle INVITE_CREATE by dispatching ``invite_create`` with an ``Invite`` built from the payload."""
    self.dispatch('invite_create', Invite.from_gateway(state=self, data=data))
def parse_invite_delete(self, data: gw.InviteDeleteEvent) -> None:
    """Handle INVITE_DELETE by dispatching ``invite_delete`` with an ``Invite`` built from the payload."""
    self.dispatch('invite_delete', Invite.from_gateway(state=self, data=data))
def parse_channel_delete(self, data: gw.ChannelDeleteEvent) -> None:
    """Handle a CHANNEL_DELETE gateway event.

    For a cached guild channel: removes it from the guild, dispatches
    ``guild_channel_delete`` and, for voice/stage channels, drops every
    scheduled event bound to that channel, dispatching
    ``scheduled_event_delete`` for each one.

    When the guild is not cached, the channel is looked up as a private
    channel; if found it is removed and ``private_channel_delete`` is
    dispatched.
    """
    guild = self._get_guild(utils._get_as_snowflake(data, 'guild_id'))
    channel_id = int(data['id'])

    if guild is None:
        channel = self._get_private_channel(channel_id)
        if channel is not None:
            self._remove_private_channel(channel)
            self.dispatch('private_channel_delete', channel)
        return

    channel = guild.get_channel(channel_id)
    if channel is None:
        return

    guild._remove_channel(channel)
    self.dispatch('guild_channel_delete', channel)

    if channel.type in (ChannelType.voice, ChannelType.stage_voice):
        # Iterate over a snapshot because entries are popped from the guild's
        # scheduled-event mapping inside the loop.
        for event in list(guild.scheduled_events):
            if event.channel_id == channel.id:
                guild._scheduled_events.pop(event.id)
                # Same payload as parse_guild_scheduled_event_delete: the
                # event object only (previously ``guild, event`` was sent,
                # which does not match the single-argument handlers).
                self.dispatch('scheduled_event_delete', event)
def parse_channel_update(self, data: gw.ChannelUpdateEvent) -> None:
    """Handle a CHANNEL_UPDATE gateway event.

    Group channels are updated through ``_update_group`` and reported with
    ``private_channel_update``. Guild channels are updated through
    ``_update`` and reported with ``guild_channel_update``. In both cases
    the first argument is a shallow copy taken before the update.

    Events for unknown channels or guilds are logged at debug level and
    discarded.
    """
    channel_type = try_enum(ChannelType, data.get('type'))
    channel_id = int(data['id'])

    if channel_type is ChannelType.group:
        channel = self._get_private_channel(channel_id)
        if channel is None:
            _log.debug('CHANNEL_UPDATE referencing an unknown channel ID: %s. Discarding.', channel_id)
            # Stop here: falling through to the guild lookup only produced a
            # second, misleading "unknown guild ID" log for a group channel.
            return

        old_channel = copy.copy(channel)
        # The channel is a GroupChannel
        channel._update_group(data)  # type: ignore
        self.dispatch('private_channel_update', old_channel, channel)
        return

    guild_id = utils._get_as_snowflake(data, 'guild_id')
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('CHANNEL_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    channel = guild.get_channel(channel_id)
    if channel is None:
        _log.debug('CHANNEL_UPDATE referencing an unknown channel ID: %s. Discarding.', channel_id)
        return

    old_channel = copy.copy(channel)
    channel._update(guild, data)  # type: ignore # the data payload varies based on the channel type.
    self.dispatch('guild_channel_update', old_channel, channel)
def parse_channel_create(self, data: gw.ChannelCreateEvent) -> None:
    """Handle a CHANNEL_CREATE gateway event.

    Private/group channels are created (unless already cached), stored and
    reported with ``private_channel_create``. Guild channels are created,
    added to their guild and reported with ``guild_channel_create``. Unknown
    channel types and unknown guilds are logged at debug level and discarded.
    """
    factory, ch_type = _channel_factory(data['type'])
    if factory is None:
        _log.debug('CHANNEL_CREATE referencing an unknown channel type %s. Discarding.', data['type'])
        return

    is_private = ch_type in (ChannelType.group, ChannelType.private)
    if is_private:
        channel_id = int(data['id'])
        if self._get_private_channel(channel_id) is not None:
            # Already cached; nothing to create.
            return
        channel = factory(me=self.user, data=data, state=self)  # type: ignore # user is always present when logged in
        self._add_private_channel(channel)  # type: ignore # channel will always be a private channel
        self.dispatch('private_channel_create', channel)
        return

    guild_id = utils._get_as_snowflake(data, 'guild_id')
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('CHANNEL_CREATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    # The factory can't be a DMChannel or GroupChannel here
    channel = factory(guild=guild, state=self, data=data)  # type: ignore
    guild._add_channel(channel)  # type: ignore
    self.dispatch('guild_channel_create', channel)
def parse_channel_pins_update(self, data: gw.ChannelPinsUpdateEvent) -> None:
    """Handle a CHANNEL_PINS_UPDATE gateway event.

    Without a ``guild_id`` key the channel is looked up as a private channel;
    otherwise it is resolved through the guild. Dispatches
    ``private_channel_pins_update`` when no guild was found and
    ``guild_channel_pins_update`` otherwise, with the channel and the parsed
    ``last_pin_timestamp``.
    """
    channel_id = int(data['channel_id'])
    try:
        guild = self._get_guild(int(data['guild_id']))
    except KeyError:
        guild = None
        channel = self._get_private_channel(channel_id)
    else:
        channel = guild and guild._resolve_channel(channel_id)

    if channel is None:
        _log.debug('CHANNEL_PINS_UPDATE referencing an unknown channel ID: %s. Discarding.', channel_id)
        return

    last_pin = utils.parse_time(data.get('last_pin_timestamp'))
    event_name = 'private_channel_pins_update' if guild is None else 'guild_channel_pins_update'
    self.dispatch(event_name, channel, last_pin)
def parse_channel_recipient_add(self, data) -> None:
    """Handle a CHANNEL_RECIPIENT_ADD gateway event.

    Stores the user from the payload, appends it to the channel's
    ``recipients`` and dispatches ``group_join`` with the channel and user.

    An event for a channel that ``_get_private_channel`` does not know is
    logged at debug level and discarded; previously it raised
    ``AttributeError`` on ``None.recipients``.
    """
    channel_id = int(data['channel_id'])
    channel = self._get_private_channel(channel_id)
    if channel is None:
        _log.debug('CHANNEL_RECIPIENT_ADD referencing an unknown channel ID: %s. Discarding.', channel_id)
        return

    user = self.store_user(data['user'])
    channel.recipients.append(user)  # type: ignore
    self.dispatch('group_join', channel, user)
def parse_channel_recipient_remove(self, data) -> None:
    """Handle a CHANNEL_RECIPIENT_REMOVE gateway event.

    Removes the user from the channel's ``recipients`` and dispatches
    ``group_remove`` with the channel and user. Nothing is dispatched when
    the user was not in the recipient list.

    An event for a channel that ``_get_private_channel`` does not know is
    logged at debug level and discarded; previously it raised
    ``AttributeError`` on ``None.recipients``.
    """
    channel_id = int(data['channel_id'])
    channel = self._get_private_channel(channel_id)
    if channel is None:
        _log.debug('CHANNEL_RECIPIENT_REMOVE referencing an unknown channel ID: %s. Discarding.', channel_id)
        return

    user = self.store_user(data['user'])
    try:
        channel.recipients.remove(user)  # type: ignore
    except ValueError:
        # The user was not a cached recipient; nothing changed.
        return

    self.dispatch('group_remove', channel, user)
def parse_thread_create(self, data: gw.ThreadCreateEvent) -> None:
    """Handle a THREAD_CREATE gateway event.

    A thread that is already cached is updated in place and reported with
    ``thread_update`` when ``_update`` returns an old copy. Otherwise a new
    ``Thread`` is added to the guild and ``thread_create`` is dispatched if
    the payload has a truthy ``newly_created``, else ``thread_join``.
    """
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_CREATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    cached = guild.get_thread(int(data['id']))
    if cached is None:
        thread = Thread(guild=guild, state=self, data=data)
        guild._add_thread(thread)
        event_name = 'thread_create' if data.get('newly_created', False) else 'thread_join'
        self.dispatch(event_name, thread)
        return

    # Shouldn't happen
    before = cached._update(data)
    if before is not None:
        self.dispatch('thread_update', before, cached)
def parse_thread_update(self, data: gw.ThreadUpdateEvent) -> None:
    """Handle a THREAD_UPDATE gateway event.

    A cached thread is updated in place, removed from the guild if it is now
    archived, and reported with ``thread_update`` when ``_update`` returns an
    old copy. An uncached thread is created and added to the guild without
    dispatching anything.
    """
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_UPDATE referencing an unknown guild ID: %s. Discarding', guild_id)
        return

    cached = guild.get_thread(int(data['id']))
    if cached is None:
        # Shouldn't happen
        guild._add_thread(Thread(guild=guild, state=self, data=data))
        return

    before = cached._update(data)
    if cached.archived:
        guild._remove_thread(cached)
    if before is not None:
        self.dispatch('thread_update', before, cached)
def parse_thread_delete(self, data: gw.ThreadDeleteEvent) -> None:
    """Handle THREAD_DELETE: remove a cached thread from its guild and dispatch ``thread_delete``."""
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_DELETE referencing an unknown guild ID: %s. Discarding', guild_id)
        return

    cached = guild.get_thread(int(data['id']))
    if cached is None:
        return

    guild._remove_thread(cached)
    self.dispatch('thread_delete', cached)
def parse_thread_list_sync(self, data: gw.ThreadListSyncEvent) -> None:
    """Handle a THREAD_LIST_SYNC gateway event.

    The candidate set is the guild's cached threads, restricted through
    ``_filter_threads`` when the payload carries ``channel_ids``.

    * Threads present in the payload and in the candidate set are updated in
      place (``thread_update`` is dispatched when ``_update`` returns an old
      copy).
    * Threads present only in the payload are created and added to the guild.
    * Candidate threads absent from the payload are removed from the guild
      and reported with ``thread_delete``.
    * Entries in ``members`` are attached to the synced thread they refer to.
    * Entries in ``most_recent_messages`` are appended to the message cache
      when it is enabled.

    Member entries are now resolved against the synced threads. They used to
    be looked up in the candidate mapping after the synced threads had been
    popped from it, so they could only ever match threads about to be deleted.
    """
    guild_id = int(data['guild_id'])
    guild: Optional[Guild] = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_LIST_SYNC referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    try:
        channel_ids = {int(i) for i in data['channel_ids']}
    except KeyError:
        # No channel filter in the payload: every cached thread is a candidate.
        stale = guild._threads.copy()
    else:
        stale = guild._filter_threads(channel_ids)

    synced = {}  # every thread named by the payload, keyed by thread ID
    new_threads = {}  # the subset of `synced` that was not cached before
    for thread_data in data.get('threads', []):
        thread = stale.pop(int(thread_data['id']), None)
        if thread is not None:
            old = thread._update(thread_data)
            if old is not None:
                self.dispatch('thread_update', old, thread)  # Honestly not sure if this is right
        else:
            thread = Thread(guild=guild, state=self, data=thread_data)
            new_threads[thread.id] = thread
        synced[thread.id] = thread

    # Whatever is left in `stale` was not part of the sync.
    old_threads = list(stale.values())

    for member_data in data.get('members', []):
        # Note: member_data['id'] is the thread_id
        thread = synced.get(int(member_data['id']))
        if thread is not None:
            thread._add_member(ThreadMember(thread, member_data))

    for thread in new_threads.values():
        guild._add_thread(thread)

    for thread in old_threads:
        del guild._threads[thread.id]
        self.dispatch('thread_delete', thread)  # Again, not sure

    for message_data in data.get('most_recent_messages', []):
        message_guild_id = utils._get_as_snowflake(message_data, 'guild_id')
        channel, _ = self._get_guild_channel(message_data)
        if message_guild_id in self._unavailable_guilds:
            continue

        # channel will be the correct type here
        message = Message(channel=channel, data=message_data, state=self)  # type: ignore
        if self._messages is not None:
            self._messages.append(message)
def parse_thread_member_update(self, data: gw.ThreadMemberUpdate) -> None:
    """Handle THREAD_MEMBER_UPDATE by replacing ``thread.me`` with a ``ThreadMember`` built from the payload.

    Events for an unknown guild or thread are logged at debug level and discarded.
    """
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_MEMBER_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    thread_id = int(data['id'])
    thread = guild.get_thread(thread_id)
    if thread is None:
        _log.debug('THREAD_MEMBER_UPDATE referencing an unknown thread ID: %s. Discarding.', thread_id)
        return

    thread.me = ThreadMember(thread, data)
def parse_thread_members_update(self, data: gw.ThreadMembersUpdate) -> None:
    """Handle a THREAD_MEMBERS_UPDATE gateway event.

    Added members other than the current user are stored on the thread and
    reported with ``thread_member_join``; the current user is stored as
    ``thread.me`` and reported with ``thread_join``.

    Removed IDs are popped from the thread. For other users this dispatches
    ``thread_member_remove`` (member was cached) or
    ``raw_thread_member_remove`` (it was not); for the current user it
    dispatches ``thread_remove``.
    """
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_MEMBERS_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    thread_id = int(data['id'])
    thread = guild.get_thread(thread_id)
    if thread is None:
        _log.debug('THREAD_MEMBERS_UPDATE referencing an unknown thread ID: %s. Discarding.', thread_id)
        return

    # Both lists are fully built before any state is touched or event dispatched.
    joined = [ThreadMember(thread, payload) for payload in data.get('added_members', [])]
    left_ids = [int(raw_id) for raw_id in data.get('removed_member_ids', [])]
    own_id = self.self_id

    for joiner in joined:
        if joiner.id == own_id:
            thread.me = joiner
            self.dispatch('thread_join', thread)
        else:
            thread._add_member(joiner)
            self.dispatch('thread_member_join', joiner)

    for left_id in left_ids:
        popped = thread._pop_member(left_id)
        if left_id == own_id:
            self.dispatch('thread_remove', thread)
        elif popped is not None:
            self.dispatch('thread_member_remove', popped)
        else:
            self.dispatch('raw_thread_member_remove', thread, left_id)
def parse_guild_member_add(self, data: gw.GuildMemberAddEvent) -> None:
    """Handle a GUILD_MEMBER_ADD gateway event.

    The member is only cached when ``member_cache_flags.other`` is set, the
    member is the current user, or the guild is chunked. A ``presence`` entry
    in the payload, if present, is applied to the new member.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_MEMBER_ADD referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    should_cache = self.member_cache_flags.other or int(data['user']['id']) == self.self_id or guild.chunked
    if not should_cache:
        return

    member = Member(guild=guild, data=data, state=self)
    guild._add_member(member)

    presence = data.get('presence')
    if presence is not None:
        member._presence_update(presence, tuple())  # type: ignore
def parse_guild_member_remove(self, data: gw.GuildMemberRemoveEvent) -> None:
    """Handle a GUILD_MEMBER_REMOVE gateway event.

    Decrements the guild's member count when it is known, and removes the
    member and dispatches ``member_remove`` when the member is cached.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_MEMBER_REMOVE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    if guild._member_count is not None:
        guild._member_count -= 1

    departed = guild.get_member(int(data['user']['id']))
    if departed is None:
        return

    guild._remove_member(departed)
    self.dispatch('member_remove', departed)
def parse_guild_member_update(self, data: gw.GuildMemberUpdateEvent) -> None:
    """Handle a GUILD_MEMBER_UPDATE gateway event.

    A cached member is updated in place: ``user_update`` is dispatched when
    the inner user changed and ``member_update`` when ``_update`` returns an
    old copy. An uncached member is created and stored only when
    ``member_cache_flags.other`` is set, it is the current user, or the guild
    is chunked; the unknown member ID is logged either way.
    """
    guild = self._get_guild(int(data['guild_id']))
    user_payload = data['user']
    user_id = int(user_payload['id'])
    if guild is None:
        _log.debug('GUILD_MEMBER_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    member = guild.get_member(user_id)
    if member is None:
        if self.member_cache_flags.other or user_id == self.self_id or guild.chunked:
            member = Member(data=data, guild=guild, state=self)  # type: ignore # The data is close enough
            # Force an update on the inner user if necessary
            changed_user = member._update_inner_user(user_payload)
            if changed_user:
                self.dispatch('user_update', changed_user[0], changed_user[1])
            guild._add_member(member)
        _log.debug('GUILD_MEMBER_UPDATE referencing an unknown member ID: %s.', user_id)
        return

    before = member._update(data)
    changed_user = member._update_inner_user(user_payload)
    if changed_user:
        self.dispatch('user_update', changed_user[0], changed_user[1])
    if before is not None:
        self.dispatch('member_update', before, member)
def parse_guild_sync(self, data) -> None:
    """Handle GUILD_SYNC: the payload is ignored and a fixed notice is printed to standard output."""
    notice = 'I noticed you triggered a `GUILD_SYNC`.\nIf you want to share your secrets, please feel free to email me.'
    print(notice)
def parse_guild_member_list_update(self, data) -> None:
    """Handle a GUILD_MEMBER_LIST_UPDATE gateway event.

    ``raw_member_list_update`` is always dispatched with the raw payload.
    The guild's member/presence counts are refreshed, then each entry of
    ``data['ops']`` is applied. The positional member list
    (``guild._member_list``) is only maintained, and member/presence/user
    events are only dispatched, when ``should_parse`` is true, i.e. the
    guild is chunked or the pending scrape request has a truthy ``chunk``.

    Members collected from SYNC and INSERT ops are handed to the pending
    scrape request when there is one, otherwise added to the guild directly.
    """
    self.dispatch('raw_member_list_update', data)
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_MEMBER_LIST_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    request = self._scrape_requests.get(guild.id)
    # `request` may be None; getattr then falls back to False.
    should_parse = guild.chunked or getattr(request, 'chunk', False)

    # Counts of zero are not applied.
    if (count := data['member_count']) > 0:
        guild._member_count = count
    if (count := data['online_count']) > 0:
        guild._presence_count = count
    guild._true_online_count = sum(group['count'] for group in data['groups'] if group['id'] != 'offline')

    empty_tuple = tuple()
    to_add = []  # members first seen through an INSERT
    to_remove = []  # member-list items popped by a DELETE
    disregard = []  # already-cached members that were re-INSERTed
    members = []  # members received through SYNC

    if should_parse:  # The SYNCs need to be first and in order so list indexes stay valid
        syncs = [opdata for opdata in data['ops'] if opdata['op'] == 'SYNC']
        syncs.sort(key=lambda op: op['range'][0])
        ops = syncs + [opdata for opdata in data['ops'] if opdata['op'] != 'SYNC']
    else:
        ops = data['ops']

    for opdata in ops:
        op = opdata['op']
        # The OPs are as follows:
        # SYNC: Provides member/presence data for a 100 member range of the member list
        # UPDATE: Dispatched when a member is updated and stays in the same range
        # INSERT: Dispatched when a member is inserted into a range
        # DELETE: Dispatched when a member is removed from a range
        # INVALIDATE: Sent when you're unsubscribed from a range
        # (INVALIDATE matches none of the branches below and is ignored.)
        if op == 'SYNC':
            for item in opdata['items']:
                if 'group' in item:  # Hoisted role
                    guild._member_list.append(None) if should_parse else None  # Insert a placeholder so indexes stay aligned
                    continue

                member = Member(data=item['member'], guild=guild, state=self)
                if presence := item['member'].get('presence'):
                    member._presence_update(presence, empty_tuple)  # type: ignore

                members.append(member)
                guild._member_list.append(member) if should_parse else None

        elif op == 'INSERT':
            index = opdata['index']
            item = opdata['item']
            if 'group' in item:  # Hoisted role
                guild._member_list.insert(index, None) if should_parse else None  # Insert a placeholder so indexes stay aligned
                continue

            mdata = item['member']
            user = mdata['user']
            user_id = int(user['id'])

            member = guild.get_member(user_id)
            if member is not None:  # INSERTs are also sent when a user changes range
                old_member = Member._copy(member)
                dispatch = bool(member._update(mdata))

                if presence := mdata.get('presence'):
                    member._presence_update(presence, empty_tuple)  # type: ignore

                if should_parse and (
                    old_member._client_status != member._client_status or old_member._activities != member._activities
                ):
                    self.dispatch('presence_update', old_member, member)

                user_update = member._update_inner_user(user)
                if should_parse and user_update:
                    self.dispatch('user_update', user_update[0], user_update[1])

                if should_parse and dispatch:
                    self.dispatch('member_update', old_member, member)

                # Excluded from `actually_remove` below, so a matching DELETE
                # in this payload does not evict the member.
                disregard.append(member)
            else:
                member = Member(data=mdata, guild=guild, state=self)
                if presence := mdata.get('presence'):
                    member._presence_update(presence, empty_tuple)  # type: ignore

                to_add.append(member)

            guild._member_list.insert(index, member) if should_parse else None

        elif op == 'UPDATE' and should_parse:
            item = opdata['item']
            if 'group' in item:  # Hoisted role
                continue

            mdata = item['member']
            user = mdata['user']
            user_id = int(user['id'])

            member = guild.get_member(user_id)
            if member is not None:
                old_member = Member._copy(member)
                dispatch = bool(member._update(mdata))

                if presence := mdata.get('presence'):
                    member._presence_update(presence, empty_tuple)  # type: ignore

                # NOTE(review): `should_parse` is already guaranteed true by the
                # enclosing `elif`, so the checks below are redundant.
                if should_parse and (
                    old_member._client_status != member._client_status or old_member._activities != member._activities
                ):
                    self.dispatch('presence_update', old_member, member)

                user_update = member._update_inner_user(user)
                if should_parse and user_update:
                    self.dispatch('user_update', user_update[0], user_update[1])

                if should_parse and dispatch:
                    self.dispatch('member_update', old_member, member)
            else:
                member = Member(data=mdata, guild=guild, state=self)
                if presence := mdata.get('presence'):
                    member._presence_update(presence, empty_tuple)  # type: ignore

                # NOTE(review): the member is inserted into the positional list
                # but is not added to `members`/`to_add`, so it is not cached
                # through this path -- confirm this is intended.
                guild._member_list.insert(opdata['index'], member)  # Race condition?

        elif op == 'DELETE' and should_parse:
            index = opdata['index']
            try:
                item = guild._member_list.pop(index)
            except IndexError:
                _log.debug(
                    'GUILD_MEMBER_LIST_UPDATE type DELETE referencing an unknown member index %s in %s. Discarding.',
                    index,
                    guild.id,
                )
                continue

            # None entries are the hoisted-role placeholders inserted above.
            if item is not None:
                to_remove.append(item)

    if request:
        request.add_members(members + to_add)
    else:
        for member in members + to_add:
            guild._add_member(member)

    if should_parse:
        # A member that was both deleted and inserted in this payload only
        # moved within the list, so it is neither removed nor announced.
        actually_remove = [member for member in to_remove if member not in to_add and member not in disregard]
        actually_add = [member for member in to_add if member not in to_remove]
        for member in actually_remove:
            guild._remove_member(member)
            self.dispatch('member_remove', member)
        for member in actually_add:
            self.dispatch('member_join', member)
def parse_guild_application_command_counts_update(self, data) -> None:
    """Handle GUILD_APPLICATION_COMMAND_COUNTS_UPDATE by replacing ``guild.command_counts``.

    The three counts are read from ``data`` under the integer keys 0, 1 and 2,
    each defaulting to 0.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug(
            'GUILD_APPLICATION_COMMAND_COUNTS_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id']
        )
        return

    # NOTE(review): integer keys are looked up on the payload itself -- confirm
    # this matches the shape the gateway actually sends.
    first = data.get(0, 0)
    second = data.get(1, 0)
    third = data.get(2, 0)
    guild.command_counts = CommandCounts(first, second, third)
def parse_guild_emojis_update(self, data: gw.GuildEmojisUpdateEvent) -> None:
    """Handle a GUILD_EMOJIS_UPDATE gateway event.

    The guild's previous emojis are dropped from ``self._emojis``, the payload's
    emojis are stored through ``store_emoji``, and ``guild_emojis_update`` is
    dispatched with the guild, the previous emojis and the new ones.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_EMOJIS_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    previous = guild.emojis
    for old_emoji in previous:
        self._emojis.pop(old_emoji.id, None)

    guild.emojis = tuple(self.store_emoji(guild, payload) for payload in data['emojis'])
    self.dispatch('guild_emojis_update', guild, previous, guild.emojis)
def parse_guild_stickers_update(self, data: gw.GuildStickersUpdateEvent) -> None:
    """Handle a GUILD_STICKERS_UPDATE gateway event.

    The guild's previous stickers are dropped from ``self._stickers``, the
    payload's stickers are stored through ``store_sticker``, and
    ``guild_stickers_update`` is dispatched with the guild, the previous
    stickers and the new ones.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_STICKERS_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    previous = guild.stickers
    for old_sticker in previous:
        self._stickers.pop(old_sticker.id, None)

    guild.stickers = tuple(self.store_sticker(guild, payload) for payload in data['stickers'])
    self.dispatch('guild_stickers_update', guild, previous, guild.stickers)
def _get_create_guild(self, data: gw.GuildCreateEvent):
guild = self._get_guild(int(data['id']))
# Discord being Discord sends a GUILD_CREATE after an OPCode 14 is sent (a la bots)
# However, we want that if we forced a GUILD_CREATE for an unavailable guild
if guild is not None:
guild._from_data(data)
return
return self._add_guild_from_data(data)
def is_guild_evicted(self, guild: Guild) -> bool:
    """Return ``True`` when ``guild.id`` is not a key of ``self._guilds``."""
    cached_guilds = self._guilds
    return guild.id not in cached_guilds
async def assert_guild_presence_count(self, guild: Guild):
    """Make sure ``guild._presence_count`` is populated.

    Returns immediately unless the guild hides offline members and has no
    presence count yet. Otherwise a lazy-guild request for the first 100
    member-list entries of a readable, non-stage channel is sent and a
    GUILD_MEMBER_LIST_UPDATE for this guild is awaited for up to 15 seconds.
    If the count is still unset afterwards, it is taken from the guild preview.

    Raises
    ------
    RuntimeError
        No channel in the guild satisfies the readability check.
    """
    if not guild._offline_members_hidden or guild._presence_count:
        return

    ws = self.ws
    target = next(
        (
            candidate
            for candidate in guild.channels
            if candidate.permissions_for(guild.me).read_messages and candidate.type != ChannelType.stage_voice
        ),
        None,
    )
    if target is None:
        raise RuntimeError('No channels viewable')

    ranges = {str(target.id): [[0, 99]]}

    def is_this_guild(payload):
        return int(payload['guild_id']) == guild.id

    await ws.request_lazy_guild(guild.id, channels=ranges)

    try:
        await asyncio.wait_for(ws.wait_for('GUILD_MEMBER_LIST_UPDATE', is_this_guild), timeout=15)
    except asyncio.TimeoutError:
        # No update arrived in time; the preview lookup below may still fill the count.
        pass

    if not guild._presence_count:
        preview = await self.http.get_guild_preview(guild.id)
        guild._presence_count = preview['approximate_presence_count']
@overload
async def scrape_guild(
    self,
    guild: Guild,
    *,
    wait: Literal[True] = ...,
    cache: bool,
    force_scraping: bool = ...,
    channels: List[abcSnowflake] = ...,
    delay: Union[int, float] = ...,
) -> Optional[List[Member]]:
    ...

@overload
async def scrape_guild(
    self,
    guild: Guild,
    *,
    wait: Literal[False],
    cache: bool,
    force_scraping: bool = ...,
    channels: List[abcSnowflake] = ...,
    delay: Union[int, float] = ...,
) -> asyncio.Future[Optional[List[Member]]]:
    ...

async def scrape_guild(
    self,
    guild: Guild,
    *,
    wait: bool = True,
    cache: bool,
    force_scraping: bool = False,
    channels: List[abcSnowflake] = MISSING,
    delay: Union[int, float] = MISSING,
) -> Union[Optional[List[Member]], asyncio.Future[Optional[List[Member]]]]:
    """Fetch the guild's members, either by chunking or through the member sidebar.

    The overloads are keyed on ``Literal[True]`` / ``Literal[False]`` for
    ``wait`` (they previously both declared ``bool``, which made them
    indistinguishable to a type checker). The runtime signature is unchanged.

    Unless ``force_scraping`` is set, chunking is used when the current
    member has any of the administrator, kick_members, ban_members or
    manage_roles permissions. Otherwise the presence count is ensured and a
    ``MemberSidebar`` request is used. An existing request for the guild is
    reused instead of starting a new one.

    Returns the result of ``request.wait()`` when ``wait`` is true, otherwise
    the request's future.
    """
    if not guild.me:
        await guild.query_members(user_ids=[self.self_id], cache=True)  # type: ignore # self_id is always present here

    permissions = guild.me.guild_permissions
    can_chunk = any(
        (
            permissions.administrator,
            permissions.kick_members,
            permissions.ban_members,
            permissions.manage_roles,
        )
    )

    if not force_scraping and can_chunk:
        request = self._chunk_requests.get(guild.id)
        if request is None:
            self._chunk_requests[guild.id] = request = ChunkRequest(guild.id, self.loop, self._get_guild, cache=cache)
            await self.chunker(guild.id, nonce=request.nonce)
    else:
        await self.assert_guild_presence_count(guild)

        request = self._scrape_requests.get(guild.id)
        if request is None:
            self._scrape_requests[guild.id] = request = MemberSidebar(
                guild, channels, chunk=False, cache=cache, loop=self.loop, delay=delay
            )
            request.start()

    if wait:
        return await request.wait()
    return request.get_future()  # type: ignore # the two request types return differently typed futures
@overload
async def chunk_guild(
    self, guild: Guild, *, wait: Literal[True] = ..., channels: List[abcSnowflake] = ...
) -> Optional[List[Member]]:
    ...

@overload
async def chunk_guild(
    self, guild: Guild, *, wait: Literal[False] = ..., channels: List[abcSnowflake] = ...
) -> asyncio.Future[Optional[List[Member]]]:
    ...

async def chunk_guild(
    self,
    guild: Guild,
    *,
    wait: bool = True,
    channels: List[abcSnowflake] = MISSING,
) -> Union[asyncio.Future[Optional[List[Member]]], Optional[List[Member]]]:
    """Chunk the guild through a ``MemberSidebar`` request.

    An existing scrape request for the guild is reused; otherwise a new one
    is created with ``chunk=True``, ``cache=True`` and ``delay=0`` and
    started. Returns the result of ``request.wait()`` when ``wait`` is true,
    otherwise the request's future.
    """
    if not guild.me:
        await guild.query_members(user_ids=[self.self_id], cache=True)  # type: ignore # self_id is always present here

    sidebar = self._scrape_requests.get(guild.id)
    if sidebar is None:
        sidebar = MemberSidebar(guild, channels, chunk=True, cache=True, loop=self.loop, delay=0)
        self._scrape_requests[guild.id] = sidebar
        sidebar.start()

    if not wait:
        return sidebar.get_future()
    return await sidebar.wait()
async def _chunk_and_dispatch(self, guild, chunk) -> None:
self._queued_guilds[guild.id] = guild
if chunk:
try:
await asyncio.wait_for(self.chunk_guild(guild), timeout=10)
except asyncio.TimeoutError:
_log.info('Somehow timed out waiting for chunks for guild %s.', guild.id)
self._queued_guilds.pop(guild.id)
# Dispatch available/join depending on circumstances
if guild.id in self._unavailable_guilds:
type = self._unavailable_guilds.pop(guild.id)
if type is UnavailableGuildType.existing:
self.dispatch('guild_available', guild)
else:
self.dispatch('guild_join', guild)
else:
self.dispatch('guild_join', guild)
def parse_guild_create(self, data: gw.GuildCreateEvent):
    """Handle a GUILD_CREATE gateway event.

    Nothing happens when ``_get_create_guild`` returns ``None``. Otherwise a
    ``request_guild`` task is scheduled when ``self._request_guilds`` is
    truthy, followed by a ``_chunk_and_dispatch`` task.
    """
    guild = self._get_create_guild(data)
    if guild is None:
        return

    loop = self.loop
    if self._request_guilds:
        asyncio.ensure_future(self.request_guild(guild.id), loop=loop)

    # Chunk if needed
    should_chunk = self._guild_needs_chunking(guild)
    asyncio.ensure_future(self._chunk_and_dispatch(guild, should_chunk), loop=loop)
def parse_guild_update(self, data: gw.GuildUpdateEvent) -> None:
    """Handle GUILD_UPDATE: refresh the cached guild and dispatch ``guild_update`` with a pre-update shallow copy."""
    guild = self._get_guild(int(data['id']))
    if guild is None:
        _log.debug('GUILD_UPDATE referencing an unknown guild ID: %s. Discarding.', data['id'])
        return

    before = copy.copy(guild)
    guild._from_data(data)
    self.dispatch('guild_update', before, guild)
def parse_guild_delete(self, data: gw.GuildDeleteEvent) -> None:
    """Handle a GUILD_DELETE gateway event.

    With a truthy ``unavailable`` the guild is only marked unavailable and
    ``guild_unavailable`` is dispatched. Otherwise the guild's messages are
    dropped from the message cache (when enabled), the guild is removed and
    ``guild_remove`` is dispatched.
    """
    guild = self._get_guild(int(data['id']))
    if guild is None:
        _log.debug('GUILD_DELETE referencing an unknown guild ID: %s. Discarding.', data['id'])
        return

    went_unavailable = data.get('unavailable', False)
    if went_unavailable:
        # GUILD_DELETE with unavailable being True means that the
        # guild that was available is now currently unavailable
        guild.unavailable = True
        self.dispatch('guild_unavailable', guild)
        return

    # Cleanup the message cache
    if self._messages is not None:
        surviving = (cached for cached in self._messages if cached.guild != guild)
        self._messages = deque(surviving, maxlen=self.max_messages)

    self._remove_guild(guild)
    self.dispatch('guild_remove', guild)
def parse_guild_ban_add(self, data: gw.GuildBanAddEvent) -> None:
    """Handle GUILD_BAN_ADD by dispatching ``member_ban`` with the guild and the banned member or user.

    The cached member is preferred over the freshly built ``User``. Nothing
    is dispatched when the guild is unknown or building the user raises ``KeyError``.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        return

    try:
        banned_user = User(data=data['user'], state=self)
    except KeyError:
        return

    target = guild.get_member(banned_user.id) or banned_user
    self.dispatch('member_ban', guild, target)
def parse_guild_ban_remove(self, data: gw.GuildBanRemoveEvent) -> None:
    """Handle GUILD_BAN_REMOVE by dispatching ``member_unban`` with the guild and the stored user.

    Nothing happens when the guild is unknown or the payload has no ``user`` key.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None or 'user' not in data:
        return

    unbanned = self.store_user(data['user'])
    self.dispatch('member_unban', guild, unbanned)
def parse_guild_role_create(self, data: gw.GuildRoleCreateEvent) -> None:
    """Handle GUILD_ROLE_CREATE: build the role, add it to the guild and dispatch ``guild_role_create``."""
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_ROLE_CREATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    created = Role(guild=guild, data=data['role'], state=self)
    guild._add_role(created)
    self.dispatch('guild_role_create', created)
def parse_guild_role_delete(self, data: gw.GuildRoleDeleteEvent) -> None:
    """Handle GUILD_ROLE_DELETE: remove the role from the guild and dispatch ``guild_role_delete``.

    A ``KeyError`` from ``_remove_role`` is swallowed and nothing is dispatched.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_ROLE_DELETE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    role_id = int(data['role_id'])
    try:
        removed = guild._remove_role(role_id)
    except KeyError:
        return

    self.dispatch('guild_role_delete', removed)
def parse_guild_role_update(self, data: gw.GuildRoleUpdateEvent) -> None:
    """Handle GUILD_ROLE_UPDATE: update a cached role and dispatch ``guild_role_update`` with a pre-update shallow copy.

    A role that is not cached is ignored silently.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_ROLE_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    payload = data['role']
    role = guild.get_role(int(payload['id']))
    if role is None:
        return

    before = copy.copy(role)
    role._update(payload)
    self.dispatch('guild_role_update', before, role)
def parse_guild_members_chunk(self, data: gw.GuildMembersChunkEvent) -> None:
    """Handle a GUILD_MEMBERS_CHUNK gateway event.

    Builds a ``Member`` for every entry in ``members``, applies any
    ``presences`` to the matching members, and forwards the members to
    ``process_chunk_requests`` together with the nonce and whether this was
    the final chunk (``chunk_index + 1 == chunk_count``).
    """
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    presences = data.get('presences', [])
    if guild is None:
        return

    members = [Member(guild=guild, data=payload, state=self) for payload in data.get('members', [])]
    _log.debug('Processed a chunk for %s members in guild ID %s.', len(members), guild_id)

    if presences:
        # Presence payloads carry the user ID as a string, so key by str(id).
        by_id = {str(chunked.id): chunked for chunked in members}
        for presence in presences:
            user = presence['user']
            target = by_id.get(user['id'])
            if target is None:
                continue
            target._presence_update(presence, user)

    is_final = data.get('chunk_index', 0) + 1 == data.get('chunk_count')
    self.process_chunk_requests(guild_id, data.get('nonce'), members, is_final)
def parse_guild_integrations_update(self, data: gw.GuildIntegrationsUpdateEvent) -> None:
    """Handle GUILD_INTEGRATIONS_UPDATE by dispatching ``guild_integrations_update`` with the cached guild."""
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_INTEGRATIONS_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    self.dispatch('guild_integrations_update', guild)
def parse_integration_create(self, data: gw.IntegrationCreateEvent) -> None:
    """Handle INTEGRATION_CREATE by dispatching ``integration_create`` with the built integration.

    ``guild_id`` is popped from ``data`` before the integration is built.
    """
    guild_id = int(data.pop('guild_id'))
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('INTEGRATION_CREATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    integration_cls, _ = _integration_factory(data['type'])
    self.dispatch('integration_create', integration_cls(data=data, guild=guild))
def parse_integration_update(self, data: gw.IntegrationUpdateEvent) -> None:
    """Handle INTEGRATION_UPDATE by dispatching ``integration_update`` with the built integration.

    ``guild_id`` is popped from ``data`` before the integration is built.
    """
    guild_id = int(data.pop('guild_id'))
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('INTEGRATION_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    integration_cls, _ = _integration_factory(data['type'])
    self.dispatch('integration_update', integration_cls(data=data, guild=guild))
def parse_integration_delete(self, data: gw.IntegrationDeleteEvent) -> None:
    """Handle INTEGRATION_DELETE by dispatching ``raw_integration_delete`` when the guild is cached."""
    guild_id = int(data['guild_id'])
    if self._get_guild(guild_id) is None:
        _log.debug('INTEGRATION_DELETE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    self.dispatch('raw_integration_delete', RawIntegrationDeleteEvent(data))
def parse_webhooks_update(self, data: gw.WebhooksUpdateEvent) -> None:
    """Handle WEBHOOKS_UPDATE by dispatching ``webhooks_update`` with the affected guild channel.

    Events for an unknown guild or channel are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('WEBHOOKS_UPDATE referencing an unknown guild ID: %s. Discarding', data['guild_id'])
        return

    channel = guild.get_channel(int(data['channel_id']))
    if channel is None:
        _log.debug('WEBHOOKS_UPDATE referencing an unknown channel ID: %s. Discarding.', data['channel_id'])
        return

    self.dispatch('webhooks_update', channel)
def parse_stage_instance_create(self, data: gw.StageInstanceCreateEvent) -> None:
    """Handle STAGE_INSTANCE_CREATE: store the new stage instance on the guild and dispatch ``stage_instance_create``."""
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('STAGE_INSTANCE_CREATE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    created = StageInstance(guild=guild, state=self, data=data)
    guild._stage_instances[created.id] = created
    self.dispatch('stage_instance_create', created)
def parse_stage_instance_update(self, data: gw.StageInstanceUpdateEvent) -> None:
    """Handle STAGE_INSTANCE_UPDATE: update the cached instance and dispatch ``stage_instance_update``.

    The first dispatched argument is a shallow copy taken before the update.
    Unknown guilds and stage instances are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('STAGE_INSTANCE_UPDATE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    instance = guild._stage_instances.get(int(data['id']))
    if instance is None:
        _log.debug('STAGE_INSTANCE_UPDATE referencing unknown stage instance ID: %s. Discarding.', data['id'])
        return

    before = copy.copy(instance)
    instance._update(data)
    self.dispatch('stage_instance_update', before, instance)
def parse_stage_instance_delete(self, data: gw.StageInstanceDeleteEvent) -> None:
    """Handle a ``STAGE_INSTANCE_DELETE`` gateway payload.

    Removes the stage instance from the guild's ``_stage_instances``
    mapping and dispatches ``stage_instance_delete`` with the removed
    object. Payloads that reference an unknown guild or an unknown stage
    instance are logged at debug level and dropped.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('STAGE_INSTANCE_DELETE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    stage_instance = guild._stage_instances.pop(int(data['id']), None)
    if stage_instance is None:
        # Previously swallowed silently; log it like the update handler does
        # so cache misses are visible when debugging.
        _log.debug('STAGE_INSTANCE_DELETE referencing unknown stage instance ID: %s. Discarding.', data['id'])
        return

    self.dispatch('stage_instance_delete', stage_instance)
def parse_guild_scheduled_event_create(self, data: gw.GuildScheduledEventCreateEvent) -> None:
    """Handle a ``GUILD_SCHEDULED_EVENT_CREATE`` gateway payload.

    Builds a ``ScheduledEvent`` from ``data``, stores it in the guild's
    ``_scheduled_events`` mapping keyed by its ID, and dispatches
    ``scheduled_event_create``. Payloads for unknown guilds are logged at
    debug level and dropped.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('SCHEDULED_EVENT_CREATE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    created = ScheduledEvent(state=self, data=data)
    guild._scheduled_events[created.id] = created
    self.dispatch('scheduled_event_create', created)
def parse_guild_scheduled_event_update(self, data: gw.GuildScheduledEventUpdateEvent) -> None:
    """Handle a ``GUILD_SCHEDULED_EVENT_UPDATE`` gateway payload.

    Updates the cached scheduled event in place and dispatches
    ``scheduled_event_update`` with a shallow copy taken before the update
    and the updated event. Unknown guilds or events are logged at debug
    level and dropped.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('SCHEDULED_EVENT_UPDATE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    current = guild._scheduled_events.get(int(data['id']))
    if current is None:
        _log.debug('SCHEDULED_EVENT_UPDATE referencing unknown scheduled event ID: %s. Discarding.', data['id'])
        return

    # Snapshot first so listeners can compare the before/after state.
    previous = copy.copy(current)
    current._update(data)
    self.dispatch('scheduled_event_update', previous, current)
def parse_guild_scheduled_event_delete(self, data: gw.GuildScheduledEventDeleteEvent) -> None:
    """Handle a ``GUILD_SCHEDULED_EVENT_DELETE`` gateway payload.

    Removes the event from the guild's ``_scheduled_events`` mapping and
    dispatches ``scheduled_event_delete`` with the removed object. Payloads
    that reference an unknown guild or an unknown scheduled event are
    logged at debug level and dropped.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('SCHEDULED_EVENT_DELETE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    scheduled_event = guild._scheduled_events.pop(int(data['id']), None)
    if scheduled_event is None:
        # Previously swallowed silently; log it like the update handler does
        # so cache misses are visible when debugging.
        _log.debug('SCHEDULED_EVENT_DELETE referencing unknown scheduled event ID: %s. Discarding.', data['id'])
        return

    self.dispatch('scheduled_event_delete', scheduled_event)
def parse_guild_scheduled_event_user_add(self, data: gw.GuildScheduledEventUserAdd) -> None:
    """Handle a ``GUILD_SCHEDULED_EVENT_USER_ADD`` gateway payload.

    Resolves the guild, the scheduled event, and the user in that order.
    If all three are cached, the user is added to the event and
    ``scheduled_event_user_add`` is dispatched; the first lookup that
    fails is logged at debug level and the payload is dropped.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('SCHEDULED_EVENT_USER_ADD referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    event = guild._scheduled_events.get(int(data['guild_scheduled_event_id']))
    if event is None:
        _log.debug(
            'SCHEDULED_EVENT_USER_ADD referencing unknown scheduled event ID: %s. Discarding.',
            data['guild_scheduled_event_id'],
        )
        return

    subscriber = self.get_user(int(data['user_id']))
    if subscriber is None:
        _log.debug('SCHEDULED_EVENT_USER_ADD referencing unknown user ID: %s. Discarding.', data['user_id'])
        return

    event._add_user(subscriber)
    self.dispatch('scheduled_event_user_add', event, subscriber)
def parse_guild_scheduled_event_user_remove(self, data: gw.GuildScheduledEventUserRemove) -> None:
    """Handle a ``GUILD_SCHEDULED_EVENT_USER_REMOVE`` gateway payload.

    Resolves the guild, the scheduled event, and the user in that order.
    If all three are cached, the user's ID is popped from the event and
    ``scheduled_event_user_remove`` is dispatched; the first lookup that
    fails is logged at debug level and the payload is dropped.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('SCHEDULED_EVENT_USER_REMOVE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    event = guild._scheduled_events.get(int(data['guild_scheduled_event_id']))
    if event is None:
        _log.debug(
            'SCHEDULED_EVENT_USER_REMOVE referencing unknown scheduled event ID: %s. Discarding.',
            data['guild_scheduled_event_id'],
        )
        return

    subscriber = self.get_user(int(data['user_id']))
    if subscriber is None:
        _log.debug('SCHEDULED_EVENT_USER_REMOVE referencing unknown user ID: %s. Discarding.', data['user_id'])
        return

    event._pop_user(subscriber.id)
    self.dispatch('scheduled_event_user_remove', event, subscriber)
def parse_call_create(self, data) -> None:
    """Handle a ``CALL_CREATE`` gateway payload.

    Looks up the private channel, attaches any message cached under the
    payload's ``message_id`` (removing it from ``_call_message_cache``),
    asks the channel to create the call, records it in ``_calls`` keyed by
    channel ID, and dispatches ``call_create``. Payloads for unknown
    channels are logged at debug level and dropped.
    """
    private_channel = self._get_private_channel(int(data['channel_id']))
    if private_channel is not None:
        cached_message = self._call_message_cache.pop(int(data['message_id']), None)
        # The raw payload is forwarded as keyword arguments alongside the
        # resolved objects.
        new_call = private_channel._add_call(
            state=self, message=cached_message, channel=private_channel, **data
        )
        self._calls[private_channel.id] = new_call
        self.dispatch('call_create', new_call)
    else:
        _log.debug('CALL_CREATE referencing unknown channel ID: %s. Discarding.', data['channel_id'])
def parse_call_update(self, data) -> None:
    """Handle a ``CALL_UPDATE`` gateway payload.

    Updates the call tracked for the payload's channel ID and dispatches
    ``call_update`` with a shallow copy taken before the update and the
    updated call. Payloads for untracked calls are logged at debug level
    and dropped.
    """
    tracked = self._calls.get(int(data['channel_id']))
    if tracked is not None:
        # Snapshot first so listeners can compare the before/after state.
        snapshot = copy.copy(tracked)
        tracked._update(**data)
        self.dispatch('call_update', snapshot, tracked)
    else:
        _log.debug('CALL_UPDATE referencing unknown call (channel ID: %s). Discarding.', data['channel_id'])
def parse_call_delete(self, data) -> None:
    """Handle a ``CALL_DELETE`` gateway payload.

    Stops tracking the call for the payload's channel ID, lets the call
    finalise itself via ``_deleteup()``, and dispatches ``call_delete``.
    """
    channel_id = int(data['channel_id'])
    ended_call = self._calls.pop(channel_id, None)

    if ended_call is not None:
        ended_call._deleteup()

    # NOTE(review): the event is dispatched even when no call was tracked,
    # in which case listeners receive ``None`` -- confirm this is intended.
    self.dispatch('call_delete', ended_call)
def parse_voice_state_update(self, data: gw.VoiceStateUpdateEvent) -> None:
    """Handle a ``VOICE_STATE_UPDATE`` gateway payload.

    Three things happen, in order:

    1. If the update is about the current user, the matching voice client
       (keyed by guild ID, or by the user's own ID outside a guild) is
       handed the payload in a background task.
    2. The voice state is applied to the guild, or to this state object
       when no guild is resolved.
    3. ``voice_state_update`` is dispatched with the affected member/user
       and the before/after voice states.

    Guild updates whose member cannot be resolved are logged at debug
    level and dropped.
    """
    guild = self._get_guild(utils._get_as_snowflake(data, 'guild_id'))
    channel_id = utils._get_as_snowflake(data, 'channel_id')
    cache_flags = self.member_cache_flags
    own_id = self.self_id

    if int(data['user_id']) == own_id:
        if guild:
            voice_key = guild.id
        else:
            voice_key = own_id
        voice_client = self._get_voice_client(voice_key)
        if voice_client is not None:
            handler = voice_client.on_voice_state_update(data)
            asyncio.create_task(logging_coroutine(handler, info='Voice Protocol voice state update handler'))

    if guild is None:
        # No guild resolved: the state object itself tracks the voice state.
        user, before, after = self._update_voice_state(data, channel_id)
        self.dispatch('voice_state_update', user, before, after)
        return

    member, before, after = guild._update_voice_state(data, channel_id)
    if member is None:
        _log.debug('VOICE_STATE_UPDATE referencing an unknown member ID: %s. Discarding.', data['user_id'])
        return

    if cache_flags.voice:
        if channel_id is not None:
            # Joined or moved channel: make sure the member is cached.
            guild._add_member(member)
        elif cache_flags._voice_only and member.id != own_id:
            # Left voice and members are cached only for voice: evict,
            # but never evict ourselves.
            guild._remove_member(member)

    self.dispatch('voice_state_update', member, before, after)
def parse_voice_server_update(self, data) -> None:
    """Handle a ``VOICE_SERVER_UPDATE`` gateway payload.

    Finds the voice client keyed by the payload's guild ID (falling back
    to the current user's ID when the payload has none) and, if one
    exists, schedules its ``on_voice_server_update`` handler as a
    background task.
    """
    voice_key = utils._get_as_snowflake(data, 'guild_id')
    if voice_key is None:
        voice_key = self.self_id

    voice_client = self._get_voice_client(voice_key)
    if voice_client is None:
        return

    handler = voice_client.on_voice_server_update(data)
    asyncio.create_task(logging_coroutine(handler, info='Voice Protocol voice server update handler'))
def parse_typing_start(self, data: gw.TypingStartEvent) -> None:
    """Handle a ``TYPING_START`` gateway payload.

    Resolves who is typing based on the channel type and dispatches
    ``typing`` with the channel, that user/member, and a UTC-aware
    timestamp built from ``data['timestamp']``. Nothing is dispatched when
    the channel or the typing user cannot be resolved.
    """
    channel, guild = self._get_guild_channel(data)
    if channel is None:
        return

    user_id = int(data['user_id'])
    typist = None

    if isinstance(channel, DMChannel):
        typist = channel.recipient
    elif isinstance(channel, (Thread, TextChannel)) and guild is not None:
        typist = guild.get_member(user_id)
        if typist is None:
            # Not cached: fall back to the member object embedded in the
            # payload, when one is present.
            member_payload = data.get('member')
            if member_payload:
                typist = Member(data=member_payload, state=self, guild=guild)
    elif isinstance(channel, GroupChannel):
        typist = utils.find(lambda x: x.id == user_id, channel.recipients)

    if typist is None:
        return

    started_at = datetime.datetime.fromtimestamp(data['timestamp'], tz=datetime.timezone.utc)
    self.dispatch('typing', channel, typist, started_at)
def parse_relationship_add(self, data) -> None:
    """Handle a ``RELATIONSHIP_ADD`` gateway payload.

    Stores a new ``Relationship`` under the payload's ID. If one already
    existed for that ID, ``relationship_update`` is dispatched with the
    old and new objects; otherwise ``relationship_add`` is dispatched with
    the new one.
    """
    relationship_id = int(data['id'])
    # Look up the previous value before it is overwritten below.
    previous = self.user.get_relationship(relationship_id)  # type: ignore # self.user is always present here
    current = Relationship(state=self, data=data)
    self._relationships[relationship_id] = current

    if previous is None:
        self.dispatch('relationship_add', current)
    else:
        self.dispatch('relationship_update', previous, current)
def parse_relationship_remove(self, data) -> None:
    """Handle a ``RELATIONSHIP_REMOVE`` gateway payload.

    Drops the relationship stored under the payload's ID and dispatches
    ``relationship_remove`` with it. Unknown IDs are ignored.
    """
    removed = self._relationships.pop(int(data['id']), None)
    if removed is None:
        # Nothing was cached for this ID, so there is nothing to report.
        return

    self.dispatch('relationship_remove', removed)
def parse_interaction_create(self, data) -> None:
    """Handle an ``INTERACTION_CREATE`` gateway payload.

    Pops the ``(type, name, channel)`` entry recorded under the payload's
    nonce (defaulting to ``(0, None, None)``), builds the ``Interaction``
    from it plus the payload, stores it in ``_interactions`` by ID, and
    dispatches ``interaction``.
    """
    pending = self._interaction_cache.pop(data['nonce'], (0, None, None))
    interaction_type, command_name, channel = pending

    interaction = Interaction._from_self(
        channel,
        type=interaction_type,
        user=self.user,  # type: ignore # self.user is always present here
        name=command_name,
        **data,
    )
    self._interactions[interaction.id] = interaction
    self.dispatch('interaction', interaction)
def parse_interaction_success(self, data) -> None:
    """Handle an ``INTERACTION_SUCCESS`` gateway payload.

    Marks the matching interaction as successful and dispatches
    ``interaction_finish``. If no interaction is stored for the payload's
    ID, a minimal one is created from the ID and nonce.
    """
    interaction_id = int(data['id'])

    # The entry is read with ``get`` and therefore stays in the mapping.
    # NOTE(review): presumably so a later modal event can still find it --
    # confirm.
    interaction = self._interactions.get(interaction_id)
    if interaction is None:
        interaction = Interaction(interaction_id, nonce=data['nonce'], user=self.user)  # type: ignore # self.user is always present here

    interaction.successful = True
    self.dispatch('interaction_finish', interaction)
def parse_interaction_failed(self, data) -> None:
    """Handle an ``INTERACTION_FAILED`` gateway payload.

    Removes the matching interaction from ``_interactions``, marks it as
    unsuccessful, and dispatches ``interaction_finish``. If none is stored
    for the payload's ID, a minimal one is created from the ID and nonce.
    """
    interaction_id = int(data['id'])

    interaction = self._interactions.pop(interaction_id, None)
    if interaction is None:
        interaction = Interaction(interaction_id, nonce=data['nonce'], user=self.user)  # type: ignore # self.user is always present here

    interaction.successful = False
    self.dispatch('interaction_finish', interaction)
def parse_interaction_modal_create(self, data) -> None:
    """Handle an ``INTERACTION_MODAL_CREATE`` gateway payload.

    Removes the matching interaction from ``_interactions``, attaches a
    ``Modal`` built from the payload to it, and dispatches ``modal``.
    Payloads with no stored interaction are ignored.
    """
    source_interaction = self._interactions.pop(int(data['id']), None)
    if source_interaction is None:
        return

    modal = Modal(data=data, interaction=source_interaction)
    source_interaction.modal = modal
    self.dispatch('modal', modal)
# Silence "unknown event" warnings for events parsed elsewhere
def parse_nothing(*_) -> None:
    """Accept any arguments (including ``self`` and the payload) and do nothing."""
    return None

parse_thread_member_list_update = parse_nothing  # Grabbed directly in Thread.fetch_members
parse_guild_application_commands_update = parse_nothing  # Grabbed directly in command iterators
def _get_reaction_user(self, channel: MessageableChannel, user_id: int) -> Optional[Union[User, Member]]:
    """Resolve the user behind a reaction.

    For a ``TextChannel`` the lookup goes through the channel's guild and
    yields a member (or ``None`` if that lookup fails); for every other
    channel type it falls back to ``self.get_user``.
    """
    # NOTE(review): only ``TextChannel`` is treated as a guild channel
    # here; other guild channel types (e.g. threads) take the user
    # lookup -- confirm this is intended.
    if not isinstance(channel, TextChannel):
        return self.get_user(user_id)

    return channel.guild.get_member(user_id)
def get_reaction_emoji(self, data: PartialEmojiPayload) -> Union[Emoji, PartialEmoji, str]:
    """Convert a partial emoji payload into the richest available object.

    Returns the plain name when the payload carries no ID, the entry from
    ``self._emojis`` when the ID is found there, and otherwise a
    state-bound ``PartialEmoji`` built from the payload.
    """
    emoji_id = utils._get_as_snowflake(data, 'id')

    if not emoji_id:
        # the name key will be a str
        return data['name']  # type: ignore

    try:
        known = self._emojis[emoji_id]
    except KeyError:
        return PartialEmoji.with_state(
            self, animated=data.get('animated', False), id=emoji_id, name=data['name']  # type: ignore
        )
    return known
def _upgrade_partial_emoji(self, emoji: PartialEmoji) -> Union[Emoji, PartialEmoji, str]:
    """Swap a partial emoji for its entry in ``self._emojis`` when possible.

    Returns ``emoji.name`` when the emoji has no ID, the stored entry when
    the ID is found in ``self._emojis``, and the original ``emoji``
    unchanged otherwise.
    """
    if not emoji.id:
        return emoji.name

    try:
        known = self._emojis[emoji.id]
    except KeyError:
        return emoji
    return known
def get_channel(self, id: Optional[int]) -> Optional[Union[Channel, Thread]]:
    """Find a channel or thread by ID.

    Private channels are checked first, then each guild in ``self.guilds``
    is asked to resolve the ID in turn; the first match wins. Returns
    ``None`` when ``id`` is ``None`` or nothing matches.
    """
    if id is None:
        return None

    private = self._get_private_channel(id)
    if private is not None:
        return private

    # Lazy on purpose: guilds after the first hit are never queried.
    candidates = (guild._resolve_channel(id) for guild in self.guilds)
    return next((found for found in candidates if found is not None), None)
def create_message(self, *, channel: MessageableChannel, data: MessagePayload) -> Message:
    """Build a ``Message`` bound to this state from a raw payload.

    Both arguments are keyword-only and are passed straight through to the
    ``Message`` constructor together with ``state=self``.
    """
    message = Message(state=self, channel=channel, data=data)
    return message