You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

3821 lines
156 KiB

"""
The MIT License (MIT)
Copyright (c) 2015-present Rapptz
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
import asyncio
from collections import deque, OrderedDict
import copy
import datetime
import logging
from typing import (
ClassVar,
Dict,
Final,
Optional,
TYPE_CHECKING,
Union,
Callable,
Any,
List,
TypeVar,
Coroutine,
Tuple,
Deque,
Literal,
overload,
Sequence,
Set,
)
import weakref
import inspect
from math import ceil
from discord_protos import UserSettingsType
from .errors import ClientException, InvalidData, NotFound
from .guild import Guild
from .activity import BaseActivity, create_activity, Session
from .user import User, ClientUser, Note
from .emoji import Emoji
from .mentions import AllowedMentions
from .partial_emoji import PartialEmoji
from .message import Message
from .channel import *
from .channel import _channel_factory, _private_channel_factory
from .raw_models import *
from .member import Member
from .relationship import Relationship, FriendSuggestion
from .role import Role
from .enums import (
ChannelType,
MessageType,
PaymentSourceType,
ReadStateType,
RelationshipType,
RequiredActionType,
Status,
try_enum,
)
from . import utils
from .flags import MemberCacheFlags
from .invite import Invite
from .integrations import _integration_factory
from .scheduled_event import ScheduledEvent
from .stage_instance import StageInstance
from .threads import Thread, ThreadMember
from .sticker import GuildSticker
from .settings import UserSettings, GuildSettings, ChannelSettings, TrackingSettings
from .interactions import Interaction
from .permissions import Permissions, PermissionOverwrite
from .modal import Modal
from .member import VoiceState
from .application import IntegrationApplication, PartialApplication, Achievement
from .connections import Connection
from .payments import Payment
from .entitlements import Entitlement, Gift
from .guild_premium import PremiumGuildSubscriptionSlot
from .library import LibraryApplication
from .automod import AutoModRule, AutoModAction
from .audit_logs import AuditLogEntry
from .read_state import ReadState
from .tutorial import Tutorial
from .experiment import UserExperiment, GuildExperiment
from .metadata import Metadata
if TYPE_CHECKING:
from typing_extensions import Self
from .abc import Snowflake as abcSnowflake
from .activity import ActivityTypes
from .message import MessageableChannel
from .guild import GuildChannel
from .http import HTTPClient
from .voice_client import VoiceProtocol
from .client import Client
from .gateway import DiscordWebSocket
from .calls import Call
from .poll import Poll
from .types.automod import AutoModerationRule, AutoModerationActionExecution
from .types.snowflake import Snowflake
from .types.activity import Activity as ActivityPayload
from .types.application import (
Achievement as AchievementPayload,
IntegrationApplication as IntegrationApplicationPayload,
)
from .types.channel import DMChannel as DMChannelPayload
from .types.user import User as UserPayload, PartialUser as PartialUserPayload
from .types.emoji import Emoji as EmojiPayload, PartialEmoji as PartialEmojiPayload
from .types.sticker import GuildSticker as GuildStickerPayload
from .types.guild import BaseGuild as BaseGuildPayload, Guild as GuildPayload
from .types.message import (
Message as MessagePayload,
MessageSearchResult as MessageSearchResultPayload,
PartialMessage as PartialMessagePayload,
)
from .types import gateway as gw
from .types.voice import BaseVoiceState as VoiceStatePayload
from .types.activity import ClientStatus as ClientStatusPayload
# Generic type variable; used to annotate the coroutine result in ``logging_coroutine``.
T = TypeVar('T')
# Channel type aliases used in annotations.
# NOTE(review): ``GuildChannel`` is only imported under ``TYPE_CHECKING`` above, so these
# aliases presumably belong inside that guard as well -- confirm against the original indentation.
PrivateChannel = Union[DMChannel, GroupChannel]
Channel = Union[GuildChannel, PrivateChannel, PartialMessageable]
# Local alias for the ``utils.MISSING`` sentinel (used as a "not provided" default in this module).
MISSING = utils.MISSING
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)
class ChunkRequest:
    """Bookkeeping for a single guild member chunk request.

    Members handed to :meth:`add_members` are accumulated in ``buffer``
    (optionally capped by ``limit``) and delivered to every registered waiter
    when :meth:`done` is called.
    """

    __slots__ = (
        'guild_id',
        'resolver',
        'loop',
        'limit',
        'remaining',
        'cache',
        'oneshot',
        'nonce',
        'buffer',
        'last_buffer',
        'waiters',
    )

    def __init__(
        self,
        guild_id: int,
        loop: asyncio.AbstractEventLoop,
        resolver: Callable[[int], Any],
        *,
        limit: Optional[int] = None,
        cache: bool = True,
        oneshot: bool = True,
        nonce: Optional[str] = None,
    ) -> None:
        """Create a request for ``guild_id``.

        Parameters
        ----------
        guild_id:
            ID of the guild the members belong to.
        loop:
            Event loop used to create the waiter futures.
        resolver:
            Callable mapping a guild ID to a guild object (or ``None``).
        limit:
            Maximum number of members to keep; ``None`` means no cap.
        cache:
            Whether received members are added to the resolved guild.
        oneshot:
            When ``False``, the most recent batch is also kept in ``last_buffer``.
        nonce:
            Identifier for the request; generated when not given.
        """
        self.guild_id: int = guild_id
        self.resolver: Callable[[int], Any] = resolver
        self.loop: asyncio.AbstractEventLoop = loop
        self.limit: Optional[int] = limit
        self.remaining: int = limit or 0
        self.cache: bool = cache
        self.oneshot: bool = oneshot
        self.nonce: str = nonce or utils._generate_nonce()
        self.buffer: List[Member] = []
        self.last_buffer: Optional[List[Member]] = None
        self.waiters: List[asyncio.Future[List[Member]]] = []

    def add_members(self, members: List[Member]) -> None:
        """Record a batch of members, de-duplicated and capped by ``limit``."""
        # dict.fromkeys() de-duplicates while keeping arrival order, so the
        # members that survive the ``limit`` slice are deterministic.
        batch = list(dict.fromkeys(members))
        if self.limit is not None:
            if self.remaining <= 0:
                return
            batch = batch[: self.remaining]
            # Only count what was actually kept
            self.remaining -= len(batch)

        self.buffer.extend(batch)

        if self.cache:
            guild = self.resolver(self.guild_id)
            # An unresolvable guild only skips caching; the batch is still recorded below
            if guild is not None:
                for member in batch:
                    guild._add_member(member)

        if not self.oneshot:
            self.last_buffer = batch

    async def wait(self) -> List[Member]:
        """Wait until :meth:`done` is called and return the delivered members."""
        future = self.loop.create_future()
        self.waiters.append(future)
        try:
            return await future
        finally:
            self.waiters.remove(future)

    def get_future(self) -> asyncio.Future[List[Member]]:
        """Register and return a future that :meth:`done` will resolve."""
        future = self.loop.create_future()
        self.waiters.append(future)
        return future

    def done(self) -> None:
        """Resolve every pending waiter.

        One-shot requests deliver the whole buffer; otherwise the latest batch
        is delivered, falling back to the buffer when that batch is missing or empty.
        """
        if self.oneshot:
            result = self.buffer
        else:
            result = self.last_buffer or self.buffer

        for future in self.waiters:
            if not future.done():
                future.set_result(result)
class MemberSidebar:
    """Scrapes a guild's member list by subscribing to member-list ranges of its channels.

    Ranges of 100 entries are requested through ``state.subscriptions``. Members
    handed to :meth:`add_members` are collected in ``buffer``, and when scraping
    ends every registered waiter receives that buffer (or the exception that
    aborted the scrape).
    """

    __slots__ = (
        'guild',
        'channels',
        'chunk',
        'delay',
        'cache',
        'loop',
        'safe_override',
        '_limit_override',
        'ranges',
        'subscribing',
        'buffer',
        'exception',
        'waiters',
    )

    def __init__(
        self,
        guild: Guild,
        channels: List[abcSnowflake],
        *,
        chunk: bool,
        delay: Union[int, float],
        cache: bool,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Prepare a scrape of ``guild``.

        Parameters
        ----------
        guild:
            The guild whose member list is scraped.
        channels:
            Channels to subscribe to; when empty, channels are picked by
            :meth:`get_channels` (1 when ``chunk`` is true, otherwise 5).
        chunk:
            Stored as-is; also selects how many channels are auto-picked.
        delay:
            Seconds slept between two rounds of range subscriptions.
        cache:
            Whether received members are added to the guild.
        loop:
            Event loop used for futures and the background task.
        """
        self.guild = guild
        self.cache = cache
        self.chunk = chunk
        self.delay = delay
        self.loop = loop
        self.safe_override = False  # >.<
        self._limit_override: Optional[int] = None

        self.channels = [str(channel.id) for channel in (channels or self.get_channels(1 if chunk else 5))]
        self.ranges = self.get_ranges()
        self.subscribing: bool = False
        self.buffer: List[Member] = []
        self.exception: Optional[Exception] = None
        self.waiters: List[asyncio.Future[Optional[List[Member]]]] = []

    @property
    def manual_override(self) -> bool:
        """Whether the limit was replaced through :meth:`handle_manual_override`."""
        return self._limit_override is not None

    @property
    def limit(self) -> int:
        """Number of member-list entries to request.

        This is the overridden limit when one is set; otherwise the presence
        count (offline members hidden) or member count, plus one entry per
        hoisted role and two extra entries.
        """
        if self._limit_override is not None:
            return self._limit_override

        guild = self.guild
        members = guild._presence_count if guild._offline_members_hidden else guild._member_count or 0
        # Ensure groups are accounted for
        return (members or 0) + len([role for role in guild.roles if role.hoist]) + 2

    @property
    def state(self) -> ConnectionState:
        """The connection state of the guild."""
        return self.guild._state

    @property
    def safe(self) -> bool:
        """Whether ranges are requested one by one instead of amalgamated."""
        return self.safe_override or (self.guild._member_count or 0) >= 75000

    @staticmethod
    def amalgamate(original: Tuple[int, int], value: Tuple[int, int]) -> Tuple[int, int]:
        """Merge ``value`` into ``original``: keep the start of ``original`` and use ``value[1] - 99`` as the end."""
        return original[0], value[1] - 99

    def get_ranges(self, *, start: int = 0) -> List[Tuple[int, int]]:
        """Return the ``(first, last)`` index pairs, 100 entries each, covering ``start`` up to :attr:`limit`."""
        chunk = 100
        amount = self.limit
        # One range per started block of ``chunk`` entries
        return [(i * chunk, i * chunk + chunk - 1) for i in range(start // chunk, ceil(amount / chunk))]

    def get_current_ranges(self) -> List[Tuple[int, int]]:
        """Pop and return up to three ranges to request next.

        In safe mode the ranges are returned as-is. Otherwise each returned
        range is additionally amalgamated with up to three of the following ranges.
        """
        ranges = self.ranges
        ret = []

        for _ in range(3):
            if not ranges:
                break
            current = ranges.pop(0)

            if not self.safe:
                for _ in range(3):
                    if not ranges:
                        break
                    current = self.amalgamate(current, ranges.pop(0))

            ret.append(current)

        return ret

    def handle_manual_override(self, group_members: int) -> None:
        """Replace the limit with ``group_members`` and recompute the outstanding ranges."""
        # Certain guilds like MidJourney have their member list groups manually set
        # In these cases, the online group is removed, and most online members are not retrievable
        # We must update the limit to the "real" online count, and recalculate the ranges
        self._limit_override = group_members
        if self.ranges:
            self.ranges = self.get_ranges(start=self.ranges[0][0])

    def get_channels(self, amount: int) -> List[abcSnowflake]:
        """Pick up to ``amount`` channels that every role in the guild can read."""
        guild = self.guild
        ret = set()

        channels = [
            channel
            for channel in self.guild.channels
            if channel.permissions_for(guild.default_role).read_messages  # "everyone" id
            and channel.permissions_for(guild.me).read_messages  # type: ignore
        ]
        # NOTE(review): candidates are popped from the end, so inserting the rules
        # channel at index 0 makes it the *last* candidate tried -- confirm intent.
        if guild.rules_channel is not None:  # micro-optimization
            channels.insert(0, guild.rules_channel)

        while len(ret) < amount and channels:
            channel = channels.pop()
            for role in guild.roles:
                if not channel.permissions_for(role).read_messages:
                    break
            else:
                # NOTE(review): this ``break`` only leaves the overwrite loop, so the
                # channel is added regardless of member overwrites. A ``for``/``else``
                # was presumably intended -- confirm before changing behaviour.
                for ow in channel._overwrites:
                    if ow.is_member():
                        allow = Permissions(ow.allow)
                        deny = Permissions(ow.deny)
                        overwrite = PermissionOverwrite.from_pair(allow, deny)
                        if not overwrite.read_messages:
                            break
                ret.add(channel)

        return list(ret)

    def add_members(self, members: List[Member]) -> None:
        """Buffer a batch of members (de-duplicated in arrival order) and optionally cache them on the guild."""
        members = list(dict.fromkeys(members))
        self.buffer.extend(members)
        if self.cache:
            guild = self.guild
            for member in members:
                guild._add_member(member)

    async def wait(self) -> List[Member]:
        """Wait until scraping finishes and return the buffered members."""
        future = self.loop.create_future()
        self.waiters.append(future)
        try:
            return await future
        finally:
            self.waiters.remove(future)

    def get_future(self) -> asyncio.Future[List[Member]]:
        """Register and return a future resolved by :meth:`done`."""
        future = self.loop.create_future()
        self.waiters.append(future)
        return future

    def done(self) -> None:
        """Resolve all waiters with the buffer (or the stored exception) and unregister this scrape from the state."""
        for future in self.waiters:
            if not future.done():
                if self.exception:
                    future.set_exception(self.exception)
                else:
                    future.set_result(self.buffer)

        try:
            del self.state._scrape_requests[self.guild.id]
        except KeyError:
            pass

    def start(self):
        """Schedule the scrape as a task on the event loop."""
        self.loop.create_task(self.wrapper())

    async def wrapper(self):
        """Run :meth:`scrape`, record any failure, and always call :meth:`done`."""
        try:
            await self.scrape()
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            _log.warning('Member list scraping failed for guild ID %s (%s).', self.guild.id, exc)
            self.exception = exc
        finally:
            self.done()

    def _reached_end(self, last_range: Tuple[int, int]) -> bool:
        """Return whether ``last_range`` already extends past the current limit."""
        # ``limit in range(first, last)`` implies ``limit < last``, so one comparison suffices.
        # The limit is re-read on every call because it can be overridden while scraping.
        return self.limit < last_range[1]

    async def scrape(self):
        """Subscribe to successive member-list ranges until the whole list has been requested.

        Raises
        ------
        ClientException
            No channel is available to subscribe through.
        InvalidData
            No member list update arrived in time, even in safe mode.
        """
        self.subscribing = True
        delay = self.delay
        channels = self.channels
        guild = self.guild
        state = guild._state

        while self.subscribing:
            requests = {}
            for channel in channels:
                ranges = self.get_current_ranges()
                if not ranges:
                    self.subscribing = False
                    break
                requests[channel] = ranges

            # This guard must run before ``requests`` is indexed below
            if not requests:
                if not self.subscribing:
                    # Ran out of ranges: nothing left to request
                    break
                raise ClientException('Failed to automatically choose channels; please specify them manually')

            request_values = list(requests.values())
            last_range = request_values[-1][-1]
            all_possible_ranges = range(request_values[0][0][0], last_range[1] + 1)

            def predicate(data):
                return int(data['guild_id']) == guild.id and any(
                    op['op'] == 'SYNC' and op['range'][0] in all_possible_ranges for op in data['ops']
                )

            await state.subscriptions.subscribe_to_channels(guild, requests, replace=True)

            try:
                await asyncio.wait_for(
                    state.ws.wait_for('GUILD_MEMBER_LIST_UPDATE', predicate), timeout=state._chunk_timeout(guild)
                )
            except asyncio.TimeoutError:
                if self._reached_end(last_range):
                    self.subscribing = False
                    break

                if self.safe:
                    raise InvalidData('Did not receive a response from Discord')

                # Sometimes servers require safe mode (they used to have 75k+ members)
                # so if we don't get a response we force safe mode and try again
                _log.debug(
                    'Forcing member list scraping safe mode for guild ID %s (member count: %s).',
                    guild.id,
                    guild._member_count,
                )
                self.safe_override = True
                self.ranges = self.get_ranges()
                await self.scrape()
                return

            await asyncio.sleep(delay)

            if self._reached_end(last_range):
                self.subscribing = False
                break

        # Drop the channel range subscriptions once scraping is over
        state.loop.create_task(state.subscriptions.subscribe_to_channels(guild, {}, replace=True))
class GuildSubscriptions:
    """Tracks the client's guild subscriptions and batches changes into bulk gateway payloads.

    A bit of documentation on guild subscriptions:

    Client can subscribe to the following "features":

    - ``typing``: whether the client receives TYPING_START events
    - ``threads``: whether the client receives thread events for threads the user is not in -- probably wrong

      - The client immediately receives a THREAD_LIST_SYNC event with all the threads in the guild
    - ``activities``: not sure what this does yet
    - ``member_updates``: whether the client receives member events (GUILD_MEMBER_ADD, GUILD_MEMBER_UPDATE, GUILD_MEMBER_REMOVE)

    The client is automatically subscribed to all guilds with < 75k members on connect. For guilds the client is not subscribed to,
    it will not receive non-stateful events (e.g. MESSAGE_CREATE, MESSAGE_UPDATE, MESSAGE_DELETE, etc.).
    Additionally, it will receive the PASSIVE_UPDATE_V2 event to keep voice states and channel unreads up-to-date.

    Once a client subscribes to the ``typing`` feature, it is considered subscribed to that guild indefinitely. On the first subscribe,
    it will receive a GUILD_CREATE event (for some reason). If subscribing to other features/members without having subscribed to the
    guild, they will have no effect until the client subscribes to it. This comes with the drawback of not receiving a THREAD_LIST_SYNC
    event if the client is not subscribed to the guild, even if it subscribes to it later. It must first unsubscribe from the ``threads``
    event and then resubscribe to it.

    Clients can also subscribe to specific members within a guild using the ``members`` parameter. This is a list of user IDs that
    the client will receive GUILD_MEMBER_ & PRESENCE_ UPDATEs for. This field has no cap except for the max payload size.

    Clients can also subscribe to specific threads' member lists using the ``thread_member_lists`` parameter. After subscribing to a thread
    member list, the client will immediately receive a THREAD_MEMBER_LIST_UPDATE event (sent regardless of whether the client is subscribed
    to the guild) with all the members in the thread. Note that the members received are guild member objects, not thread member objects.
    There is no way for user accounts to request thread member objects. After subscribing to a thread member list, the client will be
    subscribed to all members in the thread, akin to using ``members``. This field has no cap except for the max payload size.

    The client is automatically subscribed to the members of all friends, implicit relationships, and users with open DMs (that it has a mutual guild with)
    for all guilds with less than 75k members at startup. Additionally, the Gateway will repeat this process and subscribe you to a specific DM recipient every time a new DM is opened.
    The above triggers a GUILD_MEMBER_REMOVE event for every guild you do not share with the DM recipient (this behavior used to exist for the ``members`` parameter too).

    Irrespective of subscribed members, clients will always receive GUILD_MEMBER_UPDATE and GUILD_MEMBER_REMOVE events for actions they perform
    (e.g. changing a user's nickname, kicking a user, banning a user, etc.). Additionally, you will also receive GUILD_MEMBER_UPDATE for members
    with an active voice state. This requires that the client is subscribed to the guild.

    Remember that you get presence updates for the overall user presence for all friends and implicit relationships.

    # TODO: there's some weird behavior here that needs to be further investigated:
        a) joining new guilds
        b) adding new friends - I believe you are NOT automatically subscribed to them in every guild (however you still get the overall user presence)
        c) opening new DMs - I'm almost sure this just universally subscribes you
        d) 75k member+ guilds behave very strangely
    """

    # I thought it was 4096 bytes, but this is taken from the client
    MAX_PAYLOAD_SIZE: Final[int] = 15360  # 15 KiB
    # Seconds to wait after the last queued change before flushing
    TICK_TIMEOUT: Final[float] = 0.5

    __slots__ = (
        '_state',
        '_pending',
        '_task',
        '_blocked',
        '_subscribed',
        '_typing',
        '_threads',
        '_activities',
        '_member_updates',
        '_members',
        '_thread_member_lists',
        '_channels',
    )

    def __init__(self, state: ConnectionState, /) -> None:
        self._state = state
        # Changes queued but not yet sent, keyed by stringified guild ID
        self._pending: gw.BulkGuildSubscribePayload = {}
        # Delayed flush task scheduled by _tick(), if any
        self._task: Optional[asyncio.Task[None]] = None
        self._blocked: bool = False
        self.clear()

    def clear(self) -> None:
        """Forget every recorded subscription (queued changes are left untouched)."""
        # Feature subscriptions
        self._subscribed: utils.SnowflakeList = utils.SnowflakeList()
        self._typing: utils.SnowflakeList = utils.SnowflakeList()
        self._threads: utils.SnowflakeList = utils.SnowflakeList()
        self._activities: utils.SnowflakeList = utils.SnowflakeList()  # TODO: wtf does this do
        self._member_updates: utils.SnowflakeList = utils.SnowflakeList()

        # Member subscriptions
        self._members: Dict[int, utils.SnowflakeList] = {}
        self._thread_member_lists: Dict[int, utils.SnowflakeList] = {}
        self._channels: Dict[int, Dict[int, List[Tuple[int, int]]]] = {}

    def _initial_update(self, guilds: List[Guild]):
        # The client is subscribed to all guilds with < 75k members on connect
        # This function is currently unused because I don't want to rely on this behavior
        for guild in guilds:
            if guild._member_count and guild._member_count < 75000:
                self._subscribed.add(guild.id)

    def _cancel(self) -> None:
        """Cancel the scheduled flush, if any."""
        if self._task:
            self._task.cancel()
            self._task = None

    async def _tick_task(self) -> None:
        """Flush the queued changes after ``TICK_TIMEOUT`` seconds unless cancelled first."""
        try:
            await asyncio.sleep(self.TICK_TIMEOUT)
            await self._flush()
            self._task = None
        except asyncio.CancelledError:
            pass

    def _tick(self) -> None:
        """(Re)start the flush timer; no timer is started while blocked."""
        self._cancel()
        if not self._blocked:
            self._task = self._state.loop.create_task(self._tick_task())

    @property
    def empty(self) -> bool:
        """Whether there are neither recorded guild subscriptions nor queued changes."""
        return not self._subscribed and not self._pending

    @property
    def blocked(self) -> bool:
        """Whether automatic flushing is suspended."""
        return self._blocked

    @blocked.setter
    def blocked(self, value: bool, /) -> None:
        if self._blocked == value:
            return

        self._blocked = value
        if value:
            self._cancel()
        else:
            self._tick()

    async def _requeue_subscriptions(self):
        """Clear the recorded state and queue every previous subscription again."""
        # We need to send all our subscriptions again if we reconnect
        pending = self._pending
        subscribed = self._subscribed
        typing = self._typing
        threads = self._threads
        activities = self._activities
        member_updates = self._member_updates
        members = self._members
        thread_member_lists = self._thread_member_lists
        channels = self._channels
        self.clear()

        for guild_id in self._state._guilds:
            if guild_id not in subscribed:
                continue

            key = str(guild_id)
            payload: gw.BaseGuildSubscribePayload = {
                # Ensure we are subscribed to the guild
                'typing': guild_id in typing or guild_id in subscribed,
                'threads': guild_id in threads,
                'activities': guild_id in activities,
                'member_updates': guild_id in member_updates,
                'members': list(members.get(guild_id, ())),
                'thread_member_lists': list(thread_member_lists.get(guild_id, ())),
                'channels': channels.get(guild_id, {}),  # type: ignore
            }
            # Changes that were queued but never sent take precedence
            if key in pending:
                payload.update(pending[key])

            await self._checked_add({key: payload})

    def is_subscribed(self, guild: abcSnowflake, /) -> bool:
        """Return whether the guild is recorded as subscribed."""
        return guild.id in self._subscribed

    def _is_pending_subscribe(self, guild_id: int, /) -> bool:
        """Return whether the guild is subscribed or has a queued ``typing`` subscription."""
        key = str(guild_id)
        return guild_id in self._subscribed or (key in self._pending and self._pending[key].get('typing') is True)

    def has_feature(
        self, guild: abcSnowflake, feature: Literal['typing', 'threads', 'activities', 'member_updates'], /
    ) -> bool:
        """Return whether ``feature`` is recorded as enabled for the guild (``False`` for an unknown feature)."""
        # getattr is less performant
        if feature == 'typing':
            return self._typing.has(guild.id)
        if feature == 'threads':
            return self._threads.has(guild.id)
        if feature == 'activities':
            return self._activities.has(guild.id)
        if feature == 'member_updates':
            return self._member_updates.has(guild.id)
        return False

    def members_for(self, guild: abcSnowflake, /) -> Sequence[int]:
        """Return a read-only view of the member IDs recorded for the guild."""
        return utils.SequenceProxy(self._members.get(guild.id, ()))

    def threads_for(self, guild: abcSnowflake, /) -> Sequence[int]:
        """Return a read-only view of the thread IDs whose member lists are recorded for the guild."""
        return utils.SequenceProxy(self._thread_member_lists.get(guild.id, ()))

    def channels_for(self, guild: abcSnowflake, /) -> Dict[int, List[Tuple[int, int]]]:
        """Return a shallow copy of the channel ranges recorded for the guild."""
        return self._channels.get(guild.id, {}).copy()

    async def _checked_add(self, changes: gw.BulkGuildSubscribePayload, /) -> None:
        """Merge ``changes`` into the queue, flushing first if the result would be too large.

        Raises
        ------
        ValueError
            ``changes`` alone exceeds ``MAX_PAYLOAD_SIZE``.
        """
        # n.b. changes should have a single key
        # We need to check if the new payload is larger than MAX_PAYLOAD_SIZE bytes
        # If it is, we need to flush the old payload and start a new one
        # If there isn't an old payload and the new payload is larger, this is impossible
        EMPTY: Any = {}
        new_payload = self._pending.copy()
        for guild_id, subscriptions in changes.items():
            old = new_payload.get(guild_id, EMPTY)
            new_payload[guild_id] = {**old, **subscriptions}  # type: ignore # ???

        if len(utils._to_json(new_payload)) > self.MAX_PAYLOAD_SIZE:
            if len(utils._to_json(changes)) > self.MAX_PAYLOAD_SIZE:
                raise ValueError('Guild subscription payload too large to send')

            await self._flush()
            self._pending = changes
        else:
            self._pending = new_payload

        self._tick()

    async def _flush(self) -> None:
        """Send the queued changes and record them as the current subscriptions."""
        payload = self._pending
        if not payload:
            return

        # Only keys that are present in the payload are updated on the backend
        # NOTE(review): if another coroutine queues changes while this send is awaited,
        # the reset below would drop them -- confirm whether that can happen in practice.
        await self._state.ws.bulk_guild_subscribe(payload)
        self._pending = {}

        for key, subscriptions in payload.items():
            guild_id = int(key)
            if subscriptions.get('typing'):
                self._subscribed.add(guild_id)

            for attr, value in subscriptions.items():
                if isinstance(value, bool):
                    # Feature flags live in the snowflake list named after the feature
                    record = getattr(self, f'_{attr}')
                    if value:
                        record.add(guild_id)
                    else:
                        record.discard(guild_id)
                elif attr == 'channels':
                    record = self._channels
                    if value:
                        record[guild_id] = value  # type: ignore
                    else:
                        record.pop(guild_id, None)
                else:
                    # ``members`` / ``thread_member_lists``
                    record = getattr(self, f'_{attr}')
                    if value:
                        record[guild_id] = utils.SnowflakeList(value)  # type: ignore
                    else:
                        record.pop(guild_id, None)

    async def send(self) -> None:
        """Flush the queued changes now, restarting the flush timer if one was scheduled."""
        should_tick = self._task is not None
        await self._flush()
        if should_tick:
            self._tick()

    async def subscribe_to(
        self,
        guild: abcSnowflake,
        /,
        *,
        typing: bool = MISSING,
        threads: bool = MISSING,
        activities: bool = MISSING,
        member_updates: bool = MISSING,
    ):
        """Queue feature changes for a guild; only features that are passed are changed.

        Raises
        ------
        TypeError
            The guild is not subscribed yet and ``typing`` was passed as false.
        """
        # Sanity check
        if not self._is_pending_subscribe(guild.id):
            if typing is MISSING:
                typing = True
            if not typing:
                raise TypeError('Cannot subscribe to guild without subscribing to typing')

        payload: gw.BaseGuildSubscribePayload = {}
        if typing is not MISSING:
            payload['typing'] = typing
        if threads is not MISSING:
            payload['threads'] = threads
        if activities is not MISSING:
            payload['activities'] = activities
        if member_updates is not MISSING:
            payload['member_updates'] = member_updates

        if payload:
            await self._checked_add({str(guild.id): payload})

    async def subscribe_to_members(self, guild: abcSnowflake, /, *members: abcSnowflake, replace: bool = False) -> None:
        """Queue member subscriptions, added to the recorded ones unless ``replace`` is true.

        Raises
        ------
        TypeError
            The guild is not subscribed.
        """
        if not replace and not members:
            return

        # Sanity check
        if not self._is_pending_subscribe(guild.id):
            raise TypeError('Cannot subscribe to guild without subscribing to typing')

        payload: gw.BaseGuildSubscribePayload = {}
        values: Set[Snowflake] = {member.id for member in members}
        if not replace:
            existing = self._members.get(guild.id)
            if existing:
                values.update(existing)

        payload['members'] = list(values)
        await self._checked_add({str(guild.id): payload})

    async def unsubscribe_from_members(self, guild: abcSnowflake, /, *members: abcSnowflake) -> None:
        """Queue the removal of the given members from the recorded member subscriptions."""
        # Sanity check
        if not self._is_pending_subscribe(guild.id):
            return

        existing = self._members.get(guild.id)
        if not existing or not members:
            # Nothing would change, so don't queue (and later send) a pointless payload
            return

        to_remove = {member.id for member in members}
        payload: gw.BaseGuildSubscribePayload = {
            'members': [member for member in existing if member not in to_remove],
        }
        await self._checked_add({str(guild.id): payload})

    async def subscribe_to_threads(self, guild: abcSnowflake, /, *threads: abcSnowflake, replace: bool = False) -> None:
        """Queue thread member list subscriptions, added to the recorded ones unless ``replace`` is true.

        Raises
        ------
        TypeError
            The guild is not subscribed.
        """
        if not replace and not threads:
            return

        # Sanity check
        if not self._is_pending_subscribe(guild.id):
            raise TypeError('Cannot subscribe to guild without subscribing to typing')

        payload: gw.BaseGuildSubscribePayload = {}
        values: Set[Snowflake] = {thread.id for thread in threads}
        if not replace:
            existing = self._thread_member_lists.get(guild.id)
            if existing:
                values.update(existing)

        payload['thread_member_lists'] = list(values)
        await self._checked_add({str(guild.id): payload})

    async def unsubscribe_from_threads(self, guild: abcSnowflake, /, *threads: abcSnowflake) -> None:
        """Queue the removal of the given threads from the recorded thread member list subscriptions."""
        # Sanity check
        if not self._is_pending_subscribe(guild.id):
            return

        existing = self._thread_member_lists.get(guild.id)
        if not existing or not threads:
            # Nothing would change, so don't queue (and later send) a pointless payload
            return

        to_remove = {thread.id for thread in threads}
        payload: gw.BaseGuildSubscribePayload = {
            'thread_member_lists': [thread for thread in existing if thread not in to_remove],
        }
        await self._checked_add({str(guild.id): payload})

    async def subscribe_to_channels(
        self, guild: abcSnowflake, /, channels: Dict[Snowflake, List[Tuple[int, int]]], replace: bool = False
    ) -> None:
        """Queue member-list range subscriptions for channels of a guild.

        Unless ``replace`` is true, the given ranges are merged over the recorded
        ones, with ``channels`` winning on conflicts.

        Raises
        ------
        TypeError
            The guild is not subscribed.
        """
        if not replace and not channels:
            return

        # Sanity check
        if not self._is_pending_subscribe(guild.id):
            raise TypeError('Cannot subscribe to guild without subscribing to typing')

        payload: gw.BaseGuildSubscribePayload = {}
        values = channels.copy()
        if not replace:
            existing = self._channels.get(guild.id)
            if existing:
                values = {**existing, **channels}

        payload['channels'] = values
        await self._checked_add({str(guild.id): payload})
class ClientStatus:
    """Holds an overall status plus the per-platform (desktop, mobile, web) statuses."""

    __slots__ = ('status', 'desktop', 'mobile', 'web')

    def __init__(self, status: Optional[str] = None, data: Optional[ClientStatusPayload] = None, /) -> None:
        # Defaults describe an offline user with no active platform
        self.status: str = 'offline'
        self.desktop: Optional[str] = None
        self.mobile: Optional[str] = None
        self.web: Optional[str] = None

        if status is None and data is None:
            return
        self._update(status or 'offline', data or {})

    def __repr__(self) -> str:
        fields = ('status', 'desktop', 'mobile', 'web')
        inner = ' '.join(f'{name}={getattr(self, name)!r}' for name in fields)
        return f'<{self.__class__.__name__} {inner}>'

    def _update(self, status: str, data: ClientStatusPayload, /) -> None:
        """Overwrite every field; platforms missing from ``data`` become ``None``."""
        self.status = status
        for platform in ('desktop', 'mobile', 'web'):
            setattr(self, platform, data.get(platform))

    @classmethod
    def _copy(cls, client_status: Self, /) -> Self:
        """Return a new instance carrying the same field values as ``client_status``."""
        clone = cls.__new__(cls)  # bypass __init__
        for name in ('status', 'desktop', 'mobile', 'web'):
            setattr(clone, name, getattr(client_status, name))
        return clone
class Presence:
    """A user's presence: their client status and their activities."""

    __slots__ = ('client_status', 'activities')

    # Lazily created shared instance returned by ``_offline()``
    _OFFLINE: ClassVar[Self] = MISSING

    def __init__(self, data: gw.BasePresenceUpdate, state: ConnectionState, /) -> None:
        self.client_status: ClientStatus = ClientStatus(data['status'], data.get('client_status'))
        self.activities: Tuple[ActivityTypes, ...] = tuple(create_activity(d, state) for d in data['activities'])

    def __repr__(self) -> str:
        attrs = [
            ('client_status', self.client_status),
            ('activities', self.activities),
        ]
        inner = ' '.join('%s=%r' % t for t in attrs)
        return f'<{self.__class__.__name__} {inner}>'

    @staticmethod
    def _status_fields(client_status: ClientStatus, /) -> Tuple[Optional[str], ...]:
        """Return the comparable values held by a client status."""
        return (client_status.status, client_status.desktop, client_status.mobile, client_status.web)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, Presence):
            return False
        # ClientStatus defines no __eq__, so comparing the objects directly would be
        # an identity check and a copy would never equal its source.
        return (
            self._status_fields(self.client_status) == self._status_fields(other.client_status)
            and self.activities == other.activities
        )

    def __ne__(self, other: Any) -> bool:
        return not self.__eq__(other)

    def _update(self, data: gw.BasePresenceUpdate, state: ConnectionState, /) -> None:
        """Refresh the status and activities in place from a presence payload."""
        # ``client_status`` may be absent; ClientStatus._update needs a mapping
        self.client_status._update(data['status'], data.get('client_status') or {})
        self.activities = tuple(create_activity(d, state) for d in data['activities'])

    @classmethod
    def _offline(cls) -> Self:
        """Return the shared offline presence (default status, no activities), creating it on first use."""
        if cls._OFFLINE is MISSING:
            self = cls.__new__(cls)  # bypass __init__
            self.client_status = ClientStatus()
            self.activities = ()
            cls._OFFLINE = self

        return cls._OFFLINE

    @classmethod
    def _copy(cls, presence: Self, /) -> Self:
        """Return a new presence with a copied client status and the same activities tuple."""
        self = cls.__new__(cls)  # bypass __init__
        self.client_status = ClientStatus._copy(presence.client_status)
        self.activities = presence.activities
        return self
class FakeClientPresence(Presence):
    """Presence whose values are computed from the connection state's sessions on every access."""

    __slots__ = ('_state',)

    def __init__(self, state: ConnectionState, /) -> None:
        self._state = state

    def _update(self, data: gw.PresenceUpdateEvent, state: ConnectionState, /) -> None:
        # Nothing is stored on this object, so there is nothing to update
        return

    @property
    def client_status(self) -> ClientStatus:
        """Status of the current session plus the status of each session keyed by its client."""
        connection = self._state
        current = getattr(connection.current_session, 'status', 'offline')

        per_client = {}
        for session in connection._sessions.values():
            per_client[str(session.client)] = str(session.status)

        return ClientStatus(str(current), per_client)  # type: ignore

    @property
    def activities(self) -> Tuple[ActivityTypes, ...]:
        """Activities of the current session, or an empty tuple when it has none."""
        session = self._state.current_session
        return getattr(session, 'activities', ())
async def logging_coroutine(coroutine: Coroutine[Any, Any, T], *, info: str) -> Optional[T]:
    """Await ``coroutine``, logging instead of propagating any :class:`Exception`.

    Parameters
    ----------
    coroutine:
        The coroutine to run.
    info:
        Short description of the operation, interpolated into the log message.

    Returns
    -------
    The coroutine's result, or ``None`` when it raised.
    """
    try:
        # Hand the result back so the ``Optional[T]`` annotation holds
        return await coroutine
    except Exception:
        _log.exception('Exception occurred during %s.', info)
        return None
class ConnectionState:
def __init__(
    self,
    *,
    dispatch: Callable[..., Any],
    handlers: Dict[str, Callable[..., Any]],
    hooks: Dict[str, Callable[..., Coroutine[Any, Any, Any]]],
    http: HTTPClient,
    client: Client,
    **options: Any,
) -> None:
    """Validate the client options and set up the initial state.

    Options read here: ``max_messages``, ``heartbeat_timeout``, ``allowed_mentions``,
    ``activities`` / ``activity``, ``status``, ``idle_since``, ``chunk_guilds_at_startup``,
    ``guild_subscriptions`` (or ``request_guilds``), ``member_cache_flags``, ``afk`` and
    ``preferred_rtc_regions``.

    Raises
    ------
    TypeError
        ``allowed_mentions``, an activity, ``idle_since`` or ``member_cache_flags``
        has the wrong type.
    ClientException
        Chunking at startup was requested without the ``joined`` member cache flag
        or without guild subscriptions.
    """
    # Set later, after Client.login
    self.loop: asyncio.AbstractEventLoop = utils.MISSING
    self.http: HTTPClient = http
    self.client = client
    self.max_messages: Optional[int] = options.get('max_messages', 1000)
    # Non-positive sizes fall back to the default
    if self.max_messages is not None and self.max_messages <= 0:
        self.max_messages = 1000

    self.dispatch: Callable[..., Any] = dispatch
    self.handlers: Dict[str, Callable[..., Any]] = handlers
    self.hooks: Dict[str, Callable[..., Coroutine[Any, Any, Any]]] = hooks
    self._ready_task: Optional[asyncio.Task] = None
    self.heartbeat_timeout: float = options.get('heartbeat_timeout', 60.0)

    allowed_mentions = options.get('allowed_mentions')
    if allowed_mentions is not None and not isinstance(allowed_mentions, AllowedMentions):
        raise TypeError('allowed_mentions parameter must be AllowedMentions')

    self.allowed_mentions: Optional[AllowedMentions] = allowed_mentions
    self._chunk_requests: Dict[Union[str, int], ChunkRequest] = {}
    self._scrape_requests: Dict[Union[str, int], MemberSidebar] = {}

    # Treat an explicit ``activities=None`` like an omitted option, and materialise the
    # value so a one-shot iterable survives both the validation and the conversion below
    activities = list(options.get('activities') or ())
    if not activities:
        activity = options.get('activity')
        if activity is not None:
            activities = [activity]

    if not all(isinstance(activity, BaseActivity) for activity in activities):
        raise TypeError('activity parameter must derive from BaseActivity')
    activities = [activity.to_dict() for activity in activities]

    status = options.get('status', None)
    if status:
        # An offline status is stored as "invisible"
        if status is Status.offline:
            status = 'invisible'
        else:
            status = str(status)

    idle_since = options.get('idle_since', None)
    if idle_since:
        if not isinstance(idle_since, datetime.datetime):
            raise TypeError('idle_since parameter must be a datetime.datetime')
        # Stored as a millisecond timestamp
        since = int(idle_since.timestamp() * 1000)
    else:
        since = 0

    chunk_guilds = options.get('chunk_guilds_at_startup', MISSING)
    subscribe_guilds = options.get('guild_subscriptions', options.get('request_guilds', True))
    cache_flags = options.get('member_cache_flags', None)
    if cache_flags is None:
        cache_flags = MemberCacheFlags.all()
    elif not isinstance(cache_flags, MemberCacheFlags):
        raise TypeError(f'member_cache_flags parameter must be MemberCacheFlags not {type(cache_flags)!r}')

    if not cache_flags.joined and chunk_guilds:
        raise ClientException('Cannot chunk guilds at startup without a member cache')
    if chunk_guilds and not subscribe_guilds:
        raise ClientException('Cannot chunk guilds at startup without subscribing to them')

    # When not given explicitly, chunking follows the subscription and cache settings
    self._chunk_guilds: bool = chunk_guilds if chunk_guilds is not MISSING else (subscribe_guilds and cache_flags.joined)
    self._subscribe_guilds: bool = subscribe_guilds
    self.member_cache_flags: MemberCacheFlags = cache_flags
    self._activities: List[ActivityPayload] = activities
    self._status: Optional[str] = status
    self._afk: bool = options.get('afk', False)
    self._idle_since: int = since
    self.overriden_rtc_regions: Optional[List[str]] = options.get('preferred_rtc_regions', None)

    if cache_flags._empty:
        self.store_user = self.create_user

    # Map upper-cased event names to the ``parse_*`` methods of this object
    self.parsers: Dict[str, Callable[[Any], None]]
    self.parsers = parsers = {}
    for attr, func in inspect.getmembers(self):
        if attr.startswith('parse_'):
            parsers[attr[6:].upper()] = func

    self.clear(full=True)
def clear(self, *, full: bool = False) -> None:
    """Reset every per-session cache on this state object to an empty value.

    Parameters
    -----------
    full: :class:`bool`
        When true, the guild subscription manager is replaced with a new one as well.
    """
    # Account-level data
    self.user: Optional[ClientUser] = None
    self._users: weakref.WeakValueDictionary[int, User] = weakref.WeakValueDictionary()
    self.settings: Optional[UserSettings] = None
    self.consents: Optional[TrackingSettings] = None
    self.connections: Dict[str, Connection] = {}
    self.pending_payments: Dict[int, Payment] = {}
    self.analytics_token: Optional[str] = None
    self.preferred_rtc_regions: List[str] = []
    self.country_code: Optional[str] = None
    self.api_code_version: int = 0
    self.session_type: Optional[str] = None
    self.auth_session_id: Optional[str] = None
    self.required_action: Optional[RequiredActionType] = None
    self.friend_suggestion_count: int = 0
    self.disclose: List[str] = []

    # Guilds and their expressions
    self._emojis: Dict[int, Emoji] = {}
    self._stickers: Dict[int, GuildSticker] = {}
    self._guilds: Dict[int, Guild] = {}
    self.tutorial: Tutorial = Tutorial.default(self)

    # Read states and notification settings
    self._read_states: Dict[int, Dict[int, ReadState]] = {}
    self.read_state_version: int = 0
    self.guild_settings: Dict[Optional[int], GuildSettings] = {}
    self.guild_settings_version: int = 0

    # Calls and voice
    self._calls: Dict[int, Call] = {}
    self._call_message_cache: Dict[int, Message] = {}  # Hopefully this won't be a memory leak
    self._voice_clients: Dict[int, VoiceProtocol] = {}
    self._voice_states: Dict[int, VoiceState] = {}

    # Interactions
    self._interaction_cache: Dict[Union[int, str], Tuple[int, Optional[str], MessageableChannel]] = {}
    self._interactions: OrderedDict[Union[int, str], Interaction] = OrderedDict()  # LRU of max size 15

    # Relationships, private channels and presences
    self._relationships: Dict[int, Relationship] = {}
    self._private_channels: Dict[int, PrivateChannel] = {}
    self._private_channels_by_user: Dict[int, DMChannel] = {}
    self._guild_presences: Dict[int, Dict[int, Presence]] = {}
    self._presences: Dict[int, Presence] = {}
    self._sessions: Dict[str, Session] = {}

    # The message cache only exists when a maximum size is configured
    self._messages: Optional[Deque[Message]] = (
        deque(maxlen=self.max_messages) if self.max_messages is not None else None
    )

    # Experiments
    self.experiments: Dict[int, UserExperiment] = {}
    self.guild_experiments: Dict[int, GuildExperiment] = {}

    if full:
        self.subscriptions: GuildSubscriptions = GuildSubscriptions(self)
def process_chunk_requests(self, guild_id: int, nonce: Optional[str], members: List[Member], complete: bool) -> None:
    """Hand a received batch of members to every pending request that matches.

    A request matches when both its ``guild_id`` and its ``nonce`` equal the
    given ones. When ``complete`` is true the matching requests are marked as
    done, and those flagged ``oneshot`` are removed from ``_chunk_requests``.
    """
    finished = []
    for key, pending in self._chunk_requests.items():
        if not (pending.guild_id == guild_id and pending.nonce == nonce):
            continue

        pending.add_members(members)
        if not complete:
            continue

        pending.done()
        if pending.oneshot:
            finished.append(key)

    # Removal is deferred so the dict is not mutated while being iterated
    for key in finished:
        del self._chunk_requests[key]
def call_handlers(self, key: str, *args: Any, **kwargs: Any) -> None:
    """Call the handler registered under ``key`` in ``self.handlers``, if there is one."""
    try:
        handler = self.handlers[key]
    except KeyError:
        return
    handler(*args, **kwargs)
async def call_hooks(self, key: str, *args: Any, **kwargs: Any) -> None:
    """Await the hook registered under ``key`` in ``self.hooks``, if there is one."""
    try:
        hook = self.hooks[key]
    except KeyError:
        return
    await hook(*args, **kwargs)
async def async_setup(self) -> None:
    """Asynchronous setup step; this implementation does nothing."""
    return None
@property
def session_id(self) -> Optional[str]:
    """Optional[:class:`str`]: The ``session_id`` of ``self.ws``, or ``None`` when ``self.ws`` is falsy."""
    gateway = self.ws
    return gateway.session_id if gateway else None
@property
def ws(self):
    """The ``ws`` attribute of the owning client."""
    client = self.client
    return client.ws
@property
def self_id(self) -> Optional[int]:
    """Optional[:class:`int`]: The ID of ``self.user``, or ``None`` when no user is set."""
    current = self.user
    if current:
        return current.id
    return None
@property
def locale(self) -> str:
    """:class:`str`: The ``locale`` of ``self.user`` as a string, ``'en-US'`` when it has none."""
    value = getattr(self.user, 'locale', 'en-US')
    return str(value)
@property
def voice_clients(self) -> List[VoiceProtocol]:
    """List[VoiceProtocol]: A new list holding every registered voice client."""
    return [*self._voice_clients.values()]
def _update_voice_state(
    self, data: VoiceStatePayload, channel_id: Optional[int]
) -> Tuple[Optional[User], VoiceState, VoiceState]:
    """Apply a voice state payload to the private-channel voice state cache.

    Returns a ``(user, before, after)`` tuple where ``user`` is the cached
    user (if any) and ``before``/``after`` are the voice states prior to and
    following the update.
    """
    user_id = int(data['user_id'])
    user = self.get_user(user_id)
    channel: Optional[Union[DMChannel, GroupChannel]] = self._get_private_channel(channel_id)
    states = self._voice_states

    try:
        # Without a resolved channel the cached entry is removed from the cache
        after = states.pop(user_id) if channel is None else states[user_id]
        before = copy.copy(after)
        after._update(data, channel)
    except KeyError:
        # Nothing usable was cached for this user, so a new entry is stored
        after = VoiceState(data=data, channel=channel)
        before = VoiceState(data=data, channel=None)
        states[user_id] = after

    return user, before, after
def _voice_state_for(self, user_id: int) -> Optional[VoiceState]:
    """Return the cached voice state stored for ``user_id``, or ``None``."""
    states = self._voice_states
    return states.get(user_id)
def _get_voice_client(self, guild_id: Optional[int]) -> Optional[VoiceProtocol]:
    """Return the voice client registered under ``guild_id``, or ``None``."""
    # The keys of self._voice_clients are ints
    clients = self._voice_clients
    return clients.get(guild_id)  # type: ignore
def _add_voice_client(self, guild_id: int, voice: VoiceProtocol) -> None:
    """Register ``voice`` under ``guild_id``, replacing any previous entry."""
    self._voice_clients.update({guild_id: voice})
def _remove_voice_client(self, guild_id: int) -> None:
    """Forget the voice client registered under ``guild_id``; a missing entry is ignored."""
    clients = self._voice_clients
    clients.pop(guild_id, None)
def _get_preferred_regions(self) -> Dict[str, Union[List[str], str]]:
    """Build the preferred-region fields from the configured RTC regions.

    ``self.overriden_rtc_regions`` is used when it is not ``None``; otherwise
    the client's ``preferred_rtc_regions`` is used. An empty dict is returned
    when the chosen list is empty.
    """
    regions = self.overriden_rtc_regions
    if regions is None:
        regions = self.client.preferred_rtc_regions

    if not regions:
        return {}
    return {'preferred_regions': regions, 'preferred_region': regions[0]}
def _update_references(self, ws: DiscordWebSocket) -> None:
    """Point the ``main_ws`` attribute of every voice client at ``ws``."""
    for voice_client in self.voice_clients:
        # Silencing the unknown attribute (ok at runtime).
        setattr(voice_client, 'main_ws', ws)
def _add_interaction(self, interaction: Interaction) -> None:
    """Remember ``interaction`` by its ID, keeping at most 15 entries.

    When the limit is exceeded the entry at the front of the ordered dict is dropped.
    """
    cache = self._interactions
    cache[interaction.id] = interaction
    if len(cache) > 15:
        cache.popitem(last=False)
def store_user(self, data: Union[UserPayload, PartialUserPayload], *, cache: bool = True) -> User:
    """Return the cached user for the payload, creating one when it is not cached.

    Parameters
    -----------
    data
        The user payload; its ``id`` key is converted to an int and used as the cache key.
    cache: :class:`bool`
        Whether a newly created user is stored in ``self._users``.
    """
    # this way is 300% faster than `dict.setdefault`.
    user_id = int(data['id'])
    users = self._users
    try:
        found = users[user_id]
    except KeyError:
        found = User(state=self, data=data)
        if cache:
            users[user_id] = found
    return found
def create_user(self, data: Union[UserPayload, PartialUserPayload], cache: bool = False) -> User:
    """Build a user from the payload without touching the user cache.

    ``self.user`` is returned instead when the payload's ID equals ``self.self_id``.
    The ``cache`` parameter is accepted but not used.
    """
    if int(data['id']) == self.self_id:
        return self.user  # type: ignore
    return User(state=self, data=data)
def get_user(self, id: int) -> Optional[User]:
    """Return the cached user stored under ``id``, or ``None`` when there is none."""
    users = self._users
    return users.get(id)
def store_emoji(self, guild: Guild, data: EmojiPayload) -> Emoji:
    """Create an emoji from the payload and cache it unless the guild is evicted."""
    # The id will be present here
    key = int(data['id'])  # type: ignore
    created = Emoji(guild=guild, state=self, data=data)
    if self.is_guild_evicted(guild):
        return created
    self._emojis[key] = created
    return created
def store_sticker(self, guild: Guild, data: GuildStickerPayload) -> GuildSticker:
    """Create a guild sticker from the payload and cache it unless the guild is evicted."""
    key = int(data['id'])
    created = GuildSticker(state=self, data=data)
    if self.is_guild_evicted(guild):
        return created
    self._stickers[key] = created
    return created
@property
def guilds(self) -> Sequence[Guild]:
    """Sequence[Guild]: A sequence proxy over the cached guilds."""
    cached = self._guilds.values()
    return utils.SequenceProxy(cached)
def _get_guild(self, guild_id: Optional[int], /) -> Optional[Guild]:
    """Return the cached guild stored under ``guild_id``, or ``None``."""
    # The keys of self._guilds are ints
    cached = self._guilds
    return cached.get(guild_id)  # type: ignore
def _get_or_create_unavailable_guild(self, guild_id: int, /) -> Guild:
    """Return the cached guild for ``guild_id``, or a newly built unavailable guild.

    The newly built guild is not added to the cache by this method.
    """
    cached = self._guilds.get(guild_id)
    if cached:
        return cached
    return Guild._create_unavailable(state=self, guild_id=guild_id)
def _add_guild(self, guild: Guild, /) -> None:
    """Store ``guild`` in the guild cache under its ID."""
    self._guilds.update({guild.id: guild})
def _remove_guild(self, guild: Guild, /) -> None:
    """Drop ``guild`` and the data cached alongside it.

    Removes the guild and its presences, the guild-level read states that
    exist for it, and its emojis and stickers from the expression caches.
    """
    self._guilds.pop(guild.id, None)
    self._guild_presences.pop(guild.id, None)

    # Nuke all read states
    guild_state_types = (ReadStateType.scheduled_events, ReadStateType.guild_home, ReadStateType.onboarding)
    for state_type in guild_state_types:
        existing = self.get_read_state(guild.id, state_type, if_exists=True)
        if existing is None:
            continue
        self.remove_read_state(existing)

    # Nuke guild expressions
    for emoji in guild.emojis:
        self._emojis.pop(emoji.id, None)
    for sticker in guild.stickers:
        self._stickers.pop(sticker.id, None)

    del guild
def create_guild(self, guild: BaseGuildPayload, /) -> Guild:
    """Build a guild object from the payload; the result is not cached here."""
    created = Guild(data=guild, state=self)
    return created
@property
def emojis(self) -> Sequence[Emoji]:
    """Sequence[Emoji]: A sequence proxy over the cached emojis."""
    cached = self._emojis.values()
    return utils.SequenceProxy(cached)
@property
def stickers(self) -> Sequence[GuildSticker]:
    """Sequence[GuildSticker]: A sequence proxy over the cached guild stickers."""
    cached = self._stickers.values()
    return utils.SequenceProxy(cached)
def get_emoji(self, emoji_id: Optional[int]) -> Optional[Emoji]:
    """Return the cached emoji stored under ``emoji_id``, or ``None``."""
    # the keys of self._emojis are ints
    cached = self._emojis
    return cached.get(emoji_id)  # type: ignore
def get_sticker(self, sticker_id: Optional[int]) -> Optional[GuildSticker]:
    """Return the cached sticker stored under ``sticker_id``, or ``None``."""
    # the keys of self._stickers are ints
    cached = self._stickers
    return cached.get(sticker_id)  # type: ignore
@property
def private_channels(self) -> Sequence[PrivateChannel]:
    """Sequence[PrivateChannel]: A sequence proxy over the cached private channels."""
    cached = self._private_channels.values()
    return utils.SequenceProxy(cached)
async def call_connect(self, channel_id: int) -> None:
    """Forward a call-connect request for ``channel_id`` to the websocket, if one is set."""
    # This is now no longer needed with the AUTO_CALL_CONNECT capability
    gateway = self.ws
    if gateway is not None:
        await gateway.call_connect(channel_id)
def _get_private_channel(self, channel_id: Optional[int]) -> Optional[PrivateChannel]:
    """Return the cached private channel stored under ``channel_id``, or ``None``."""
    # The keys of self._private_channels are ints
    cached = self._private_channels
    return cached.get(channel_id)  # type: ignore
def _get_private_channel_by_user(self, user_id: Optional[int]) -> Optional[DMChannel]:
    """Return the cached DM channel stored under the recipient's ``user_id``, or ``None``."""
    # The keys of self._private_channels_by_user are ints
    by_user = self._private_channels_by_user
    return by_user.get(user_id)  # type: ignore
def _add_private_channel(self, channel: PrivateChannel) -> None:
    """Cache a private channel by ID and, for DMs with a recipient, by recipient ID too."""
    self._private_channels[channel.id] = channel

    if not isinstance(channel, DMChannel):
        return
    if channel.recipient:
        self._private_channels_by_user[channel.recipient.id] = channel
def add_dm_channel(self, data: DMChannelPayload) -> DMChannel:
    """Create a DM channel from the payload, cache it and return it."""
    # self.user is *always* cached when this is called
    dm = DMChannel(me=self.user, state=self, data=data)  # type: ignore
    self._add_private_channel(dm)
    return dm
def _remove_private_channel(self, channel: PrivateChannel) -> None:
    """Remove a private channel from the ID cache and, for DMs, from the by-recipient cache."""
    self._private_channels.pop(channel.id, None)
    if not isinstance(channel, DMChannel):
        return

    recipient = channel.recipient
    if recipient is None:
        return
    self._private_channels_by_user.pop(recipient.id, None)
def _get_message(self, msg_id: Optional[int]) -> Optional[Message]:
    """Look up a cached message by ID.

    The message cache is searched newest-first when it is non-empty;
    otherwise (cache disabled or empty) the call message cache is searched instead.
    """

    def has_id(message: Message) -> bool:
        return message.id == msg_id

    if self._messages:
        return utils.find(has_id, reversed(self._messages))
    return utils.find(has_id, reversed(self._call_message_cache.values()))
def _add_guild_from_data(self, data: GuildPayload) -> Guild:
    """Create a guild from the payload, add it to the guild cache and return it."""
    created = self.create_guild(data)
    self._add_guild(created)
    return created
def _guild_needs_chunking(self, guild: Guild) -> bool:
    """Whether chunking is enabled and ``guild`` is neither chunked nor unavailable."""
    return self._chunk_guilds and not (guild.chunked or guild.unavailable)
async def _can_chunk_guild(self, guild: Guild) -> bool:
    """Whether our own member in ``guild`` holds one of the permissions checked here.

    When ``guild.me`` is not set, our own member is queried first (with caching
    enabled). The result is true when ``guild.me`` is available and has at least
    one of ``kick_members``, ``ban_members`` or ``manage_roles``.
    """
    if not guild.me:
        await guild.query_members(user_ids=[self.self_id], cache=True)  # type: ignore # self_id is always present here

    # Re-read after the query, which is expected to have filled it in
    me = guild.me
    if me is None:
        return False

    flags = (
        me.guild_permissions.kick_members,
        me.guild_permissions.ban_members,
        me.guild_permissions.manage_roles,
    )
    return any(flags)
def _get_guild_channel(
    self, data: PartialMessagePayload, guild_id: Optional[int] = None
) -> Tuple[Union[Channel, Thread], Optional[Guild]]:
    """Resolve the channel (and guild, if any) that a message payload refers to.

    The guild ID comes from the ``guild_id`` argument when truthy, else from
    the payload. When no channel can be resolved, a partial messageable
    carrying the IDs is returned in its place.
    """
    channel_id = int(data['channel_id'])
    try:
        guild_id = guild_id or int(data['guild_id'])
        guild = self._get_guild(guild_id)
    except KeyError:
        # No guild ID anywhere: fall back to the general channel lookup
        guild = None
        channel = self.get_channel(channel_id)
    else:
        channel = guild and guild._resolve_channel(channel_id)

    if not channel:
        channel = PartialMessageable(state=self, guild_id=guild_id, id=channel_id)
    return channel, guild
async def _delete_messages(self, channel_id, messages, reason: Optional[str] = None) -> None:
    """Delete the given messages one request at a time, ignoring ``NotFound`` errors."""
    remove = self.http.delete_message
    for message in messages:
        try:
            await remove(channel_id, message.id, reason=reason)
        except NotFound:
            # Already gone, which is the outcome we wanted anyway
            continue
def _update_poll_counts(self, message: Message, answer_id: int, added: bool, self_voted: bool = False) -> Optional[Poll]:
    """Pass a vote change on to the message's poll.

    Returns the poll, or ``None`` when the message has no (truthy) poll.
    """
    poll = message.poll
    if poll:
        poll._handle_vote(answer_id, added, self_voted)
        return poll
    return None
def _update_poll_results(self, from_: Message, to: Union[Message, int]) -> None:
    """Copy poll results carried by ``from_`` onto the poll of the target message.

    Parameters
    -----------
    from_: Message
        The message the results are taken from.
    to: Union[Message, :class:`int`]
        The target message, or its ID. An ID is resolved through the message
        cache; if it is not cached nothing happens. Any other type is ignored.

    When the target is given as a message and a distinct cached copy of it
    exists, the cached copy's poll is updated as well.
    """
    if isinstance(to, Message):
        cached = self._get_message(to.id)
    elif isinstance(to, int):
        cached = self._get_message(to)
        if cached is None:
            return
        to = cached
    else:
        return

    if to.poll is None:
        return
    to.poll._update_results_from_message(from_)

    # ``cached`` is the very same object as ``to`` when an ID was passed (or when
    # the caller handed us the cached instance); skip it so the results are not
    # applied to one poll twice.
    if cached is not None and cached is not to and cached.poll:
        cached.poll._update_results_from_message(from_)
def subscribe_guild(
    self, guild: Guild, typing: bool = True, activities: bool = True, threads: bool = True, member_updates: bool = True
) -> Coroutine:
    """Ask the subscription manager to subscribe to ``guild`` with the given feature flags.

    Returns whatever ``self.subscriptions.subscribe_to`` returns (annotated as a coroutine).
    """
    manager = self.subscriptions
    return manager.subscribe_to(
        guild,
        typing=typing,
        activities=activities,
        threads=threads,
        member_updates=member_updates,
    )
def chunker(
    self,
    guild_ids: List[Snowflake],
    query: Optional[str] = '',
    limit: int = 0,
    presences: bool = True,
    *,
    user_ids: Optional[List[Snowflake]] = None,
    nonce: Optional[str] = None,
):
    """Forward a member chunk request to the websocket and return its result.

    All arguments are passed through unchanged to ``self.ws.request_chunks``.
    """
    send_request = self.ws.request_chunks
    return send_request(
        guild_ids,
        query=query,
        limit=limit,
        presences=presences,
        user_ids=user_ids,
        nonce=nonce,
    )
async def query_members(
    self,
    guild: Guild,
    query: Optional[str],
    limit: int,
    user_ids: Optional[List[Snowflake]],
    cache: bool,
    presences: bool,
) -> List[Member]:
    """Request members of ``guild`` over the gateway and wait for the result.

    A chunk request is registered under its nonce, the request is sent via
    :meth:`chunker`, and the result is awaited for up to 30 seconds.

    Raises
    -------
    asyncio.TimeoutError
        The chunks did not arrive in time. The pending request is unregistered
        before the error propagates so it does not stay in ``_chunk_requests``.
    """
    guild_id = guild.id
    request = ChunkRequest(guild.id, self.loop, self._get_guild, cache=cache)
    self._chunk_requests[request.nonce] = request

    try:
        await self.chunker(
            [guild_id], query=query, limit=limit, presences=presences, user_ids=user_ids, nonce=request.nonce
        )
        return await asyncio.wait_for(request.wait(), timeout=30.0)
    except asyncio.TimeoutError:
        # Nobody is waiting for this request any more; without this it would
        # remain registered indefinitely since it never completes.
        self._chunk_requests.pop(request.nonce, None)
        _log.warning('Timed out waiting for chunks with query %r and limit %d for guild ID %d.', query, limit, guild_id)
        raise
async def search_recent_members(
    self,
    guild: Guild,
    query: str = '',
    limit: Optional[int] = None,
    cache: bool = False,
) -> List[Member]:
    """Search recent members of ``guild`` over the gateway, following pagination.

    Pages are requested until the request reports no remaining capacity (when a
    ``limit`` is given), a page comes back empty, or the continuation token
    stops changing. The collected members are returned de-duplicated, so their
    order is not preserved.

    Raises
    -------
    asyncio.TimeoutError
        A page did not arrive within 30 seconds.
    """
    guild_id = guild.id
    request = ChunkRequest(guild.id, self.loop, self._get_guild, limit=limit, cache=cache, oneshot=False)
    self._chunk_requests[request.nonce] = request

    # Unlike query members, this OP is paginated
    continuation_token = None
    try:
        while True:
            try:
                await self.ws.search_recent_members(guild_id, query=query, nonce=request.nonce, after=continuation_token)
                returned = await asyncio.wait_for(request.wait(), timeout=30.0)
            except asyncio.TimeoutError:
                # ``limit`` may be None, so it is formatted with %s rather than %d
                _log.warning(
                    'Timed out waiting for search chunks with query %r and limit %s for guild ID %d.',
                    query,
                    limit,
                    guild_id,
                )
                raise

            if (limit is not None and request.remaining < 1) or len(returned) < 1:
                break

            # Sort the members by joined_at timestamp and grab the oldest one
            request.buffer.sort(key=lambda m: m.joined_at or utils.utcnow())
            old_continuation_token = continuation_token
            continuation_token = request.buffer[0].id
            if continuation_token == old_continuation_token:
                break
    finally:
        # The request is not oneshot, so it must always be unregistered here,
        # including when a page times out or sending fails.
        self._chunk_requests.pop(request.nonce, None)

    return list(set(request.buffer))
async def _delay_ready(self) -> None:
    """Subscribe to and chunk the cached guilds, then signal that the client is ready.

    Guild subscriptions are blocked on the manager while requests are being
    queued. If the task is cancelled, the ``ready`` handlers and event are
    not fired.
    """
    manager = self.subscriptions
    manager.blocked = True
    try:
        # Try to only send one OP each
        member_nonce = utils._generate_nonce()
        # (guild, future) pairs to wait on once all requests are out
        states = []
        # IDs of the guilds covered by the single shared chunk request
        to_chunk = []
        if not manager.empty:
            await manager._requeue_subscriptions()
        for guild in self._guilds.values():
            if self._subscribe_guilds:
                await self.subscribe_guild(guild)
            if self._guild_needs_chunking(guild):
                if await self._can_chunk_guild(guild):
                    future = await self.chunk_guild(guild, wait=False, nonce=member_nonce)
                    to_chunk.append(guild.id)
                    states.append((guild, future))
                elif not guild._offline_members_hidden:
                    # Chunking is not possible for this guild, so go through the member sidebar instead
                    request = MemberSidebar(guild, MISSING, chunk=True, cache=True, loop=self.loop, delay=0)
                    if not request.channels:
                        # Not possible to scrape here
                        continue
                    self._scrape_requests[guild.id] = request
                    request.start()
                    states.append((guild, request.get_future()))
        manager.blocked = False
        # NOTE(review): this is sent even when ``to_chunk`` is empty - confirm the gateway call tolerates that
        await self.chunker(to_chunk, nonce=member_nonce)
        for guild, future in states:
            timeout = self._chunk_timeout(guild)
            try:
                await asyncio.wait_for(future, timeout=timeout)
            except asyncio.TimeoutError:
                _log.warning('Timed out waiting for chunks for guild ID %s.', guild.id)
            except (ClientException, InvalidData):
                # A failure for one guild must not prevent waiting on the others
                pass
    except asyncio.CancelledError:
        # Cancelled (e.g. by parse_ready cancelling the previous task): skip the ready dispatch
        pass
    else:
        # Dispatch the event
        self.call_handlers('ready')
        self.dispatch('ready')
    finally:
        # Make sure we don't block it forever
        manager.blocked = False
        self._ready_task = None
def parse_ready(self, data: gw.ReadyEvent) -> None:
    """Handle the READY payload.

    Cancels any pending ready task, clears the caches, stores the payload for
    the READY_SUPPLEMENTAL handler and parses the account-level data (user,
    read states, guild settings, experiments, sessions and other extras).
    """
    if self._ready_task is not None:
        self._ready_task.cancel()

    self.clear()
    self._ready_data = data

    # Clear the ACK token
    self.http.ack_token = None

    # Self parsing
    self.user = user = ClientUser(state=self, data=data['user'])
    self._users[user.id] = user  # type: ignore

    # Read state parsing
    # ``entries`` is read with .get() so that a payload without read state data
    # does not raise KeyError (the ``{}`` default has no ``entries`` key)
    read_states = data.get('read_state', {})
    for read_state in read_states.get('entries', []):
        item = ReadState(state=self, data=read_state)
        self.store_read_state(item)
    self.read_state_version = read_states.get('version', 0)

    # Guild settings parsing
    guild_settings = data.get('user_guild_settings', {})
    self.guild_settings = {
        utils._get_as_snowflake(entry, 'guild_id'): GuildSettings(data=entry, state=self)
        for entry in guild_settings.get('entries', [])
    }
    self.guild_settings_version = guild_settings.get('version', 0)

    # Experiments
    self.experiments = {exp[0]: UserExperiment(state=self, data=exp) for exp in data.get('experiments', [])}
    self.guild_experiments = {exp[0]: GuildExperiment(state=self, data=exp) for exp in data.get('guild_experiments', [])}

    # Extras
    self.analytics_token = data.get('analytics_token')
    self.preferred_rtc_regions = data.get('geo_ordered_rtc_regions', ['us-central'])
    self.settings = UserSettings(self, data.get('user_settings_proto', ''))
    self.consents = TrackingSettings(data=data.get('consents', {}), state=self)
    self.country_code = data.get('country_code', 'US')
    self.api_code_version = data.get('api_code_version', 1)
    self.session_type = data.get('session_type', 'normal')
    self.auth_session_id = data.get('auth_session_id_hash')
    self.connections = {c['id']: Connection(state=self, data=c) for c in data.get('connected_accounts', [])}
    self.pending_payments = {int(p['id']): Payment(state=self, data=p) for p in (data.get('pending_payments') or [])}
    self.required_action = try_enum(RequiredActionType, data['required_action']) if 'required_action' in data else None
    self.friend_suggestion_count = data.get('friend_suggestion_count', 0)

    if 'sessions' in data:
        self.parse_sessions_replace(data['sessions'], from_ready=True)

    if 'auth_token' in data:
        self.http._token(data['auth_token'])

    if 'tutorial' in data and data['tutorial']:
        self.tutorial = Tutorial(state=self, data=data['tutorial'])

    # Before parsing the rest, we wait for READY_SUPPLEMENTAL
    # This has voice state objects, as well as an initial member cache
def parse_ready_supplemental(self, extra_data: gw.ReadySupplementalEvent) -> None:
    """Handle READY_SUPPLEMENTAL by finishing the parsing that READY started.

    The payload saved by the READY handler is merged with ``extra_data`` and
    then used to build guilds, relationships, presences and private channels.
    Afterwards the ``connect`` handlers and event are fired and the delayed
    ready task is started.
    """
    data = self._ready_data

    # Temp user parsing
    user = self.user
    # Raw user payloads by ID, used below to fill in objects that only carry a user ID
    temp_users: Dict[int, PartialUserPayload] = {int(data['user']['id']): data['user']}
    for u in data.get('users', []):
        u_id = int(u['id'])
        temp_users[u_id] = u

    # Discord bad
    # The per-guild lists are walked in lockstep; zip() stops at the shortest one
    for guild_data, guild_extra, merged_members, merged_me, merged_presences in zip(
        data.get('guilds', []),
        extra_data.get('guilds', []),
        extra_data.get('merged_members', []),
        data.get('merged_members', []),
        extra_data['merged_presences'].get('guilds', []),
    ):
        for presence in merged_presences:
            # Give each presence a minimal ``user`` object built from its ``user_id``
            presence['user'] = {'id': presence['user_id']}  # type: ignore

        if 'properties' in guild_data:
            # Flatten the nested ``properties`` object into the guild payload itself
            guild_data.update(guild_data.pop('properties'))  # type: ignore

        voice_states = guild_data.setdefault('voice_states', [])
        voice_states.extend(guild_extra.get('voice_states', []))
        members = guild_data.setdefault('members', [])
        members.extend(merged_me)
        members.extend(merged_members)
        presences = guild_data.setdefault('presences', [])
        presences.extend(merged_presences)

        for member in members:
            if 'user' not in member:
                # NOTE(review): .get() leaves ``user`` as None for an unknown ID, which the
                # ``m['user']['id']`` lookup below cannot handle - confirm that cannot happen
                member['user'] = temp_users.get(int(member.pop('user_id')))

        for voice_state in voice_states:
            if 'member' not in voice_state:
                member = utils.find(lambda m: m['user']['id'] == voice_state['user_id'], members)
                if member:
                    voice_state['member'] = member

    # Guild parsing
    for guild_data in data.get('guilds', []):
        self._add_guild_from_data(guild_data)

    # Relationship parsing
    for relationship in data.get('relationships', []):
        try:
            r_id = int(relationship['id'])
        except KeyError:
            continue
        else:
            if 'user' not in relationship:
                relationship['user'] = temp_users[int(relationship.pop('user_id'))]
            self._relationships[r_id] = Relationship(state=self, data=relationship)

    # Relationship presence parsing
    for presence in extra_data['merged_presences'].get('friends', []):
        user_id = int(presence.pop('user_id'))  # type: ignore
        self.store_presence(user_id, self.create_presence(presence))

    # Private channel parsing
    for pm in data.get('private_channels', []) + extra_data.get('lazy_private_channels', []):
        factory, _ = _private_channel_factory(pm['type'])
        if 'recipients' not in pm:
            pm['recipients'] = [temp_users[int(u_id)] for u_id in pm.pop('recipient_ids')]  # type: ignore
        self._add_private_channel(factory(me=user, data=pm, state=self))  # type: ignore

    # Disclose
    self.disclose = data.get('disclose', [])

    # We're done
    del self._ready_data
    self.call_handlers('connect')
    self.dispatch('connect')
    self._ready_task = asyncio.create_task(self._delay_ready())
def parse_resumed(self, data: gw.ResumedEvent) -> None:
    """Handle RESUMED by dispatching the ``resumed`` event; the payload is not used."""
    event = 'resumed'
    self.dispatch(event)
def parse_passive_update_v2(self, data: gw.PassiveUpdateV2Event) -> None:
    """Handle PASSIVE_UPDATE_V2 by refreshing channel, member and voice state data of a guild."""
    # PASSIVE_UPDATE_V2 is sent for large guilds you are not subscribed to
    # in order to keep their members/read and voice states up-to-date; it replaces CHANNEL_UNREADS_UPDATE and PASSIVE_UPDATE_V1
    guild = self._get_guild(int(data['guild_id']))
    if not guild:
        _log.debug('PASSIVE_UPDATE_V2 referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    # Voice states that no longer exist
    for raw_id in data.get('removed_voice_states', []):
        guild._voice_states.pop(int(raw_id), None)

    # Channel metadata
    for payload in data.get('updated_channels', []):
        channel = guild.get_channel(int(payload['id']))
        if channel:
            channel.last_message_id = utils._get_as_snowflake(payload, 'last_message_id')  # type: ignore
            if 'last_pin_timestamp' in payload and hasattr(channel, 'last_pin_timestamp'):
                channel.last_pin_timestamp = utils.parse_time(payload['last_pin_timestamp'])  # type: ignore

    # Members, indexed by user ID so the voice states below can find them
    members = {}
    for raw_member in data.get('members', []):
        members[int(raw_member['user']['id'])] = raw_member

    cache_flags = self.member_cache_flags
    for member_id, member_data in members.items():
        existing = guild.get_member(member_id)
        if existing is not None:
            existing._update(member_data)
        elif cache_flags.voice:
            guild._add_member(Member(data=member_data, guild=guild, state=self))

    # Voice states, with the member payload attached when one was provided
    for voice_state in data.get('updated_voice_states', []):
        member_data = members.get(int(voice_state['user_id']))
        if member_data:
            voice_state['member'] = member_data
        guild._update_voice_state(voice_state, utils._get_as_snowflake(voice_state, 'channel_id'))
def parse_message_create(self, data: gw.MessageCreateEvent) -> None:
    """Handle MESSAGE_CREATE: dispatch the message, cache it and update the channel's read state."""
    channel, _ = self._get_guild_channel(data)
    # channel will be the correct type here
    message = Message(channel=channel, data=data, state=self)  # type: ignore
    self.dispatch('message', message)

    if self._messages is not None:
        self._messages.append(message)
    if message.call is not None:
        self._call_message_cache[message.id] = message

    if not channel:
        return

    channel.last_message_id = message.id  # type: ignore
    read_state = self.get_read_state(channel.id)
    if message.author.id == self.self_id:
        # Implicitly mark our own messages as read
        read_state.last_acked_id = message.id

    counts_as_mention = (
        not message.author.is_blocked()
        and not (channel.type == ChannelType.group and message.type == MessageType.recipient_remove)
        and message._is_self_mentioned()
    )
    if counts_as_mention:
        # Increment mention count if applicable
        read_state.badge_count += 1
def parse_message_delete(self, data: gw.MessageDeleteEvent) -> None:
    """Handle MESSAGE_DELETE.

    Always dispatches ``raw_message_delete``. When the message cache is enabled
    and the message was found, ``message_delete`` is dispatched too and the
    message is removed from the message cache.
    """
    raw = RawMessageDeleteEvent(data)
    found = self._get_message(raw.message_id)
    raw.cached_message = found
    self.dispatch('raw_message_delete', raw)

    if self._messages is not None and found is not None:
        self.dispatch('message_delete', found)
        try:
            self._messages.remove(found)
        except ValueError:
            # _get_message() falls back to the call message cache when the message
            # cache is empty, so ``found`` is not necessarily in ``self._messages``
            pass
def parse_message_delete_bulk(self, data: gw.MessageDeleteBulkEvent) -> None:
    """Handle MESSAGE_DELETE_BULK.

    Always dispatches ``raw_bulk_message_delete``; when any of the messages are
    cached, ``bulk_message_delete`` is dispatched and they are removed from the cache.
    """
    raw = RawBulkMessageDeleteEvent(data)
    cache = self._messages
    found_messages = [cached for cached in cache if cached.id in raw.message_ids] if cache else []
    raw.cached_messages = found_messages
    self.dispatch('raw_bulk_message_delete', raw)

    if not found_messages:
        return

    self.dispatch('bulk_message_delete', found_messages)
    for cached in found_messages:
        # self._messages won't be None here
        self._messages.remove(cached)  # type: ignore
def parse_message_update(self, data: gw.MessageUpdateEvent) -> None:
    """Handle MESSAGE_UPDATE.

    Always dispatches ``raw_message_edit``. When the message is cached, the
    cached copy is updated in place and ``message_edit`` is dispatched with a
    snapshot of the old message and the newly built one.
    """
    channel, _ = self._get_guild_channel(data)
    # channel would be the correct type here
    updated_message = Message(channel=channel, data=data, state=self)  # type: ignore
    raw = RawMessageUpdateEvent(data=data, message=updated_message)

    cached_message = self._get_message(updated_message.id)
    if cached_message is None:
        self.dispatch('raw_message_edit', raw)
        return

    # Snapshot the cached message before it is mutated
    before = copy.copy(cached_message)
    raw.cached_message = before
    self.dispatch('raw_message_edit', raw)
    cached_message._update(data)

    # Coerce the `after` parameter to take the new updated Member
    # ref: #5999
    before.author = updated_message.author
    self.dispatch('message_edit', before, updated_message)
def parse_message_ack(self, data: gw.MessageAckEvent) -> None:
    """Handle MESSAGE_ACK by updating the channel's read state and dispatching ack events."""
    self.read_state_version = data.get('version', self.read_state_version)

    channel_id = int(data['channel_id'])
    if self.get_channel(channel_id) is None:
        _log.debug('MESSAGE_ACK referencing an unknown channel ID: %s. Discarding.', channel_id)
        return

    raw = RawMessageAckEvent(data)
    message_id = int(data['message_id'])
    message = self._get_message(message_id)
    raw.cached_message = message

    read_state = self.get_read_state(channel_id)
    read_state.last_acked_id = message_id
    if 'mention_count' in data:
        read_state.badge_count = data['mention_count']
    # ``flags`` and ``last_viewed`` are only applied when present with a usable value
    if 'flags' in data and data['flags'] is not None:
        read_state._flags = data['flags']
    if 'last_viewed' in data and data['last_viewed']:
        read_state.last_viewed = read_state.unpack_last_viewed(data['last_viewed'])

    self.dispatch('raw_message_ack', raw)
    if message is None:
        return
    self.dispatch('message_ack', message, raw.manual)
def parse_message_reaction_add(self, data: gw.MessageReactionAddEvent) -> None:
    """Handle MESSAGE_REACTION_ADD.

    Always dispatches ``raw_reaction_add``; when the message is cached the
    reaction is recorded on it and ``reaction_add`` is dispatched if the
    reacting user can be resolved.
    """
    emoji_data = data['emoji']
    emoji_id = utils._get_as_snowflake(emoji_data, 'id')
    emoji = PartialEmoji.with_state(self, id=emoji_id, animated=emoji_data.get('animated', False), name=emoji_data['name'])  # type: ignore
    raw = RawReactionActionEvent(data, emoji, 'REACTION_ADD')

    # A member is only attached when the payload has one and its guild is cached
    member_data = data.get('member')
    guild = self._get_guild(raw.guild_id) if member_data else None
    raw.member = Member(data=member_data, guild=guild, state=self) if guild is not None else None
    self.dispatch('raw_reaction_add', raw)

    # rich interface here
    message = self._get_message(raw.message_id)
    if message is None:
        return

    emoji = self._upgrade_partial_emoji(emoji)
    reaction = message._add_reaction(data, emoji, raw.user_id)
    user = raw.member or self._get_reaction_user(message.channel, raw.user_id)
    if user:
        self.dispatch('reaction_add', reaction, user)
def parse_message_reaction_remove_all(self, data: gw.MessageReactionRemoveAllEvent) -> None:
    """Handle MESSAGE_REACTION_REMOVE_ALL.

    Always dispatches ``raw_reaction_clear``; when the message is cached its
    reactions are cleared and ``reaction_clear`` is dispatched with the old ones.
    """
    raw = RawReactionClearEvent(data)
    self.dispatch('raw_reaction_clear', raw)

    message = self._get_message(raw.message_id)
    if message is None:
        return

    # Keep a copy, since the list on the message is emptied in place
    previous = message.reactions.copy()
    message.reactions.clear()
    self.dispatch('reaction_clear', message, previous)
def parse_message_reaction_remove(self, data: gw.MessageReactionRemoveEvent) -> None:
    """Handle MESSAGE_REACTION_REMOVE.

    Always dispatches ``raw_reaction_remove``; when the message is cached and
    the reaction can be removed from it, ``reaction_remove`` is dispatched if
    the user can be resolved.
    """
    emoji = PartialEmoji.from_dict(data['emoji'])
    emoji._state = self
    raw = RawReactionActionEvent(data, emoji, 'REACTION_REMOVE')
    self.dispatch('raw_reaction_remove', raw)

    message = self._get_message(raw.message_id)
    if message is None:
        return

    emoji = self._upgrade_partial_emoji(emoji)
    try:
        reaction = message._remove_reaction(data, emoji, raw.user_id)
    except (AttributeError, ValueError):  # eventual consistency lol
        return

    user = self._get_reaction_user(message.channel, raw.user_id)
    if user:
        self.dispatch('reaction_remove', reaction, user)
def parse_message_reaction_remove_emoji(self, data: gw.MessageReactionRemoveEmojiEvent) -> None:
    """Handle MESSAGE_REACTION_REMOVE_EMOJI.

    Always dispatches ``raw_reaction_clear_emoji``; when the message is cached
    and had that reaction, ``reaction_clear_emoji`` is dispatched as well.
    """
    emoji = PartialEmoji.from_dict(data['emoji'])
    emoji._state = self
    raw = RawReactionClearEmojiEvent(data, emoji)
    self.dispatch('raw_reaction_clear_emoji', raw)

    message = self._get_message(raw.message_id)
    if message is None:
        return

    try:
        reaction = message._clear_emoji(emoji)
    except (AttributeError, ValueError):  # eventual consistency lol
        return

    if reaction:
        self.dispatch('reaction_clear_emoji', reaction)
def parse_recent_mention_delete(self, data: gw.RecentMentionDeleteEvent) -> None:
    """Handle RECENT_MENTION_DELETE.

    ``recent_mention_delete`` is dispatched first when the message is cached;
    ``raw_recent_mention_delete`` is always dispatched with the message ID.
    """
    message_id = int(data['message_id'])
    cached = self._get_message(message_id)
    if cached is not None:
        self.dispatch('recent_mention_delete', cached)
    self.dispatch('raw_recent_mention_delete', message_id)
def parse_presences_replace(self, data: List[gw.PartialPresenceUpdate]) -> None:
    """Handle PRESENCES_REPLACE by processing each entry as an individual presence update."""
    handle = self.parse_presence_update
    for entry in data:
        handle(entry)
def _handle_presence_update(self, guild: Optional[Guild], data: gw.BasePresenceUpdate):
    """Apply a presence payload and dispatch the resulting events.

    With a guild, the presence belongs to one of its members; without one it
    belongs to a relationship. ``presence_update`` is dispatched when the
    presence changed, and ``user_update`` when the user data changed.
    """
    guild_id = guild.id if guild else None
    user = data['user']
    user_id = int(user['id'])

    presence = self.get_presence(user_id, guild_id)
    if presence is None:
        # Nothing cached: the previous state is treated as offline
        old_presence = Presence._offline()
        presence = self.store_presence(user_id, self.create_presence(data), guild_id)
    else:
        old_presence = Presence._copy(presence)
        presence._update(data, self)

    if guild:
        member = guild.get_member(user_id)
        if member is None:
            _log.debug('PRESENCE_UPDATE referencing an unknown member ID: %s. Discarding.', user_id)
            return

        user_update = member._user._update_self(user)
        if old_presence != presence:
            old_member = Member._copy(member)
            old_member._presence = old_presence
            self.dispatch('presence_update', old_member, member)
    else:
        try:
            relationship = self.create_implicit_relationship(self.store_user(user))
        except (KeyError, ValueError):
            # User object is partial, so we can't continue
            _log.debug('PRESENCE_UPDATE referencing an unknown relationship ID: %s. Discarding.', user_id)
            return

        user_update = relationship.user._update_self(user)
        if old_presence != presence:
            old_relationship = Relationship._copy(relationship, old_presence)
            self.dispatch('presence_update', old_relationship, relationship)

    if user_update:
        self.dispatch('user_update', user_update[0], user_update[1])
def parse_presence_update(self, data: gw.PresenceUpdateEvent) -> None:
    """Handle PRESENCE_UPDATE, discarding payloads that name a guild which is not cached."""
    guild_id = utils._get_as_snowflake(data, 'guild_id')
    guild = self._get_guild(guild_id)
    if guild or not guild_id:
        self._handle_presence_update(guild, data)
        return
    _log.debug('PRESENCE_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
def parse_user_update(self, data: gw.UserUpdateEvent) -> None:
    """Handle USER_UPDATE by resetting the HTTP ACK token and fully updating ``self.user``."""
    # Clear the ACK token
    self.http.ack_token = None

    current = self.user
    if current:
        current._full_update(data)
def parse_user_note_update(self, data: gw.UserNoteUpdateEvent) -> None:
    """Handle USER_NOTE_UPDATE and dispatch ``note_update`` with the affected note.

    The note of a cached user is updated in place; otherwise a new note object
    is created for the user ID.
    """
    # The gateway does not provide note objects on READY with our default capabilities
    # so we cannot have (old, new) event dispatches
    user_id = int(data['id'])
    text = data['note']

    user = self.get_user(user_id)
    if not user:
        note = Note(self, user_id, note=text)
    else:
        note = user.note
        note._value = text

    self.dispatch('note_update', note)
# def parse_user_settings_update(self, data) -> None:
# new_settings = self.settings
# old_settings = copy.copy(new_settings)
# new_settings._update(data)
# self.dispatch('settings_update', old_settings, new_settings)
# self.dispatch('internal_settings_update', old_settings, new_settings)
def parse_user_settings_proto_update(self, data: gw.ProtoSettingsEvent) -> None:
    """Handle USER_SETTINGS_PROTO_UPDATE.

    For preloaded user settings the cached settings (if any) are updated and
    ``settings_update`` / ``internal_settings_update`` are dispatched with the
    old and new values. Frecency settings are ignored, test settings are only
    logged, and anything else produces a warning.
    """
    raw_type = data['settings']['type']
    try:
        settings_type = UserSettingsType(raw_type)
    except ValueError:
        # A value the enum does not know fails on construction, so it has to be
        # reported here; it would otherwise never reach the warning branch below
        _log.warning('Unknown user settings proto type: %s', raw_type)
        return

    if settings_type == UserSettingsType.preloaded_user_settings:
        settings = self.settings
        if settings:
            old_settings = UserSettings._copy(settings)
            settings._update(data['settings']['proto'], partial=data.get('partial', False))
            self.dispatch('settings_update', old_settings, settings)
            self.dispatch('internal_settings_update', old_settings, settings)
    elif settings_type == UserSettingsType.frecency_user_settings:
        # Not handled
        ...
    elif settings_type == UserSettingsType.test_settings:
        _log.debug('Received test settings proto update. Data: %s', data['settings']['proto'])
    else:
        _log.warning('Unknown user settings proto type: %s', settings_type.value)
def parse_user_guild_settings_update(self, data: gw.UserGuildSettingsEvent) -> None:
    """Handle USER_GUILD_SETTINGS_UPDATE and dispatch ``guild_settings_update``.

    Cached settings are updated in place and dispatched together with a copy of
    their previous state. Settings not seen before are created, stored in
    ``self.guild_settings`` and dispatched with ``None`` as the old value.
    """
    guild_id = utils._get_as_snowflake(data, 'guild_id')

    settings = self.guild_settings.get(guild_id)
    if settings is not None:
        old_settings = copy.copy(settings)
        settings._update(data)
    else:
        old_settings = None
        settings = GuildSettings(data=data, state=self)
        # Store the new entry under the same key the READY handler uses, so later
        # lookups and updates see it instead of treating it as unknown again
        self.guild_settings[guild_id] = settings

    self.guild_settings_version = data.get('version', self.guild_settings_version)
    self.dispatch('guild_settings_update', old_settings, settings)
def parse_user_required_action_update(self, data: gw.RequiredActionEvent) -> None:
    """Store the current required action and dispatch ``required_action_update``.

    A falsy ``required_action`` in the payload is stored as ``None``.
    """
    if data['required_action']:
        action = try_enum(RequiredActionType, data['required_action'])
    else:
        action = None

    self.required_action = action
    self.dispatch('required_action_update', action)
def parse_user_non_channel_ack(self, data: gw.NonChannelAckEvent) -> None:
    """Record an ACK on the matching read state and dispatch ``user_feature_ack``.

    The read state is looked up by our own ID and the type exposed by the
    raw event object.
    """
    self.read_state_version = data.get('version', self.read_state_version)

    event = RawUserFeatureAckEvent(data)
    feature_state = self.get_read_state(self.self_id, event.type)  # type: ignore
    feature_state.last_acked_id = int(data['entity_id'])

    self.dispatch('user_feature_ack', event)
def parse_user_connections_update(self, data: Union[gw.ConnectionEvent, gw.PartialConnectionEvent]) -> None:
    """Dispatch ``connections_update`` and create or update the cached connection.

    Payloads without an ``id``, or carrying a ``user_id`` key, only produce
    the generic ``connections_update`` event.
    """
    self.dispatch('connections_update')

    connection_id = data.get('id')
    if connection_id is None or 'user_id' in data:
        return

    if connection_id in self.connections:
        # TODO: We can also get to this point if the connection has been deleted
        # We can detect that by checking if the payload is identical to the previous payload
        # However, certain events can also trigger updates with identical payloads, so we can't rely on that
        # For now, we assume everything is an update; thanks Discord
        connection = self.connections[connection_id]
        before = copy.copy(connection)
        connection._update(data)
        self.dispatch('connection_update', before, connection)
        return

    connection = Connection(state=self, data=data)
    self.connections[connection_id] = connection
    self.dispatch('connection_create', connection)
def parse_user_connections_link_callback(self, data: gw.ConnectionsLinkCallbackEvent) -> None:
    """Dispatch ``connections_link_callback`` with the provider, callback code and callback state."""
    provider = data['provider']
    code = data['callback_code']
    callback_state = data['callback_state']
    self.dispatch('connections_link_callback', provider, code, callback_state)
def parse_user_payment_sources_update(self, data: gw.NoEvent) -> None:
    """Dispatch ``payment_sources_update``; the payload is not read."""
    event_name = 'payment_sources_update'
    self.dispatch(event_name)
def parse_user_subscriptions_update(self, data: gw.NoEvent) -> None:
    """Dispatch ``subscriptions_update``; the payload is not read."""
    event_name = 'subscriptions_update'
    self.dispatch(event_name)
def parse_user_payment_client_add(self, data: gw.PaymentClientAddEvent) -> None:
    """Dispatch ``payment_client_add`` with the purchase token hash and the parsed expiry time."""
    token_hash = data['purchase_token_hash']
    expires_at = utils.parse_time(data['expires_at'])
    self.dispatch('payment_client_add', token_hash, expires_at)
def parse_user_premium_guild_subscription_slot_create(self, data: gw.PremiumGuildSubscriptionSlotEvent) -> None:
    """Build a subscription slot from the payload and dispatch ``premium_guild_subscription_slot_create``."""
    self.dispatch(
        'premium_guild_subscription_slot_create',
        PremiumGuildSubscriptionSlot(state=self, data=data),
    )
def parse_user_premium_guild_subscription_slot_update(self, data: gw.PremiumGuildSubscriptionSlotEvent) -> None:
    """Build a subscription slot from the payload and dispatch ``premium_guild_subscription_slot_update``."""
    self.dispatch(
        'premium_guild_subscription_slot_update',
        PremiumGuildSubscriptionSlot(state=self, data=data),
    )
def parse_user_achievement_update(self, data: gw.AchievementUpdatePayload) -> None:
    """Dispatch ``achievement_update`` with the achievement model and its completion percentage.

    Payloads lacking a truthy ``achievement`` or ``application_id`` are
    logged and dropped. ``percent_complete`` defaults to ``0``.
    """
    achievement: AchievementPayload = data.get('achievement')  # type: ignore
    application_id = data.get('application_id')

    if achievement and application_id:
        # The application ID is copied onto the achievement dict before the model is built
        achievement['application_id'] = application_id
        model = Achievement(state=self, data=achievement)
        self.dispatch('achievement_update', model, data.get('percent_complete', 0))
        return

    _log.warning('USER_ACHIEVEMENT_UPDATE payload has invalid data: %s. Discarding.', list(data.keys()))
def parse_billing_popup_bridge_callback(self, data: gw.BillingPopupBridgeCallbackEvent) -> None:
    """Dispatch ``billing_popup_bridge_callback``.

    The event carries the payment source type, the path, the query wrapped
    in ``Metadata`` and the state. Every field is optional in the payload
    and falls back to ``0``, ``''``, ``{}`` and ``None`` respectively.
    """
    source_type = try_enum(PaymentSourceType, data.get('payment_source_type', 0))
    path = data.get('path', '')
    query = Metadata(data.get('query', {}))
    callback_state = data.get('state')

    self.dispatch('billing_popup_bridge_callback', source_type, path, query, callback_state)
def parse_oauth2_token_revoke(self, data: gw.OAuth2TokenRevokeEvent) -> None:
    """Dispatch ``oauth2_token_revoke`` with the revoked access token and application ID.

    Payloads missing either ``access_token`` or ``application_id`` are
    logged and discarded without dispatching anything.
    """
    if 'access_token' not in data or 'application_id' not in data:
        _log.warning('OAUTH2_TOKEN_REVOKE payload has invalid data: %s. Discarding.', list(data.keys()))
        # Actually discard the payload: indexing the missing key below would raise KeyError
        return

    self.dispatch('oauth2_token_revoke', data['access_token'], data['application_id'])
def parse_auth_session_change(self, data: gw.AuthSessionChangeEvent) -> None:
    """Store the new auth session ID hash and dispatch ``auth_session_change`` with it."""
    session_hash = data['auth_session_id_hash']
    self.auth_session_id = session_hash
    self.dispatch('auth_session_change', session_hash)
def parse_payment_update(self, data: gw.PaymentUpdateEvent) -> None:
    """Update the matching pending payment, or build a new one, and dispatch ``payment_update``."""
    payment_id = int(data['id'])
    payment = self.pending_payments.get(payment_id)

    if payment is None:
        # Not one of our pending payments: build a fresh object for the event
        payment = Payment(state=self, data=data)
    else:
        payment._update(data)

    self.dispatch('payment_update', payment)
def parse_library_application_update(self, data: gw.LibraryApplicationUpdateEvent) -> None:
    """Build a library application entry from the payload and dispatch ``library_application_update``."""
    self.dispatch('library_application_update', LibraryApplication(state=self, data=data))
def parse_sessions_replace(self, payload: gw.SessionsReplaceEvent, *, from_ready: bool = False) -> None:
# Reconciles the session cache (``self._sessions``) with the sessions listed in ``payload``.
# Known sessions are updated in place, unknown ones are created, and (unless ``from_ready``
# is set) cached sessions missing from the payload are removed.
# ``from_ready`` also turns off the session_create/session_update dispatches made while
# walking the payload.
# NOTE(review): indentation was lost in this copy of the file; comments below describe
# individual statements and avoid asserting nesting that cannot be seen here.
# Index the incoming sessions by their session ID
data = {s['session_id']: s for s in payload}
for session_id, session in data.items():
existing = self._sessions.get(session_id)
if existing is not None:
# Shallow copy taken before the update so before/after can be compared
old = copy.copy(existing)
existing._update(session)
# Only status, active and activities changes count as an update worth dispatching
if not from_ready and (
old.status != existing.status or old.active != existing.active or old.activities != existing.activities
):
self.dispatch('session_update', old, existing)
else:
existing = Session(state=self, data=session)
self._sessions[session_id] = existing
if not from_ready:
self.dispatch('session_create', existing)
# Holds the popped "all" session (if any) so it can be refreshed further down
old_all = None
if not from_ready:
# The IDs to drop are collected into a list first, so the cache is not mutated while iterated
removed_sessions = [s for s in self._sessions if s not in data]
for session_id in removed_sessions:
if session_id == 'all':
# The "all" session is popped without a session_delete dispatch
old_all = self._sessions.pop('all')
else:
session = self._sessions.pop(session_id)
self.dispatch('session_delete', session)
if 'all' not in self._sessions:
# The "all" session does not always exist...
# This usually happens if there is only a single session (us)
# In the case it is "removed", we try to update the old one
# Else, we create a new one with fake data
if len(data) > 1:
# We have more than one session, this should not happen
fake = data[self.session_id] # type: ignore
else:
# NOTE(review): indexing [0] presumably relies on the payload never being empty — confirm
fake = list(data.values())[0]
if old_all is not None:
old = copy.copy(old_all)
old_all._update(fake)
if old.status != old_all.status or old.active != old_all.active or old.activities != old_all.activities:
self.dispatch('session_update', old, old_all)
else:
old_all = Session._fake_all(state=self, data=fake)
# Put the refreshed or freshly faked "all" session back into the cache
self._sessions['all'] = old_all
def parse_entitlement_create(self, data: gw.EntitlementEvent) -> None:
    """Build an entitlement from the payload and dispatch ``entitlement_create``."""
    self.dispatch('entitlement_create', Entitlement(state=self, data=data))
def parse_entitlement_update(self, data: gw.EntitlementEvent) -> None:
    """Build an entitlement from the payload and dispatch ``entitlement_update``."""
    self.dispatch('entitlement_update', Entitlement(state=self, data=data))
def parse_entitlement_delete(self, data: gw.EntitlementEvent) -> None:
    """Build an entitlement from the payload and dispatch ``entitlement_delete``."""
    self.dispatch('entitlement_delete', Entitlement(state=self, data=data))
def parse_gift_code_create(self, data: gw.GiftCreateEvent) -> None:
    """Build a gift from the payload and dispatch ``gift_create``."""
    # Should be fine:tm:
    created = Gift(state=self, data=data)  # type: ignore
    event_name = 'gift_create'
    self.dispatch(event_name, created)
def parse_gift_code_update(self, data: gw.GiftUpdateEvent) -> None:
    """Build a gift from the payload and dispatch ``gift_update``."""
    # Should be fine:tm:
    updated = Gift(state=self, data=data)  # type: ignore
    event_name = 'gift_update'
    self.dispatch(event_name, updated)
def parse_invite_create(self, data: gw.InviteCreateEvent) -> None:
    """Build an invite via ``Invite.from_gateway`` and dispatch ``invite_create``."""
    self.dispatch('invite_create', Invite.from_gateway(state=self, data=data))
def parse_invite_delete(self, data: gw.InviteDeleteEvent) -> None:
    """Build an invite via ``Invite.from_gateway`` and dispatch ``invite_delete``."""
    self.dispatch('invite_delete', Invite.from_gateway(state=self, data=data))
def parse_channel_delete(self, data: gw.ChannelDeleteEvent) -> None:
# Removes a deleted channel from the cache and dispatches the matching delete events.
# Guild channels dispatch guild_channel_delete, plus scheduled_event_delete for events
# attached to a deleted voice/stage channel and thread_delete/raw_thread_delete for the
# threads removed with it. Private channels dispatch private_channel_delete.
# NOTE(review): indentation was lost in this copy of the file; comments below describe
# individual statements and avoid asserting nesting that cannot be seen here.
guild = self._get_guild(utils._get_as_snowflake(data, 'guild_id'))
channel_id = int(data['id'])
if guild is not None:
channel = guild.get_channel(channel_id)
if channel is not None:
guild._remove_channel(channel)
self.dispatch('guild_channel_delete', channel)
# Scheduled events bound to a deleted voice/stage channel are dropped along with it
if channel.type in (ChannelType.voice, ChannelType.stage_voice):
for s in guild.scheduled_events:
if s.channel_id == channel.id:
guild._scheduled_events.pop(s.id)
self.dispatch('scheduled_event_delete', s)
# Threads parented to the channel are removed by channel ID
threads = guild._remove_threads_by_channel(channel_id)
for thread in threads:
self.dispatch('thread_delete', thread)
self.dispatch('raw_thread_delete', RawThreadDeleteEvent._from_thread(thread))
else:
channel = self._get_private_channel(channel_id)
if channel is not None:
self._remove_private_channel(channel)
self.dispatch('private_channel_delete', channel)
# Nuke read state
read_state = self.get_read_state(channel_id)
if read_state is not None:
self.remove_read_state(read_state)
def parse_channel_update(self, data: gw.ChannelUpdateEvent) -> None:
    """Apply a channel update to the cached channel and dispatch the update event.

    Private and group channels dispatch ``private_channel_update``; guild
    channels dispatch ``guild_channel_update``. Both events carry
    ``(old_channel, channel)``, where ``old_channel`` is a shallow copy
    taken before the update. Payloads referencing an unknown channel or
    guild are logged at debug level and discarded.
    """
    channel_type = try_enum(ChannelType, data.get('type'))
    channel_id = int(data['id'])

    if channel_type in (ChannelType.private, ChannelType.group):
        channel = self._get_private_channel(channel_id)
        if channel is None:
            _log.debug('CHANNEL_UPDATE referencing an unknown channel ID: %s. Discarding.', channel_id)
            # Stop here: falling through to the guild lookup below would only log a
            # second, misleading "unknown guild ID" message for the same payload
            return

        old_channel = copy.copy(channel)
        channel._update(data)  # type: ignore # the data payload varies based on the channel type
        self.dispatch('private_channel_update', old_channel, channel)
        return

    guild_id = utils._get_as_snowflake(data, 'guild_id')
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('CHANNEL_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    channel = guild.get_channel(channel_id)
    if channel is None:
        _log.debug('CHANNEL_UPDATE referencing an unknown channel ID: %s. Discarding.', channel_id)
        return

    old_channel = copy.copy(channel)
    channel._update(guild, data)  # type: ignore # the data payload varies based on the channel type
    self.dispatch('guild_channel_update', old_channel, channel)
def parse_channel_create(self, data: gw.ChannelCreateEvent) -> None:
    """Create and cache a channel from the payload, then dispatch its create event.

    Private and group channels are created only when not already cached and
    dispatch ``private_channel_create``; guild channels dispatch
    ``guild_channel_create``. Unknown channel types and unknown guilds are
    logged at debug level and discarded.
    """
    factory, ch_type = _channel_factory(data['type'])
    if factory is None:
        _log.debug('CHANNEL_CREATE referencing an unknown channel type %s. Discarding.', data['type'])
        return

    is_private = ch_type in (ChannelType.group, ChannelType.private)
    if not is_private:
        guild_id = utils._get_as_snowflake(data, 'guild_id')
        guild = self._get_guild(guild_id)
        if guild is None:
            _log.debug('CHANNEL_CREATE referencing an unknown guild ID: %s. Discarding.', guild_id)
            return

        # The factory can't be a DMChannel or GroupChannel here
        channel = factory(guild=guild, state=self, data=data)  # type: ignore
        guild._add_channel(channel)  # type: ignore
        self.dispatch('guild_channel_create', channel)
        return

    channel_id = int(data['id'])
    if self._get_private_channel(channel_id) is not None:
        # Already cached, nothing to create
        return

    channel = factory(me=self.user, data=data, state=self)  # type: ignore # user is always present when logged in
    self._add_private_channel(channel)  # type: ignore # channel will always be a private channel
    self.dispatch('private_channel_create', channel)
def parse_channel_pins_update(self, data: gw.ChannelPinsUpdateEvent) -> None:
    """Record the latest pin timestamp on a channel and dispatch a pins update event.

    Payloads without ``guild_id`` are resolved against the private channels
    and dispatch ``private_channel_pins_update``; the rest are resolved
    through the guild and dispatch ``guild_channel_pins_update``. Unknown
    channels are logged at debug level and discarded.
    """
    channel_id = int(data['channel_id'])

    try:
        guild = self._get_guild(int(data['guild_id']))
    except KeyError:
        guild = None
        channel = self._get_private_channel(channel_id)
    else:
        channel = guild and guild._resolve_channel(channel_id)

    if channel is None:
        _log.debug('CHANNEL_PINS_UPDATE referencing an unknown channel ID: %s. Discarding.', channel_id)
        return

    last_pin = utils.parse_time(data.get('last_pin_timestamp'))
    # Only channel types that expose the attribute get it updated
    if hasattr(channel, 'last_pin_timestamp'):
        channel.last_pin_timestamp = last_pin  # type: ignore # Handled above

    event = 'private_channel_pins_update' if guild is None else 'guild_channel_pins_update'
    self.dispatch(event, channel, last_pin)
def parse_channel_pins_ack(self, data: gw.ChannelPinsAckEvent) -> None:
    """Record the acknowledged pin timestamp on a channel's read state and dispatch a pins ACK event.

    ``private_channel_pins_ack`` is dispatched when the channel has no
    guild, ``guild_channel_pins_ack`` otherwise. Unknown channels are logged
    at debug level and discarded, after the read state version is refreshed.
    """
    self.read_state_version = data.get('version', self.read_state_version)

    channel_id = int(data['channel_id'])
    channel = self.get_channel(channel_id)
    if channel is None:
        _log.debug('CHANNEL_PINS_ACK referencing an unknown channel ID: %s. Discarding.', channel_id)
        return

    read_state = self.get_read_state(channel_id)
    last_pin = utils.parse_time(data.get('last_pin'))
    read_state.acked_pin_timestamp = last_pin

    event = 'private_channel_pins_ack' if channel.guild is None else 'guild_channel_pins_ack'
    self.dispatch(event, channel, last_pin)
def parse_channel_recipient_add(self, data: gw.ChannelRecipientEvent) -> None:
    """Add a recipient to a cached private channel and dispatch ``group_join``.

    A ``nick`` present in the payload is stored in the channel's nick
    mapping. Unknown channels are logged at debug level and discarded.
    """
    group = self._get_private_channel(int(data['channel_id']))
    if group is None:
        _log.debug('CHANNEL_RECIPIENT_ADD referencing an unknown channel ID: %s. Discarding.', data['channel_id'])
        return

    recipient = self.store_user(data['user'])
    group.recipients.append(recipient)  # type: ignore
    if 'nick' in data:
        group.nicks[recipient] = data['nick']  # type: ignore

    self.dispatch('group_join', group, recipient)
def parse_channel_recipient_remove(self, data: gw.ChannelRecipientEvent) -> None:
    """Remove a recipient from a cached private channel and dispatch ``group_remove``.

    Nothing is dispatched when the user was not in the recipient list.
    Unknown channels are logged at debug level and discarded.
    """
    group = self._get_private_channel(int(data['channel_id']))
    if group is None:
        _log.debug('CHANNEL_RECIPIENT_REMOVE referencing an unknown channel ID: %s. Discarding.', data['channel_id'])
        return

    recipient = self.store_user(data['user'])
    try:
        group.recipients.remove(recipient)  # type: ignore
    except ValueError:
        # The user was not a recipient, so there is nothing to report
        return

    self.dispatch('group_remove', group, recipient)
def parse_thread_create(self, data: gw.ThreadCreateEvent) -> None:
    """Cache a new thread and dispatch ``thread_create`` or ``thread_join``.

    ``thread_create`` is dispatched when the payload flags the thread as
    ``newly_created``, ``thread_join`` otherwise. A thread that is already
    cached is updated instead and may dispatch ``thread_update``.
    """
    guild_id = int(data['guild_id'])
    guild: Optional[Guild] = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_CREATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    cached = guild.get_thread(int(data['id']))
    if cached is None:
        thread = Thread(guild=guild, state=self, data=data)
        guild._add_thread(thread)
        event = 'thread_create' if data.get('newly_created', False) else 'thread_join'
        self.dispatch(event, thread)
        return

    # Shouldn't happen
    before = cached._update(data)
    if before is not None:
        self.dispatch('thread_update', before, cached)
def parse_thread_update(self, data: gw.ThreadUpdateEvent) -> None:
    """Update a cached thread and dispatch ``thread_update`` when the update yields an old copy.

    A thread that is archived after the update is removed from the guild's
    cache. A thread that is not cached is created and cached without any
    dispatch.
    """
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_UPDATE referencing an unknown guild ID: %s. Discarding', guild_id)
        return

    cached = guild.get_thread(int(data['id']))
    if cached is None:  # Shouldn't happen
        guild._add_thread(Thread(guild=guild, state=self, data=data))
        return

    before = cached._update(data)
    if cached.archived:
        guild._remove_thread(cached)
    if before is not None:
        self.dispatch('thread_update', before, cached)
def parse_thread_delete(self, data: gw.ThreadDeleteEvent) -> None:
# Handles a thread deletion: dispatches raw_thread_delete with the raw event object,
# removes a cached thread from its guild and dispatches thread_delete for it, and drops
# the read state found for the thread ID.
# NOTE(review): indentation was lost in this copy of the file; comments below describe
# individual statements and avoid asserting nesting that cannot be seen here.
guild_id = int(data['guild_id'])
guild = self._get_guild(guild_id)
if guild is None:
_log.debug('THREAD_DELETE referencing an unknown guild ID: %s. Discarding', guild_id)
return
raw = RawThreadDeleteEvent(data)
# The cached thread (or None) is attached to the raw event before it is dispatched
raw.thread = thread = guild.get_thread(raw.thread_id)
self.dispatch('raw_thread_delete', raw)
if thread is not None:
guild._remove_thread(thread)
self.dispatch('thread_delete', thread)
# Nuke read state
read_state = self.get_read_state(raw.thread_id)
if read_state is not None:
self.remove_read_state(read_state)
def parse_thread_list_sync(self, data: gw.ThreadListSyncEvent) -> None:
# Synchronises a guild's cached threads with the thread list in the payload.
# Known threads are updated (dispatching thread_update when the update yields an old copy),
# unknown ones are created and added to the guild, leftover cached threads are deleted
# (dispatching thread_delete), and the listed most recent messages are appended to the
# message cache when one is enabled.
# NOTE(review): indentation was lost in this copy of the file; comments below describe
# individual statements and avoid asserting nesting that cannot be seen here.
guild_id = int(data['guild_id'])
guild: Optional[Guild] = self._get_guild(guild_id)
if guild is None:
_log.debug('THREAD_LIST_SYNC referencing an unknown guild ID: %s. Discarding.', guild_id)
return
# Without 'channel_ids' a copy of all the guild's threads is considered; otherwise the
# candidates come from guild._filter_threads(channel_ids)
try:
channel_ids = {int(i) for i in data['channel_ids']}
except KeyError:
channel_ids = None
threads = guild._threads.copy()
else:
threads = guild._filter_threads(channel_ids)
new_threads = {}
for d in data.get('threads', []):
# Threads present in the payload are popped, so ``threads`` ends up holding only the
# candidates that the payload did not mention
thread = threads.pop(int(d['id']), None)
if thread is not None:
old = thread._update(d)
if old is not None:
self.dispatch('thread_update', old, thread) # Honestly not sure if this is right
else:
thread = Thread(guild=guild, state=self, data=d)
new_threads[thread.id] = thread
old_threads = [t for t in threads.values() if t.id not in new_threads]
for member in data.get('members', []):
try: # Note: member['id'] is the thread_id
# NOTE(review): the lookup uses ``threads``, from which payload threads were popped
# above — confirm that skipping members of those threads is intended
thread = threads[int(member['id'])]
except KeyError:
continue
else:
thread._add_member(ThreadMember(thread, member))
for k in new_threads.values():
guild._add_thread(k)
for k in old_threads:
del guild._threads[k.id]
self.dispatch('thread_delete', k) # Again, not sure
for message in data.get('most_recent_messages', []):
# NOTE(review): this reassigned guild_id is not read again within this function
guild_id = utils._get_as_snowflake(message, 'guild_id')
channel, _ = self._get_guild_channel(message)
# channel will be the correct type here
message = Message(channel=channel, data=message, state=self) # type: ignore
if self._messages is not None:
self._messages.append(message)
def parse_thread_member_update(self, data: gw.ThreadMemberUpdate) -> None:
    """Replace the ``me`` member of a cached thread with one built from the payload.

    Unknown guilds and threads are logged at debug level and discarded.
    Nothing is dispatched.
    """
    guild_id = int(data['guild_id'])
    guild: Optional[Guild] = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_MEMBER_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    thread_id = int(data['id'])
    thread: Optional[Thread] = guild.get_thread(thread_id)
    if thread is None:
        _log.debug('THREAD_MEMBER_UPDATE referencing an unknown thread ID: %s. Discarding.', thread_id)
        return

    thread.me = ThreadMember(thread, data)
def parse_thread_members_update(self, data: gw.ThreadMembersUpdate) -> None:
    """Apply member joins and leaves to a cached thread and dispatch the matching events.

    Added members dispatch ``thread_member_join``; when the added member is
    us, ``thread.me`` is set and ``thread_join`` is dispatched instead.

    Removed members dispatch ``raw_thread_member_remove`` with the raw
    payload object, followed by ``thread_member_remove`` when the member was
    cached. When the removed member is us, only ``thread_remove`` is
    dispatched.

    Unknown guilds and threads are logged at debug level and discarded.
    """
    guild_id = int(data['guild_id'])
    guild: Optional[Guild] = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_MEMBERS_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    thread_id = int(data['id'])
    thread: Optional[Thread] = guild.get_thread(thread_id)
    raw = RawThreadMembersUpdate(data)
    if thread is None:
        _log.debug('THREAD_MEMBERS_UPDATE referencing an unknown thread ID: %s. Discarding.', thread_id)
        return

    added_members = [ThreadMember(thread, d) for d in data.get('added_members', [])]
    removed_member_ids = [int(x) for x in data.get('removed_member_ids', [])]
    self_id = self.self_id

    for member in added_members:
        if member.id != self_id:
            thread._add_member(member)
            self.dispatch('thread_member_join', member)
        else:
            thread.me = member
            self.dispatch('thread_join', thread)

    for member_id in removed_member_ids:
        member = thread._pop_member(member_id)
        if member_id == self_id:
            self.dispatch('thread_remove', thread)
            continue

        # The raw event fires exactly once per removed member, always with the payload
        # object. It used to be dispatched a second time with (thread, member_id) when the
        # member was not cached, which handed listeners a different argument shape.
        self.dispatch('raw_thread_member_remove', raw)
        if member is not None:
            self.dispatch('thread_member_remove', member)
def parse_thread_member_list_update(self, data: gw.ThreadMemberListUpdateEvent) -> None:
    """Add every member listed in the payload to a cached thread.

    Unknown guilds and threads are logged at debug level and discarded.
    Nothing is dispatched.
    """
    guild_id = int(data['guild_id'])
    guild: Optional[Guild] = self._get_guild(guild_id)
    if guild is None:
        _log.debug('THREAD_MEMBER_LIST_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    thread_id = int(data['thread_id'])
    thread: Optional[Thread] = guild.get_thread(thread_id)
    if thread is None:
        _log.debug('THREAD_MEMBER_LIST_UPDATE referencing an unknown thread ID: %s. Discarding.', thread_id)
        return

    # All members are built before any of them is added to the thread
    parsed = [ThreadMember(thread, payload) for payload in data['members']]
    for thread_member in parsed:
        thread._add_member(thread_member)
def parse_guild_member_add(self, data: gw.GuildMemberAddEvent) -> None:
    """Build the joining member, cache it when appropriate and dispatch ``member_join``.

    The member is cached when the ``joined`` member cache flag is set or the
    member is us; otherwise the presence from the payload (if any) is kept
    on the member object itself. A known guild member count is incremented.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_MEMBER_ADD referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    member = Member(guild=guild, data=data, state=self)
    presence = self.create_presence(data['presence']) if 'presence' in data else None

    should_cache = self.member_cache_flags.joined or member.id == self.self_id
    if not should_cache:
        member._presence = presence  # Save the presence
    else:
        if presence is not None:
            self.store_presence(member.id, presence, guild.id)
        guild._add_member(member)

    if guild._member_count is not None:
        guild._member_count += 1

    self.dispatch('member_join', member)
def parse_guild_member_remove(self, data: gw.GuildMemberRemoveEvent) -> None:
# Handles a member leaving a guild: removes the cached member, decrements a known member
# count, and dispatches member_remove (cached member) and raw_member_remove.
# Payloads whose 'user' object has at most one key are treated as fake events that only
# touch the cache.
# NOTE(review): indentation was lost in this copy of the file; comments below describe
# individual statements and avoid asserting nesting that cannot be seen here.
guild = self._get_guild(int(data['guild_id']))
if guild is None:
_log.debug('GUILD_MEMBER_REMOVE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
return
user_data = data['user']
if len(user_data) <= 1:
# Fake event here, so no event
member = guild.get_member(int(user_data['id']))
if member is not None:
guild._remove_member(member)
else:
_log.debug('GUILD_MEMBER_REMOVE referencing an unknown member ID: %s. Discarding.', user_data['id'])
return
# A KeyError raised while storing the user or building the raw event discards the payload
try:
user = self.store_user(user_data) # type: ignore
raw = RawMemberRemoveEvent(data, user)
except KeyError:
_log.debug('GUILD_MEMBER_REMOVE referencing an unknown user ID: %s. Discarding.', data['user']['id'])
return
# The guild is looked up again, this time through the ID carried by the raw event
guild = self._get_guild(raw.guild_id)
if guild is not None:
if guild._member_count is not None:
guild._member_count -= 1
member = guild.get_member(user.id)
if member is not None:
# The raw event carries the cached member instead of the bare user when one exists
raw.user = member
guild._remove_member(member)
self.dispatch('member_remove', member)
else:
_log.debug('GUILD_MEMBER_REMOVE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
self.dispatch('raw_member_remove', raw)
def _handle_member_update(self, guild: Guild, data: Union[gw.GuildMemberUpdateEvent, gw.MemberWithUser]):
    """Apply a member payload to ``guild`` and dispatch ``member_update`` / ``user_update``.

    A cached member is updated in place. An uncached member is created and
    cached only when the ``joined`` member cache flag is set. Whenever a
    member object ends up available, its inner user is refreshed from the
    payload as well.
    """
    user = data['user']
    user_id = int(user['id'])

    member = guild.get_member(user_id)
    if member is None:
        if self.member_cache_flags.joined:
            member = Member(data=data, guild=guild, state=self)  # type: ignore # The data is close enough
            guild._add_member(member)
        _log.debug('GUILD_MEMBER_UPDATE referencing an unknown member ID: %s.', user_id)
    else:
        old_member = member._update(data)
        if old_member is not None:
            self.dispatch('member_update', old_member, member)

    if member is None:
        return

    # Force an update on the inner user if necessary
    user_update = member._user._update_self(user)
    if user_update:
        before, after = user_update[0], user_update[1]
        self.dispatch('user_update', before, after)
def parse_guild_member_update(self, data: gw.GuildMemberUpdateEvent) -> None:
    """Resolve the guild of a member update and hand the payload to ``_handle_member_update``.

    Unknown guilds are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is not None:
        self._handle_member_update(guild, data)
        return

    _log.debug('GUILD_MEMBER_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
def parse_guild_member_list_update(self, data: gw.GuildMemberListUpdateEvent) -> None:
# Handles a member list update: dispatches raw_member_list_update with the raw payload,
# refreshes the guild's member/presence counters, builds Member objects from the SYNC ops,
# and hands them to a scrape request for the guild or adds them to the guild's cache.
# Only SYNC ops are processed; the INSERT/UPDATE/DELETE handling is commented out.
# NOTE(review): indentation was lost in this copy of the file; comments below describe
# individual statements and avoid asserting nesting that cannot be seen here.
# The below code used to hackily emit guild member events from the member list
# This is no longer necessary, but is kept here commented out for reference
self.dispatch('raw_member_list_update', data)
guild = self._get_guild(int(data['guild_id']))
if guild is None:
_log.debug('GUILD_MEMBER_LIST_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
return
# Scrape request registered for this guild, if any
request = self._scrape_requests.get(guild.id)
# should_parse = guild.chunked or getattr(request, 'chunk', False)
# Counters are only overwritten by positive values
if data['member_count'] > 0:
guild._member_count = data['member_count']
if data['online_count'] > 0:
guild._presence_count = data['online_count']
# Sum of the counts of every group except the 'offline' one
guild._true_online_count = sum(group['count'] for group in data['groups'] if group['id'] != 'offline')
empty_tuple = tuple()
# to_add = []
# to_remove = []
# disregard = []
# Members built from the SYNC ops below
members = []
# if should_parse: # The SYNCs need to be first and in order for indexes to not crap a brick
# syncs = [opdata for opdata in data['ops'] if opdata['op'] == 'SYNC']
# syncs.sort(key=lambda op: op['range'][0])
# ops = syncs + [opdata for opdata in data['ops'] if opdata['op'] != 'SYNC']
# else:
ops = data['ops']
for opdata in ops:
# The OPs are as follows:
# SYNC: Provides member/presence data for a 100 member range of the member list
# UPDATE: Dispatched when a member is updated and stays in the same range
# INSERT: Dispatched when a member is inserted into a range
# DELETE: Dispatched when a member is removed from a range
# INVALIDATE: Sent when you're unsubscribed from a range
if opdata['op'] == 'SYNC':
for item in opdata['items']:
if 'group' in item: # Hoisted role
# (
# guild._member_list.append(None) if should_parse else None
# ) # Insert blank so indexes don't fuck up
continue
mdata = item['member']
member = Member(data=mdata, guild=guild, state=self)
# A presence bundled with the member data is applied to the new member object
if mdata.get('presence') is not None:
member._presence_update(mdata['presence'], empty_tuple)
members.append(member)
# guild._member_list.append(member) if should_parse else None
# elif opdata['op'] == 'INSERT':
# index = opdata['index']
# item = opdata['item']
# if 'group' in item: # Hoisted role
# (
# guild._member_list.insert(index, None) if should_parse else None
# ) # Insert blank so indexes don't fuck up
# continue
# mdata = item['member']
# user = mdata['user']
# user_id = int(user['id'])
# member = guild.get_member(user_id)
# if member is not None: # INSERTs are also sent when a user changes range
# old_member = Member._copy(member)
# dispatch = bool(member._update(mdata))
# if mdata.get('presence') is not None:
# pdata = mdata['presence']
# presence = self.get_presence(user_id, guild.id)
# if presence is not None:
# old_presence = Presence._copy(presence)
# presence._update(pdata, self)
# else:
# old_presence = Presence._offline()
# presence = self.store_presence(user_id, self.create_presence(pdata), guild.id)
# old_member._presence = old_presence
# if should_parse and old_presence != presence:
# self.dispatch('presence_update', old_member, member)
# user_update = member._user._update_self(user)
# if user_update:
# self.dispatch('user_update', user_update[0], user_update[1])
# if should_parse and dispatch:
# self.dispatch('member_update', old_member, member)
# disregard.append(member)
# else:
# member = Member(data=mdata, guild=guild, state=self)
# if mdata.get('presence') is not None:
# member._presence_update(mdata['presence'], empty_tuple)
# to_add.append(member)
# guild._member_list.insert(index, member) if should_parse else None
# elif opdata['op'] == 'UPDATE' and should_parse:
# item = opdata['item']
# if 'group' in item: # Hoisted role
# continue
# mdata = item['member']
# user = mdata['user']
# user_id = int(user['id'])
# member = guild.get_member(user_id)
# if member is not None:
# old_member = Member._copy(member)
# dispatch = bool(member._update(mdata))
# if mdata.get('presence') is not None:
# pdata = mdata['presence']
# presence = self.get_presence(user_id, guild.id)
# if presence is not None:
# old_presence = Presence._copy(presence)
# presence._update(pdata, self)
# else:
# old_presence = Presence._offline()
# presence = self.store_presence(user_id, self.create_presence(pdata), guild.id)
# old_member._presence = old_presence
# if should_parse and old_presence != presence:
# self.dispatch('presence_update', old_member, member)
# user_update = member._user._update_self(user)
# if user_update:
# self.dispatch('user_update', user_update[0], user_update[1])
# if should_parse and dispatch:
# self.dispatch('member_update', old_member, member)
# else:
# _log.debug(
# 'GUILD_MEMBER_LIST_UPDATE type UPDATE referencing an unknown member ID %s index %s in %s. Discarding.',
# user_id,
# opdata['index'],
# guild.id,
# )
# member = Member(data=mdata, guild=guild, state=self)
# if mdata.get('presence') is not None:
# self.store_presence(user_id, self.create_presence(mdata['presence']), guild.id)
# guild._member_list.insert(opdata['index'], member) # Race condition?
# elif opdata['op'] == 'DELETE' and should_parse:
# index = opdata['index']
# try:
# item = guild._member_list.pop(index)
# except IndexError:
# _log.debug(
# 'GUILD_MEMBER_LIST_UPDATE type DELETE referencing an unknown member index %s in %s. Discarding.',
# index,
# guild.id,
# )
# continue
# if item is not None:
# to_remove.append(item)
if request:
# A chunking request bails out when the payload has no 'offline' group and the member
# count differs from the online count
if request.chunk and not (
any(group['id'] == 'offline' for group in data['groups']) or data['member_count'] == data['online_count']
):
# The guild has offline members hidden
_log.debug(f'Detected guild {guild} with erroneous offline members.')
return
request.add_members(members)
# request.add_members(members + to_add)
# Attempt to detect Discord overriding the member list
if (
not request.chunk
and not request.manual_override
and data['online_count'] > 0
and guild._true_online_count < data['online_count']
and 'online' not in [group['id'] for group in data['groups']]
):
_log.debug(
f'Detected guild {guild} with manually overriden member list groups: online members not cached by the Gateway.'
)
request.handle_manual_override(guild._true_online_count)
else:
# NOTE(review): presumably pairs with `if request:` (no scrape request registered for the
# guild) — confirm against the indented source
for member in members: # + to_add:
guild._add_member(member)
# if should_parse:
# actually_remove = [member for member in to_remove if member not in to_add and member not in disregard]
# actually_add = [member for member in to_add if member not in to_remove]
# for member in actually_remove:
# guild._remove_member(member)
# self.dispatch('member_remove', member)
# for member in actually_add:
# self.dispatch('member_join', member)
def parse_guild_application_command_index_update(self, data: gw.GuildApplicationCommandIndexUpdateEvent) -> None:
    """Dispatch ``application_command_index_update`` with the guild referenced by the payload.

    Unknown guilds are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is not None:
        self.dispatch('application_command_index_update', guild)
        return

    _log.debug(
        'GUILD_APPLICATION_COMMAND_INDEX_UPDATE referencing an unknown guild ID: %s. Discarding.',
        data['guild_id'],
    )
def parse_guild_emojis_update(self, data: gw.GuildEmojisUpdateEvent) -> None:
    """Replace a guild's emojis with those in the payload and dispatch ``guild_emojis_update``.

    The previous emojis are evicted from the emoji cache before the new ones
    are stored. The event carries ``(guild, before, after)``. Unknown guilds
    are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_EMOJIS_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    previous = guild.emojis
    for stale in previous:
        self._emojis.pop(stale.id, None)

    guild.emojis = tuple(self.store_emoji(guild, payload) for payload in data['emojis'])
    self.dispatch('guild_emojis_update', guild, previous, guild.emojis)
def parse_guild_stickers_update(self, data: gw.GuildStickersUpdateEvent) -> None:
    """Replace a guild's stickers with those in the payload and dispatch ``guild_stickers_update``.

    The previous stickers are evicted from the sticker cache before the new
    ones are stored. The event carries ``(guild, before, after)``. Unknown
    guilds are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_STICKERS_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    previous = guild.stickers
    for stale in previous:
        self._stickers.pop(stale.id, None)

    guild.stickers = tuple(self.store_sticker(guild, payload) for payload in data['stickers'])
    self.dispatch('guild_stickers_update', guild, previous, guild.stickers)
def parse_guild_audit_log_entry_create(self, data: gw.GuildAuditLogEntryCreate) -> None:
    """Build an audit log entry for a known guild and dispatch ``audit_log_entry_create``.

    The entry is given the state's user cache, with empty AutoMod rule and
    webhook mappings. Unknown guilds are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is not None:
        entry = AuditLogEntry(
            users=self._users,
            automod_rules={},
            webhooks={},
            data=data,
            guild=guild,
        )
        self.dispatch('audit_log_entry_create', entry)
        return

    _log.debug('GUILD_AUDIT_LOG_ENTRY_CREATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
# AutoMod events are not actually dispatched for user accounts...
def parse_auto_moderation_rule_create(self, data: AutoModerationRule) -> None:
    """Build an AutoMod rule for a known guild and dispatch ``automod_rule_create``.

    Unknown guilds are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is not None:
        self.dispatch('automod_rule_create', AutoModRule(data=data, guild=guild, state=self))
        return

    _log.debug('AUTO_MODERATION_RULE_CREATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
def parse_auto_moderation_rule_update(self, data: AutoModerationRule) -> None:
    """Build an AutoMod rule for a known guild and dispatch ``automod_rule_update``.

    Unknown guilds are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is not None:
        self.dispatch('automod_rule_update', AutoModRule(data=data, guild=guild, state=self))
        return

    _log.debug('AUTO_MODERATION_RULE_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
def parse_auto_moderation_rule_delete(self, data: AutoModerationRule) -> None:
    """Build an AutoMod rule for a known guild and dispatch ``automod_rule_delete``.

    Unknown guilds are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is not None:
        self.dispatch('automod_rule_delete', AutoModRule(data=data, guild=guild, state=self))
        return

    _log.debug('AUTO_MODERATION_RULE_DELETE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
def parse_auto_moderation_action_execution(self, data: AutoModerationActionExecution) -> None:
    """Build an AutoMod action for a payload from a known guild and dispatch ``automod_action``.

    Unknown guilds are logged at debug level and discarded.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is not None:
        self.dispatch('automod_action', AutoModAction(data=data, state=self))
        return

    _log.debug('AUTO_MODERATION_ACTION_EXECUTION referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
def _get_create_guild(self, data: gw.GuildCreateEvent) -> Optional[Guild]:
guild = self._get_guild(int(data['id']))
unavailable = data.get('unavailable')
# Discord being Discord sometimes sends a GUILD_CREATE after subscribing to a guild
# In this case, we just update it and return None to avoid a double dispatch
if guild is not None:
guild._from_data(data)
if unavailable is not False:
return
return guild or self._add_guild_from_data(data)
def _chunk_timeout(self, guild: Guild) -> float:
return max(5.0, (guild._member_count or 0) / 10000)
def is_guild_evicted(self, guild: Guild) -> bool:
    """Return ``True`` when the guild's ID is no longer present in the guild cache."""
    still_cached = guild.id in self._guilds
    return not still_cached
async def assert_guild_presence_count(self, guild: Guild):
    """Make sure ``guild._presence_count`` is populated for guilds that hide offline members.

    Nothing is done when offline members are not hidden or a presence count
    is already known. Otherwise the first channel readable by both the
    default role and ourselves is subscribed to, and a member list update
    for the guild is awaited for up to 10 seconds. If the count is still
    missing afterwards, it is taken from the guild preview.

    Raises
    ------
    RuntimeError
        No channel is viewable by both the default role and ourselves.
    """
    if not (guild._offline_members_hidden and not guild._presence_count):
        return

    channel = None
    for channel in guild.channels:
        visible_to_everyone = channel.permissions_for(guild.default_role).read_messages
        if visible_to_everyone and channel.permissions_for(guild.me).read_messages:  # type: ignore
            break
    else:
        raise RuntimeError('No channels viewable')

    # Request the first range of the member list for the chosen channel
    requests: Dict[Snowflake, List[Tuple[int, int]]] = {str(channel.id): [(0, 99)]}

    def matches_guild(payload):
        return int(payload['guild_id']) == guild.id

    await self.subscriptions.subscribe_to_channels(guild, requests)

    try:
        await asyncio.wait_for(self.ws.wait_for('GUILD_MEMBER_LIST_UPDATE', matches_guild), timeout=10)
    except asyncio.TimeoutError:
        # No update arrived in time; the preview fallback below covers this case
        pass

    if not guild._presence_count:
        preview = await self.http.get_guild_preview(guild.id)
        guild._presence_count = preview['approximate_presence_count']
# Typing overload: with wait=True the call resolves to the list of members.
@overload
async def scrape_guild(
self,
guild: Guild,
*,
wait: Literal[True] = ...,
cache: bool,
force_scraping: bool = ...,
chunk: bool = ...,
channels: List[abcSnowflake] = ...,
delay: Union[int, float] = ...,
) -> List[Member]:
...
# Typing overload: with wait=False the call resolves to a future for the list of members.
@overload
async def scrape_guild(
self,
guild: Guild,
*,
wait: Literal[False] = ...,
cache: bool,
force_scraping: bool = ...,
chunk: bool = ...,
channels: List[abcSnowflake] = ...,
delay: Union[int, float] = ...,
) -> asyncio.Future[List[Member]]:
...
# Fetch a guild's members through either a ChunkRequest or a MemberSidebar request.
# A ChunkRequest is used when neither `chunk` nor `force_scraping` is set and our own
# member holds kick_members, ban_members or manage_roles; otherwise a MemberSidebar is used.
# Raises ClientException when the MemberSidebar request ends up with no channels.
async def scrape_guild(
self,
guild: Guild,
*,
wait: bool = True,
cache: bool,
force_scraping: bool = False,
chunk: bool = False,
channels: List[abcSnowflake] = MISSING,
delay: Union[int, float] = MISSING,
) -> Union[List[Member], asyncio.Future[List[Member]]]:
# Our own member is needed for the permission check below; query it if it is not cached
if not guild.me:
await guild.query_members(user_ids=[self.self_id], cache=True) # type: ignore # self_id is always present here
if (
not chunk
and not force_scraping
and guild.me
and any(
{
guild.me.guild_permissions.kick_members,
guild.me.guild_permissions.ban_members,
guild.me.guild_permissions.manage_roles,
}
)
):
# Reuse a pending chunk request for this guild if one is registered
request = self._chunk_requests.get(guild.id)
if request is None:
self._chunk_requests[guild.id] = request = ChunkRequest(guild.id, self.loop, self._get_guild, cache=cache)
await self.chunker([guild.id], nonce=request.nonce)
else:
if not chunk:
await self.assert_guild_presence_count(guild)
# Reuse a pending scrape request for this guild if one is registered
request = self._scrape_requests.get(guild.id)
if request is None:
# `delay or 0` substitutes 0 whenever `delay` is falsy
self._scrape_requests[guild.id] = request = MemberSidebar(
guild, channels, chunk=chunk, cache=cache, loop=self.loop, delay=delay or 0
)
# A request without channels is unregistered again before raising
if not request.channels:
del self._scrape_requests[guild.id]
if chunk:
raise ClientException('Guild cannot be chunked: no channels viewable by @everyone')
raise ClientException('Failed to automatically choose channels; please specify them manually')
request.start()
# Either await the members directly or hand the caller a future to await later
if wait:
return await request.wait()
return request.get_future()
@overload
async def chunk_guild(
    self, guild: Guild, *, nonce: Optional[str] = ..., wait: Literal[True] = ..., cache: Optional[bool] = ...
) -> List[Member]:
    ...


@overload
async def chunk_guild(
    self, guild: Guild, *, nonce: Optional[str] = ..., wait: Literal[False] = ..., cache: Optional[bool] = ...
) -> asyncio.Future[List[Member]]:
    ...


async def chunk_guild(
    self, guild: Guild, *, nonce: Optional[str] = None, wait: bool = True, cache: Optional[bool] = None
) -> Union[List[Member], asyncio.Future[List[Member]]]:
    """Request all members of ``guild`` through a chunk request.

    Parameters
    ----------
    guild:
        The guild to chunk.
    nonce:
        Nonce of a chunk request that has already been sent. When given, the
        request is only tracked and no new gateway request is issued.
    wait:
        Await the members when ``True``; otherwise return a future for them.
    cache:
        Whether received members are cached. ``None`` falls back to
        ``member_cache_flags.joined``; an explicit ``False`` is honoured.

    Returns
    -------
    The list of members, or a future resolving to it when ``wait`` is ``False``.
    """
    # Only substitute the default when the caller did not choose. The previous
    # ``cache or default`` form silently turned an explicit ``False`` into the default.
    if cache is None:
        cache = self.member_cache_flags.joined

    request = self._chunk_requests.get(guild.id)
    if request is None:
        self._chunk_requests[guild.id] = request = ChunkRequest(
            guild.id, self.loop, self._get_guild, cache=cache, nonce=nonce
        )
        # A caller-supplied nonce means the gateway request was already sent
        if not nonce:
            await self.chunker([guild.id], nonce=request.nonce)

    if wait:
        return await request.wait()
    return request.get_future()
async def _chunk_and_dispatch(self, guild: Guild, chunk: bool, unavailable: Optional[bool]) -> None:
    """Optionally load the guild's members, then dispatch the availability event.

    ``guild_available`` is dispatched when ``unavailable`` is exactly ``False``;
    any other value results in ``guild_join``.
    """
    timeout = self._chunk_timeout(guild)

    if chunk:
        pending = None
        can_chunk = await self._can_chunk_guild(guild)
        if can_chunk:
            pending = self.chunk_guild(guild)
        elif not guild._offline_members_hidden:
            try:
                pending = await self.scrape_guild(guild, wait=False, cache=True, chunk=True)
            except ClientException:
                # Scraping could not be set up; dispatch without members
                pass

        if pending is not None:
            try:
                await asyncio.wait_for(pending, timeout=timeout)
            except asyncio.TimeoutError:
                _log.warning('Somehow timed out waiting for chunks for guild %s.', guild.id)
            except (ClientException, InvalidData):
                pass

    event_name = 'guild_available' if unavailable is False else 'guild_join'
    self.dispatch(event_name, guild)
def parse_guild_create(self, data: gw.GuildCreateEvent):
    """Handle GUILD_CREATE: cache the guild, then schedule subscription and chunking."""
    # Flatten the nested ``properties`` object into the top-level payload
    if 'properties' in data:
        properties = data.pop('properties')
        data.update(properties)  # type: ignore

    guild = self._get_create_guild(data)
    if guild is None:
        return

    if self._subscribe_guilds and not guild.unavailable:
        subscription = self.subscribe_guild(guild)
        asyncio.ensure_future(subscription, loop=self.loop)

    # Chunk if needed
    needs_chunking = self._guild_needs_chunking(guild)
    follow_up = self._chunk_and_dispatch(guild, needs_chunking, data.get('unavailable'))
    asyncio.ensure_future(follow_up, loop=self.loop)
def parse_guild_update(self, data: gw.GuildUpdateEvent) -> None:
    """Handle GUILD_UPDATE by refreshing the cached guild and dispatching ``guild_update``."""
    guild = self._get_guild(int(data['id']))
    if guild is None:
        _log.debug('GUILD_UPDATE referencing an unknown guild ID: %s. Discarding.', data['id'])
        return

    # Shallow snapshot taken before the in-place update, passed as the "before" state
    before = copy.copy(guild)
    guild._from_data(data)
    self.dispatch('guild_update', before, guild)
def parse_guild_delete(self, data: gw.GuildDeleteEvent) -> None:
    """Handle GUILD_DELETE: mark the guild unavailable, or remove it entirely."""
    guild = self._get_guild(int(data['id']))
    if guild is None:
        _log.debug('GUILD_DELETE referencing an unknown guild ID: %s. Discarding.', data['id'])
        return

    became_unavailable = data.get('unavailable', False)
    if became_unavailable:
        # GUILD_DELETE with unavailable being True means that the
        # guild that was available is now currently unavailable
        guild.unavailable = True
        self.dispatch('guild_unavailable', guild)
        return

    # Cleanup the message cache
    if self._messages is not None:
        surviving = (cached for cached in self._messages if cached.guild != guild)
        self._messages: Optional[Deque[Message]] = deque(surviving, maxlen=self.max_messages)

    self._remove_guild(guild)
    self.dispatch('guild_remove', guild)
def parse_guild_feature_ack(self, data: gw.NonChannelAckEvent) -> None:
    """Handle GUILD_FEATURE_ACK: record the acked entity and dispatch the ack events."""
    self.read_state_version = data.get('version', self.read_state_version)

    guild = self._get_guild(int(data['resource_id']))
    if guild is None:
        _log.debug('GUILD_FEATURE_ACK referencing an unknown guild ID: %s. Discarding.', data['resource_id'])
        return

    raw = RawGuildFeatureAckEvent(data, self)
    read_state = self.get_read_state(guild.id, raw.type)
    read_state.last_acked_id = int(data['entity_id'])
    self.dispatch('guild_feature_ack', raw)

    # Rich events here
    if not (read_state.type == ReadStateType.scheduled_events):
        return

    acked_event = guild.get_scheduled_event(read_state.last_acked_id)
    if acked_event is not None:
        self.dispatch('scheduled_event_ack', acked_event)
def parse_guild_ban_add(self, data: gw.GuildBanAddEvent) -> None:
    """Handle GUILD_BAN_ADD by dispatching ``member_ban``.

    The cached member is passed when available, otherwise the plain user.
    Nothing happens when the guild is not cached or the payload has no user.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        return

    try:
        user = User(data=data['user'], state=self)
    except KeyError:
        return

    banned = guild.get_member(user.id) or user
    self.dispatch('member_ban', guild, banned)
def parse_guild_ban_remove(self, data: gw.GuildBanRemoveEvent) -> None:
    """Handle GUILD_BAN_REMOVE by storing the user and dispatching ``member_unban``."""
    guild = self._get_guild(int(data['guild_id']))
    if guild is None or 'user' not in data:
        return

    unbanned = self.store_user(data['user'])
    self.dispatch('member_unban', guild, unbanned)
def parse_guild_role_create(self, data: gw.GuildRoleCreateEvent) -> None:
    """Handle GUILD_ROLE_CREATE by adding the role to its guild and dispatching ``guild_role_create``."""
    guild = self._get_guild(int(data['guild_id']))
    if guild is not None:
        new_role = Role(guild=guild, data=data['role'], state=self)
        guild._add_role(new_role)
        self.dispatch('guild_role_create', new_role)
    else:
        _log.debug('GUILD_ROLE_CREATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
def parse_guild_role_delete(self, data: gw.GuildRoleDeleteEvent) -> None:
    """Handle GUILD_ROLE_DELETE by removing the role and dispatching ``guild_role_delete``.

    A role the guild does not know about is ignored silently.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_ROLE_DELETE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    role_id = int(data['role_id'])
    try:
        removed = guild._remove_role(role_id)
    except KeyError:
        return

    self.dispatch('guild_role_delete', removed)
def parse_guild_role_update(self, data: gw.GuildRoleUpdateEvent) -> None:
    """Handle GUILD_ROLE_UPDATE by updating the cached role and dispatching ``guild_role_update``.

    A role that is not cached on the guild is ignored silently.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_ROLE_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    payload = data['role']
    role = guild.get_role(int(payload['id']))
    if role is None:
        return

    before = copy.copy(role)
    role._update(payload)
    self.dispatch('guild_role_update', before, role)
def parse_guild_members_chunk(self, data: gw.GuildMembersChunkEvent) -> None:
    """Handle GUILD_MEMBERS_CHUNK: build the members, apply presences and feed chunk requests."""
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    presences = data.get('presences', [])
    if guild is None:
        return

    members = [Member(guild=guild, data=raw_member, state=self) for raw_member in data.get('members', [])]
    _log.debug('Processed a chunk for %s members in guild ID %s.', len(members), guild_id)

    if presences:
        no_user_update = ()
        # Presence payloads carry string IDs, so the lookup is keyed by str
        by_id: Dict[Snowflake, Member] = {str(m.id): m for m in members}
        for presence in presences:
            target = by_id.get(presence['user']['id'])
            if target is None:
                continue
            target._presence_update(presence, no_user_update)

    is_last_chunk = data.get('chunk_index', 0) + 1 == data.get('chunk_count')
    self.process_chunk_requests(guild_id, data.get('nonce'), members, is_last_chunk)
def parse_guild_integrations_update(self, data: gw.GuildIntegrationsUpdateEvent) -> None:
    """Handle GUILD_INTEGRATIONS_UPDATE by dispatching ``guild_integrations_update``."""
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('GUILD_INTEGRATIONS_UPDATE referencing an unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    self.dispatch('guild_integrations_update', guild)
def parse_integration_create(self, data: gw.IntegrationCreateEvent) -> None:
    """Handle INTEGRATION_CREATE by building the integration and dispatching ``integration_create``."""
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('INTEGRATION_CREATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    # The factory picks the integration class from the payload's type
    integration_cls, _ = _integration_factory(data['type'])
    created = integration_cls(data=data, guild=guild)
    self.dispatch('integration_create', created)
def parse_integration_update(self, data: gw.IntegrationUpdateEvent) -> None:
    """Handle INTEGRATION_UPDATE by building the integration and dispatching ``integration_update``."""
    guild_id = int(data['guild_id'])
    guild = self._get_guild(guild_id)
    if guild is None:
        _log.debug('INTEGRATION_UPDATE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    # The factory picks the integration class from the payload's type
    integration_cls, _ = _integration_factory(data['type'])
    updated = integration_cls(data=data, guild=guild)
    self.dispatch('integration_update', updated)
def parse_integration_delete(self, data: gw.IntegrationDeleteEvent) -> None:
    """Handle INTEGRATION_DELETE by dispatching ``raw_integration_delete``."""
    guild_id = int(data['guild_id'])
    if self._get_guild(guild_id) is None:
        _log.debug('INTEGRATION_DELETE referencing an unknown guild ID: %s. Discarding.', guild_id)
        return

    self.dispatch('raw_integration_delete', RawIntegrationDeleteEvent(data))
def parse_webhooks_update(self, data: gw.WebhooksUpdateEvent) -> None:
    """Handle WEBHOOKS_UPDATE by dispatching ``webhooks_update`` for the affected channel.

    The payload is discarded (with a debug log) when the guild or the
    channel cannot be resolved from the cache.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('WEBHOOKS_UPDATE referencing an unknown guild ID: %s. Discarding', data['guild_id'])
        return

    channel_id = utils._get_as_snowflake(data, 'channel_id')
    channel = guild.get_channel(channel_id)  # type: ignore # None is okay here
    if channel is not None:
        self.dispatch('webhooks_update', channel)
    else:
        # Log the parsed ID instead of indexing the payload again: the ID is read
        # through the snowflake helper because it may be absent (presumably the
        # helper returns None then; confirm), and ``data['channel_id']`` would
        # turn this debug log into a KeyError in that case.
        _log.debug('WEBHOOKS_UPDATE referencing an unknown channel ID: %s. Discarding.', channel_id)
def parse_stage_instance_create(self, data: gw.StageInstanceCreateEvent) -> None:
    """Handle STAGE_INSTANCE_CREATE by caching the instance and dispatching ``stage_instance_create``."""
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('STAGE_INSTANCE_CREATE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    created = StageInstance(guild=guild, state=self, data=data)
    guild._stage_instances[created.id] = created
    self.dispatch('stage_instance_create', created)
def parse_stage_instance_update(self, data: gw.StageInstanceUpdateEvent) -> None:
    """Handle STAGE_INSTANCE_UPDATE by updating the cached instance and dispatching ``stage_instance_update``."""
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('STAGE_INSTANCE_UPDATE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    stage_instance = guild._stage_instances.get(int(data['id']))
    if stage_instance is None:
        _log.debug('STAGE_INSTANCE_UPDATE referencing unknown stage instance ID: %s. Discarding.', data['id'])
        return

    before = copy.copy(stage_instance)
    stage_instance._update(data)
    self.dispatch('stage_instance_update', before, stage_instance)
def parse_stage_instance_delete(self, data: gw.StageInstanceDeleteEvent) -> None:
    """Handle STAGE_INSTANCE_DELETE by evicting the instance and dispatching ``stage_instance_delete``.

    An instance that is not cached is ignored silently.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('STAGE_INSTANCE_DELETE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    try:
        removed = guild._stage_instances.pop(int(data['id']))
    except KeyError:
        return

    self.dispatch('stage_instance_delete', removed)
def parse_guild_scheduled_event_create(self, data: gw.GuildScheduledEventCreateEvent) -> None:
    """Handle GUILD_SCHEDULED_EVENT_CREATE: cache the event, dispatch it and adjust the read state."""
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('SCHEDULED_EVENT_CREATE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    scheduled_event = ScheduledEvent(state=self, data=data)
    guild._scheduled_events[scheduled_event.id] = scheduled_event
    self.dispatch('scheduled_event_create', scheduled_event)

    read_state = self.get_read_state(guild.id, ReadStateType.scheduled_events)

    created_by_us = scheduled_event.creator_id == self.self_id
    if created_by_us:
        # Implicitly ack created events
        read_state.last_acked_id = scheduled_event.id

    muted = guild.notification_settings.mute_scheduled_events
    if not muted:
        # Increment badge count if we're not muted
        read_state.badge_count += 1
def parse_guild_scheduled_event_update(self, data: gw.GuildScheduledEventUpdateEvent) -> None:
    """Handle GUILD_SCHEDULED_EVENT_UPDATE by updating the cached event and dispatching ``scheduled_event_update``."""
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('SCHEDULED_EVENT_UPDATE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    scheduled_event = guild._scheduled_events.get(int(data['id']))
    if scheduled_event is None:
        _log.debug('SCHEDULED_EVENT_UPDATE referencing unknown scheduled event ID: %s. Discarding.', data['id'])
        return

    before = copy.copy(scheduled_event)
    scheduled_event._update(data)
    self.dispatch('scheduled_event_update', before, scheduled_event)
def parse_guild_scheduled_event_delete(self, data: gw.GuildScheduledEventDeleteEvent) -> None:
    """Handle GUILD_SCHEDULED_EVENT_DELETE by evicting the event and dispatching ``scheduled_event_delete``.

    An event that is not cached is ignored silently.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('SCHEDULED_EVENT_DELETE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    try:
        removed = guild._scheduled_events.pop(int(data['id']))
    except KeyError:
        return

    self.dispatch('scheduled_event_delete', removed)
def parse_guild_scheduled_event_user_add(self, data: gw.GuildScheduledEventUserAdd) -> None:
    """Handle GUILD_SCHEDULED_EVENT_USER_ADD.

    Dispatches ``scheduled_event_user_add`` when the user is cached, and
    ``raw_scheduled_event_user_add`` with the bare user ID in either case.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('SCHEDULED_EVENT_USER_ADD referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    scheduled_event = guild._scheduled_events.get(int(data['guild_scheduled_event_id']))
    if scheduled_event is None:
        _log.debug(
            'SCHEDULED_EVENT_USER_ADD referencing unknown scheduled event ID: %s. Discarding.',
            data['guild_scheduled_event_id'],
        )
        return

    user_id = int(data['user_id'])
    subscriber = self.get_user(user_id)
    if subscriber is not None:
        scheduled_event._add_user(subscriber)
        self.dispatch('scheduled_event_user_add', scheduled_event, subscriber)

    self.dispatch('raw_scheduled_event_user_add', scheduled_event, user_id)
def parse_guild_scheduled_event_user_remove(self, data: gw.GuildScheduledEventUserRemove) -> None:
    """Handle GUILD_SCHEDULED_EVENT_USER_REMOVE.

    Dispatches ``scheduled_event_user_remove`` when the user is cached, and
    ``raw_scheduled_event_user_remove`` with the bare user ID in either case.
    """
    guild = self._get_guild(int(data['guild_id']))
    if guild is None:
        _log.debug('SCHEDULED_EVENT_USER_REMOVE referencing unknown guild ID: %s. Discarding.', data['guild_id'])
        return

    scheduled_event = guild._scheduled_events.get(int(data['guild_scheduled_event_id']))
    if scheduled_event is None:
        _log.debug(
            'SCHEDULED_EVENT_USER_REMOVE referencing unknown scheduled event ID: %s. Discarding.',
            data['guild_scheduled_event_id'],
        )
        return

    user_id = int(data['user_id'])
    subscriber = self.get_user(user_id)
    if subscriber is not None:
        scheduled_event._pop_user(subscriber.id)
        self.dispatch('scheduled_event_user_remove', scheduled_event, subscriber)

    self.dispatch('raw_scheduled_event_user_remove', scheduled_event, user_id)
def parse_call_create(self, data: gw.CallCreateEvent) -> None:
    """Handle CALL_CREATE for a private channel.

    A call that is already cached for the channel is updated in place and
    announced through ``call_update``. Otherwise a new call is created via
    the channel, cached, and announced through ``call_create``. The payload
    is discarded (with a debug log) when the channel is not cached.
    """
    channel_id = int(data['channel_id'])
    channel = self._get_private_channel(channel_id)
    if channel is None:
        _log.debug('CALL_CREATE referencing unknown channel ID: %s. Discarding.', data['channel_id'])
        return

    call = self._calls.get(channel_id)
    if call is not None:
        # Should only happen for unavailable calls
        old_call = copy.copy(call)
        call._update(data)
        self.dispatch('call_update', old_call, call)
        # Stop here: falling through would replace the call that was just updated
        # with a brand new object and also emit call_create for the same call.
        return

    message = self._get_message(int(data['message_id']))
    call = channel._add_call(data=data, state=self, message=message, channel=channel)
    self._calls[channel.id] = call
    self.dispatch('call_create', call)
def parse_call_update(self, data: gw.CallUpdateEvent) -> None:
    """Handle CALL_UPDATE by updating the cached call and dispatching ``call_update``."""
    call = self._calls.get(int(data['channel_id']))
    if call is not None:
        before = copy.copy(call)
        call._update(data)
        self.dispatch('call_update', before, call)
    else:
        _log.debug('CALL_UPDATE referencing unknown call (channel ID: %s). Discarding.', data['channel_id'])
def parse_call_delete(self, data: gw.CallDeleteEvent) -> None:
    """Handle CALL_DELETE.

    The call is removed from the call cache either way. When the payload
    flags it as unavailable, the call is only marked unavailable and
    ``call_update`` is dispatched; otherwise it is torn down and
    ``call_delete`` is dispatched.
    """
    call = self._calls.pop(int(data['channel_id']), None)
    if call is None:
        return

    if data.get('unavailable'):
        before = copy.copy(call)
        call.unavailable = True
        self.dispatch('call_update', before, call)
        return

    call._delete()
    self._call_message_cache.pop(call._message_id, None)
    self.dispatch('call_delete', call)
# Handle VOICE_STATE_UPDATE for guild and non-guild voice states.
# Forwards our own voice state to the matching voice client, updates the cached
# voice state, and dispatches 'voice_state_update' with the before/after states.
def parse_voice_state_update(self, data: gw.VoiceStateUpdateEvent) -> None:
guild_id = utils._get_as_snowflake(data, 'guild_id')
guild = self._get_guild(guild_id)
channel_id = utils._get_as_snowflake(data, 'channel_id')
flags = self.member_cache_flags
self_id = self.self_id
# A guild ID was supplied but the guild is not cached: nothing can be updated
if guild_id is not None and guild is None:
_log.debug('VOICE_STATE_UPDATE referencing unknown guild ID: %s. Discarding.', guild_id)
return
if int(data['user_id']) == self_id:
# Voice clients are looked up by guild ID, or by our own user ID when there is no guild
voice = self._get_voice_client(guild.id if guild else self_id)
if voice is not None:
coro = voice.on_voice_state_update(data)
asyncio.create_task(logging_coroutine(coro, info='Voice Protocol voice state update handler'))
if guild is not None:
member, before, after = guild._update_voice_state(data, channel_id)
if member is not None:
if flags.voice:
# Leaving voice (no channel) with the voice-only flag removes members other than ourselves;
# any update that carries a channel adds the member
if channel_id is None and flags._voice_only and member.id != self_id:
guild._remove_member(member)
elif channel_id is not None:
guild._add_member(member)
self.dispatch('voice_state_update', member, before, after)
else:
_log.debug('VOICE_STATE_UPDATE referencing an unknown member ID: %s. Discarding.', data['user_id'])
else:
# No guild: the voice state is tracked on the connection state itself
user, before, after = self._update_voice_state(data, channel_id)
self.dispatch('voice_state_update', user, before, after)
def parse_voice_server_update(self, data: gw.VoiceServerUpdateEvent) -> None:
    """Handle VOICE_SERVER_UPDATE by forwarding the payload to the matching voice client."""
    guild_id = utils._get_as_snowflake(data, 'guild_id')
    # Without a guild ID the voice client is keyed by our own user ID
    lookup_key = self.self_id if guild_id is None else guild_id

    voice_client = self._get_voice_client(lookup_key)
    if voice_client is None:
        return

    handler = voice_client.on_voice_server_update(data)
    asyncio.create_task(logging_coroutine(handler, info='Voice Protocol voice server update handler'))
def parse_typing_start(self, data: gw.TypingStartEvent) -> None:
    """Handle TYPING_START by resolving who is typing and dispatching ``typing``.

    Nothing is dispatched when the channel or the typing user cannot be resolved.
    """
    channel, guild = self._get_guild_channel(data)
    if channel is None:
        return

    user_id = int(data['user_id'])
    typer = None

    if isinstance(channel, DMChannel):
        typer = channel.recipient
    elif isinstance(channel, (Thread, TextChannel)) and guild is not None:
        typer = guild.get_member(user_id)
        if typer is None:
            # Fall back to the member object embedded in the payload, if any
            embedded = data.get('member')
            if embedded:
                typer = Member(data=embedded, state=self, guild=guild)
    elif isinstance(channel, GroupChannel):

        def is_typing_user(recipient):
            return recipient.id == user_id

        typer = utils.find(is_typing_user, channel.recipients)

    if typer is None:
        return

    started_at = datetime.datetime.fromtimestamp(data['timestamp'], tz=datetime.timezone.utc)
    self.dispatch('typing', channel, typer, started_at)
def parse_relationship_add(self, data: gw.RelationshipAddEvent) -> None:
    """Handle RELATIONSHIP_ADD.

    A relationship that is already cached is updated and announced through
    ``relationship_update``; a new one is cached and announced through
    ``relationship_add``.
    """
    key = int(data['id'])
    existing = self._relationships.get(key)

    if existing is not None:
        before = copy.copy(existing)
        existing._update(data)
        self.dispatch('relationship_update', before, existing)
        return

    created = Relationship(state=self, data=data)
    self._relationships[key] = created
    self.dispatch('relationship_add', created)
def parse_relationship_remove(self, data: gw.RelationshipEvent) -> None:
    """Handle RELATIONSHIP_REMOVE by evicting the relationship and dispatching ``relationship_remove``."""
    key = int(data['id'])
    try:
        removed = self._relationships.pop(key)
    except KeyError:
        _log.warning('RELATIONSHIP_REMOVE referencing unknown relationship ID: %s. Discarding.', key)
        return

    self.dispatch('relationship_remove', removed)
def parse_relationship_update(self, data: gw.RelationshipEvent) -> None:
    """Handle RELATIONSHIP_UPDATE.

    A cached relationship is updated and announced through
    ``relationship_update``. An unknown one is cached without dispatching.
    """
    key = int(data['id'])
    existing = self._relationships.get(key)

    if existing is None:
        self._relationships[key] = Relationship(state=self, data=data)  # type: ignore
        return

    before = copy.copy(existing)
    existing._update(data)
    self.dispatch('relationship_update', before, existing)
def parse_friend_suggestion_create(self, data: gw.FriendSuggestionCreateEvent):
    """Handle FRIEND_SUGGESTION_CREATE: bump the suggestion counter and dispatch ``friend_suggestion_add``."""
    self.friend_suggestion_count += 1
    suggestion = FriendSuggestion(state=self, data=data)
    self.dispatch('friend_suggestion_add', suggestion)
def parse_friend_suggestion_delete(self, data: gw.FriendSuggestionDeleteEvent):
    """Handle FRIEND_SUGGESTION_DELETE.

    Decrements the suggestion counter, dispatches ``friend_suggestion_remove``
    when the user is cached, and always dispatches
    ``raw_friend_suggestion_remove`` with the bare user ID.
    """
    self.friend_suggestion_count -= 1

    suggested_id = int(data['suggested_user_id'])
    suggested = self.get_user(suggested_id)
    if suggested:
        self.dispatch('friend_suggestion_remove', suggested)

    self.dispatch('raw_friend_suggestion_remove', suggested_id)
def parse_interaction_create(self, data: gw.InteractionEvent) -> None:
    """Handle INTERACTION_CREATE by building the interaction, caching it and dispatching ``interaction``."""
    # Sometimes interactions seem to be missing the nonce
    if 'nonce' not in data:
        return

    # Details recorded under the nonce, with a default when none were recorded
    kind, name, channel = self._interaction_cache.pop(data['nonce'], (0, None, None))
    interaction = Interaction._from_self(channel, type=kind, user=self.user, name=name, **data)  # type: ignore # self.user is always present here
    self._interactions[interaction.id] = interaction
    self.dispatch('interaction', interaction)
def parse_interaction_success(self, data: gw.InteractionEvent) -> None:
    """Handle INTERACTION_SUCCESS by marking the cached interaction successful and dispatching ``interaction_finish``."""
    interaction_id = int(data['id'])
    interaction = self._interactions.get(interaction_id)
    if interaction is None:
        _log.warning('INTERACTION_SUCCESS referencing an unknown interaction ID: %s. Discarding.', interaction_id)
        return

    interaction.successful = True
    self.dispatch('interaction_finish', interaction)
def parse_interaction_failed(self, data: gw.InteractionEvent) -> None:
    """Handle INTERACTION_FAILED by evicting the interaction, marking it failed and dispatching ``interaction_finish``."""
    interaction_id = int(data['id'])
    interaction = self._interactions.pop(interaction_id, None)
    if interaction is None:
        _log.warning('INTERACTION_FAILED referencing an unknown interaction ID: %s. Discarding.', interaction_id)
        return

    interaction.successful = False
    self.dispatch('interaction_finish', interaction)
def parse_interaction_modal_create(self, data: gw.InteractionModalCreateEvent) -> None:
    """Handle INTERACTION_MODAL_CREATE by attaching a modal to its interaction and dispatching ``modal``.

    The interaction is removed from the cache; unknown interactions are ignored.
    """
    interaction_id = int(data['id'])
    interaction = self._interactions.pop(interaction_id, None)
    if interaction is None:
        return

    modal = Modal(data=data, interaction=interaction)
    interaction.modal = modal
    self.dispatch('modal', modal)
# Silence "unknown event" warnings for events parsed elsewhere
def parse_nothing(*_):
    """Accept any arguments and do nothing."""
    return None

# parse_guild_application_commands_update = parse_nothing  # Grabbed directly in command iterators
def parse_message_poll_vote_add(self, data: gw.PollVoteActionEvent) -> None:
    """Handle MESSAGE_POLL_VOTE_ADD.

    Always dispatches ``raw_poll_vote_add``. When both the message and the
    voter are cached, the poll counts are updated and ``poll_vote_add`` is
    dispatched with the voter and the chosen answer.
    """
    raw = RawPollVoteActionEvent(data)
    self.dispatch('raw_poll_vote_add', raw)

    message = self._get_message(raw.message_id)
    guild = self._get_guild(raw.guild_id)
    # Prefer the guild member when the vote happened in a cached guild
    voter = guild.get_member(raw.user_id) if guild else self.get_user(raw.user_id)

    if not (message and voter):
        return

    poll = self._update_poll_counts(message, raw.answer_id, True, raw.user_id == self.self_id)
    if poll:
        self.dispatch('poll_vote_add', voter, poll.get_answer(raw.answer_id))
def parse_message_poll_vote_remove(self, data: gw.PollVoteActionEvent) -> None:
    """Handle MESSAGE_POLL_VOTE_REMOVE.

    Always dispatches ``raw_poll_vote_remove``. When both the message and the
    voter are cached, the poll counts are updated and ``poll_vote_remove`` is
    dispatched with the voter and the affected answer.
    """
    raw = RawPollVoteActionEvent(data)
    self.dispatch('raw_poll_vote_remove', raw)

    message = self._get_message(raw.message_id)
    guild = self._get_guild(raw.guild_id)
    # Prefer the guild member when the vote happened in a cached guild
    voter = guild.get_member(raw.user_id) if guild else self.get_user(raw.user_id)

    if not (message and voter):
        return

    poll = self._update_poll_counts(message, raw.answer_id, False, raw.user_id == self.self_id)
    if poll:
        self.dispatch('poll_vote_remove', voter, poll.get_answer(raw.answer_id))
def _get_reaction_user(self, channel: MessageableChannel, user_id: int) -> Optional[Union[User, Member]]:
    """Resolve a reacting user: a guild member for guild channel types, otherwise a cached user."""
    guild_channel_types = (TextChannel, Thread, VoiceChannel, StageChannel)
    if not isinstance(channel, guild_channel_types):
        return self.get_user(user_id)
    return channel.guild.get_member(user_id)
def get_reaction_emoji(self, data: PartialEmojiPayload) -> Union[Emoji, PartialEmoji, str]:
    """Resolve a reaction emoji payload.

    Returns the emoji name when the payload has no ID, the cached emoji when
    the ID is known, and a state-bound partial emoji otherwise.
    """
    emoji_id = utils._get_as_snowflake(data, 'id')
    if not emoji_id:
        # the name key will be a str
        return data['name']  # type: ignore

    try:
        cached = self._emojis[emoji_id]
    except KeyError:
        return PartialEmoji.with_state(
            self, animated=data.get('animated', False), id=emoji_id, name=data['name']  # type: ignore
        )
    return cached
def _upgrade_partial_emoji(self, emoji: PartialEmoji) -> Union[Emoji, PartialEmoji, str]:
    """Swap a partial emoji for its cached counterpart when possible.

    Returns the emoji name when it has no ID, the cached emoji when the ID
    is known, and the partial emoji itself otherwise.
    """
    emoji_id = emoji.id
    if not emoji_id:
        return emoji.name

    try:
        cached = self._emojis[emoji_id]
    except KeyError:
        return emoji
    return cached
def get_channel(self, id: Optional[int]) -> Optional[Union[Channel, Thread]]:
    """Look a channel up by ID: private channels first, then each guild in turn.

    Returns ``None`` when ``id`` is ``None`` or nothing matches.
    """
    if id is None:
        return None

    private = self._get_private_channel(id)
    if private is not None:
        return private

    # Lazily resolve per guild and stop at the first hit
    resolved = (guild._resolve_channel(id) for guild in self.guilds)
    return next((found for found in resolved if found is not None), None)
def _get_or_create_partial_messageable(self, id: Optional[int]) -> Optional[Union[Channel, Thread]]:
    """Return the cached channel for ``id``, or a partial messageable standing in for it.

    Returns ``None`` only when ``id`` is ``None``.
    """
    if id is None:
        return None

    cached = self.get_channel(id)
    return cached or PartialMessageable(state=self, id=id)
def create_message(
    self,
    *,
    channel: MessageableChannel,
    data: MessagePayload,
    search_result: Optional[MessageSearchResultPayload] = None,
) -> Message:
    """Build a :class:`Message` bound to this state from a raw payload."""
    message = Message(state=self, channel=channel, data=data, search_result=search_result)
    return message
def _update_message_references(self) -> None:
    """Rebind cached guild messages whose guild object has been replaced in the cache."""
    # self._messages won't be None when this is called
    for cached in self._messages:  # type: ignore
        stale_guild = cached.guild
        if not stale_guild:
            continue

        current_guild = self._get_guild(stale_guild.id)
        if current_guild is None or current_guild is stale_guild:
            continue

        channel_id = cached.channel.id
        channel = current_guild._resolve_channel(channel_id) or PartialMessageable(
            state=self, id=channel_id, guild_id=current_guild.id
        )
        cached._rebind_cached_references(current_guild, channel)
def create_integration_application(self, data: IntegrationApplicationPayload) -> IntegrationApplication:
    """Build an :class:`IntegrationApplication` bound to this state."""
    application = IntegrationApplication(state=self, data=data)
    return application
def default_guild_settings(self, guild_id: Optional[int]) -> GuildSettings:
    """Build guild settings from a payload that carries only the guild ID."""
    payload = {'guild_id': guild_id}
    return GuildSettings(data=payload, state=self)  # type: ignore
def default_channel_settings(self, guild_id: Optional[int], channel_id: int) -> ChannelSettings:
    """Build channel settings from a payload that carries only the channel ID."""
    payload = {'channel_id': channel_id}
    return ChannelSettings(guild_id, data=payload, state=self)  # type: ignore
def create_implicit_relationship(self, user: User) -> Relationship:
    """Return the relationship with ``user``, creating an implicit one when none is cached.

    A cached relationship whose type value is ``0`` is switched to the implicit type.
    """
    existing = self._relationships.get(user.id)
    if existing is None:
        created = Relationship._from_implicit(state=self, user=user)
        self._relationships[created.id] = created
        return created

    if existing.type.value == 0:
        existing.type = RelationshipType.implicit
    return existing
@property
def all_session(self) -> Optional[Session]:
    """The session stored under the ``'all'`` key, if any."""
    sessions = self._sessions
    return sessions.get('all')
@property
def current_session(self) -> Optional[Session]:
    """The session stored under our own session ID, if any."""
    own_session_id = self.session_id
    return self._sessions.get(own_session_id)  # type: ignore
@utils.cached_property
def client_presence(self) -> FakeClientPresence:
    """Our own presence, built from this state and cached after first access."""
    presence = FakeClientPresence(self)
    return presence
def create_presence(self, data: gw.BasePresenceUpdate) -> Presence:
    """Build a :class:`Presence` bound to this state from a raw payload."""
    presence = Presence(data, self)
    return presence
def create_offline_presence(self) -> Presence:
    """Build the offline presence provided by ``Presence._offline``."""
    offline = Presence._offline()
    return offline
def get_presence(self, user_id: int, guild_id: Optional[int] = None) -> Optional[Presence]:
    """Look up a stored presence.

    Our own ID always yields the client presence. With a ``guild_id`` only
    that guild's presences are consulted; without one, the global store is.
    """
    if user_id == self.self_id:
        # Our own presence is unified
        return self.client_presence

    if guild_id is None:
        return self._presences.get(user_id)

    guild_presences = self._guild_presences.get(guild_id)
    if guild_presences is None:
        return None
    return guild_presences.get(user_id)
def remove_presence(self, user_id: int, guild_id: Optional[int] = None) -> None:
    """Drop a stored presence from the guild's store, or from the global store when no guild is given."""
    if guild_id is None:
        self._presences.pop(user_id, None)
        return

    guild_presences = self._guild_presences.get(guild_id)
    if guild_presences is not None:
        guild_presences.pop(user_id, None)
def store_presence(self, user_id: int, presence: Presence, guild_id: Optional[int] = None) -> Presence:
    """Store ``presence`` for a user and return it unchanged.

    Offline presences without activities are not stored and evict any
    existing entry instead. Our own presence is never stored.
    """
    is_empty = presence.client_status.status == Status.offline.value and not presence.activities
    if is_empty:
        # We don't store empty presences
        self.remove_presence(user_id, guild_id)
        return presence

    if user_id == self.self_id:
        # We don't store our own presence
        return presence

    if guild_id is None:
        self._presences[user_id] = presence
        return presence

    guild_presences = self._guild_presences.get(guild_id)
    if guild_presences is None:
        guild_presences = self._guild_presences[guild_id] = {}
    guild_presences[user_id] = presence
    return presence
@overload
def get_read_state(self, id: int, type: ReadStateType = ..., *, if_exists: Literal[False] = ...) -> ReadState:
    ...


@overload
def get_read_state(self, id: int, type: ReadStateType = ..., *, if_exists: Literal[True]) -> Optional[ReadState]:
    ...


def get_read_state(
    self, id: int, type: ReadStateType = ReadStateType.channel, *, if_exists: bool = False
) -> Optional[ReadState]:
    """Return the read state for ``id`` within the ``type`` group.

    When none is stored, ``None`` is returned if ``if_exists`` is set;
    otherwise a default read state is created, stored and returned.
    """
    try:
        return self._read_states[type.value][id]
    except KeyError:
        pass

    if if_exists:
        return None

    # Create and store a default read state
    fresh = ReadState.default(id, type, state=self)
    self.store_read_state(fresh)
    return fresh
def remove_read_state(self, read_state: ReadState) -> None:
    """Remove ``read_state`` from its type group; a missing group or entry is ignored."""
    type_key = read_state.type.value
    try:
        group = self._read_states[type_key]
    except KeyError:
        return
    else:
        group.pop(read_state.id, None)
def store_read_state(self, read_state: ReadState):
    """Store ``read_state`` under its type group, creating the group on first use."""
    groups = self._read_states
    type_key = read_state.type.value
    try:
        group = groups[type_key]
    except KeyError:
        group = groups[type_key] = {}
    group[read_state.id] = read_state
@utils.cached_property
def premium_subscriptions_application(self) -> PartialApplication:
    """The hardcoded partial application used for premium subscriptions."""
    # Hardcoded application for premium subscriptions, highly unlikely to change
    payload = {
        'id': 521842831262875670,
        'name': 'Nitro',
        'icon': None,
        'description': '',
        'type': None,
        'is_monetized': False,
        'is_verified': False,
        'is_discoverable': False,
        'hook': True,
        'storefront_available': False,
        'verify_key': '93661a9eefe452d12f51e129e8d9340e7ca53a770158c0ec7970e701534b7420',
        'flags': 0,
    }
    return PartialApplication(state=self, data=payload)
@utils.cached_property
def premium_subscriptions_sku_ids(self) -> Dict[str, Snowflake]:
    """Hardcoded mapping from premium subscription tier name to SKU ID."""
    sku_ids = dict(
        none=628379670982688768,
        basic=978380684370378762,
        legacy=521842865731534868,
        classic=521846918637420545,
        full=521847234246082599,
        guild=590663762298667008,
    )
    return sku_ids