From 527329be61c3f2d8cc53d1d0b05dd51211a5155b Mon Sep 17 00:00:00 2001 From: dolfies Date: Sun, 2 Apr 2023 20:06:50 -0400 Subject: [PATCH] Update context/super property management --- discord/http.py | 110 ++++++++++---------- discord/tracking.py | 238 ++++++++++++++------------------------------ discord/utils.py | 27 ++++- 3 files changed, 148 insertions(+), 227 deletions(-) diff --git a/discord/http.py b/discord/http.py index 32a2cd8bb..b6b0f625d 100644 --- a/discord/http.py +++ b/discord/http.py @@ -25,8 +25,6 @@ DEALINGS IN THE SOFTWARE. from __future__ import annotations import asyncio -from base64 import b64encode -import json import logging from random import choice, choices import string @@ -527,7 +525,6 @@ class HTTPClient: self.captcha_handler: Optional[CaptchaHandler] = captcha_handler self.max_ratelimit_timeout: Optional[float] = max(30.0, max_ratelimit_timeout) if max_ratelimit_timeout else None - self.user_agent: str = MISSING self.super_properties: Dict[str, Any] = {} self.encoded_super_properties: str = MISSING self._started: bool = False @@ -552,26 +549,8 @@ class HTTPClient: connector=self.connector, trace_configs=None if self.http_trace is None else [self.http_trace], ) - self.user_agent, self.browser_version, self.client_build_number = ua, bv, bn = await utils._get_info(session) - _log.info('Found user agent %s (%s), build number %s.', ua, bv, bn) - self.super_properties = sp = { - 'os': 'Windows', - 'browser': 'Chrome', - 'device': '', - 'browser_user_agent': ua, - 'browser_version': bv, - 'os_version': '10', - 'referrer': '', - 'referring_domain': '', - 'referrer_current': '', - 'referring_domain_current': '', - 'release_channel': 'stable', - 'system_locale': 'en-US', - 'client_build_number': bn, - 'client_event_source': None, - 'design_id': 0, - } - self.encoded_super_properties = b64encode(json.dumps(sp).encode()).decode('utf-8') + self.super_properties, self.encoded_super_properties = sp, _ = await utils._get_info(session) + _log.info('Found user agent %s, build number %s.', sp.get('browser_user_agent'), sp.get('client_build_number')) if self.captcha_handler is not None: await self.captcha_handler.startup() @@ -605,6 +584,14 @@ class HTTPClient: return await self.__session.ws_connect(url, **kwargs) + @property + def browser_version(self) -> str: + return self.super_properties['browser_version'] + + @property + def user_agent(self) -> str: + return self.super_properties['browser_user_agent'] + def _try_clear_expired_ratelimits(self) -> None: if len(self._buckets) < 256: return @@ -1009,7 +996,7 @@ class HTTPClient: payload = { 'recipients': recipients, } - props = ContextProperties._from_new_group_dm() # New Group DM button + props = ContextProperties.from_new_group_dm() # New Group DM button return self.request(Route('POST', '/users/@me/channels'), json=payload, context_properties=props) @@ -1017,9 +1004,12 @@ class HTTPClient: payload = None if nick: payload = {'nick': nick} + props = ContextProperties.from_add_friends_to_dm() return self.request( - Route('PUT', '/channels/{channel_id}/recipients/{user_id}', channel_id=channel_id, user_id=user_id), json=payload + Route('PUT', '/channels/{channel_id}/recipients/{user_id}', channel_id=channel_id, user_id=user_id), + json=payload, + context_properties=props, ) def remove_group_recipient(self, channel_id: Snowflake, user_id: Snowflake) -> Response[None]: @@ -1034,7 +1024,7 @@ class HTTPClient: payload = { 'recipients': [user_id], } - props = ContextProperties._empty() # {} + props = ContextProperties.empty() # {} return self.request(Route('POST', '/users/@me/channels'), json=payload, context_properties=props) @@ -1668,7 +1658,7 @@ class HTTPClient: params['location'] = 'Guild%20Discovery' if location is not MISSING: params['location'] = location - props = ContextProperties._empty() if lurker else ContextProperties._from_lurking() + props = ContextProperties.empty() if lurker else ContextProperties.from_lurking() return self.request( Route('PUT', '/guilds/{guild_id}/members/@me', guild_id=guild_id), @@ -2092,7 +2082,7 @@ class HTTPClient: message: Message = MISSING, ): # TODO: response type if message is not MISSING: # Invite Button Embed - props = ContextProperties._from_invite_embed( + props = ContextProperties.from_invite_button_embed( guild_id=getattr(message.guild, 'id', None), channel_id=message.channel.id, channel_type=getattr(message.channel, 'type', None), @@ -2101,12 +2091,12 @@ class HTTPClient: elif type is InviteType.guild or type is InviteType.group_dm: # Join Guild, Accept Invite Page props = choice( ( - ContextProperties._from_accept_invite_page, - ContextProperties._from_join_guild_popup, + ContextProperties.from_accept_invite_page, + ContextProperties.from_join_guild, ) )(guild_id=guild_id, channel_id=channel_id, channel_type=channel_type) else: # Accept Invite Page - props = ContextProperties._from_accept_invite_page( + props = ContextProperties.from_accept_invite_page( guild_id=guild_id, channel_id=channel_id, channel_type=channel_type ) return self.request(Route('POST', '/invites/{invite_id}', invite_id=invite_id), context_properties=props, json={}) @@ -2140,8 +2130,8 @@ class HTTPClient: payload['target_application_id'] = str(target_application_id) props = choice( ( - ContextProperties._from_guild_header, - ContextProperties._from_context_menu, + ContextProperties.from_guild_header, + ContextProperties.from_context_menu, ) )() @@ -2156,14 +2146,14 @@ class HTTPClient: payload = { 'max_age': max_age, } - props = ContextProperties._from_group_dm_invite() + props = ContextProperties.from_group_dm_invite_create() return self.request( Route('POST', '/channels/{channel_id}/invites', channel_id=channel_id), json=payload, context_properties=props ) def create_friend_invite(self) -> Response[invite.Invite]: - return self.request(Route('POST', '/users/@me/invites'), json={}, context_properties=ContextProperties._empty()) + return self.request(Route('POST', '/users/@me/invites'), json={}, context_properties=ContextProperties.empty()) def get_invite( self, @@ -2192,13 +2182,13 @@ class HTTPClient: return self.request(Route('GET', '/channels/{channel_id}/invites', channel_id=channel_id)) def get_friend_invites(self) -> Response[List[invite.Invite]]: - return self.request(Route('GET', '/users/@me/invites'), context_properties=ContextProperties._empty()) + return self.request(Route('GET', '/users/@me/invites'), context_properties=ContextProperties.empty()) def delete_invite(self, invite_id: str, *, reason: Optional[str] = None) -> Response[invite.Invite]: return self.request(Route('DELETE', '/invites/{invite_id}', invite_id=invite_id), reason=reason) def delete_friend_invites(self) -> Response[List[invite.Invite]]: - return self.request(Route('DELETE', '/users/@me/invites'), context_properties=ContextProperties._empty()) + return self.request(Route('DELETE', '/users/@me/invites'), context_properties=ContextProperties.empty()) # Role management @@ -2560,9 +2550,9 @@ class HTTPClient: if action is RelationshipAction.deny_request: # User Profile, Friends, DM Channel props = choice( ( - ContextProperties._from_friends_page, - ContextProperties._from_user_profile, - ContextProperties._from_dm_channel, + ContextProperties.from_friends, + ContextProperties.from_user_profile, + ContextProperties.from_dm_channel, ) )() elif action in ( @@ -2571,16 +2561,16 @@ class HTTPClient: ): # Friends, ContextMenu, User Profile, DM Channel props = choice( ( - ContextProperties._from_contextmenu, - ContextProperties._from_user_profile, - ContextProperties._from_friends_page, - ContextProperties._from_dm_channel, + ContextProperties.from_contextmenu, + ContextProperties.from_user_profile, + ContextProperties.from_friends, + ContextProperties.from_dm_channel, ) )() elif action == RelationshipAction.remove_pending_request: # Friends - props = ContextProperties._from_friends_page() + props = ContextProperties.from_friends() else: - props = ContextProperties._empty() + props = ContextProperties.empty() return self.request(r, context_properties=props) @@ -2591,36 +2581,36 @@ class HTTPClient: if action is RelationshipAction.accept_request: # User Profile, Friends, DM Channel props = choice( ( - ContextProperties._from_friends_page, - ContextProperties._from_user_profile, - ContextProperties._from_dm_channel, + ContextProperties.from_friends, + ContextProperties.from_user_profile, + ContextProperties.from_dm_channel, ) )() elif action is RelationshipAction.block: # Friends, ContextMenu, User Profile, DM Channel. props = choice( ( - ContextProperties._from_contextmenu, - ContextProperties._from_user_profile, - ContextProperties._from_friends_page, - ContextProperties._from_dm_channel, + ContextProperties.from_contextmenu, + ContextProperties.from_user_profile, + ContextProperties.from_friends, + ContextProperties.from_dm_channel, ) )() elif action is RelationshipAction.send_friend_request: # ContextMenu, User Profile, DM Channel props = choice( ( - ContextProperties._from_contextmenu, - ContextProperties._from_user_profile, - ContextProperties._from_dm_channel, + ContextProperties.from_contextmenu, + ContextProperties.from_user_profile, + ContextProperties.from_dm_channel, ) )() else: - props = ContextProperties._empty() + props = ContextProperties.empty() return self.request(r, context_properties=props, json={'type': type} if type else None) def send_friend_request(self, username: str, discriminator: Snowflake) -> Response[None]: r = Route('POST', '/users/@me/relationships') - props = choice((ContextProperties._from_add_friend_page, ContextProperties._from_group_dm))() # Friends, Group DM + props = choice((ContextProperties.from_add_friend, ContextProperties.from_group_dm))() # Friends, Group DM payload = {'username': username, 'discriminator': int(discriminator)} return self.request(r, json=payload, context_properties=props) @@ -3453,7 +3443,7 @@ class HTTPClient: return self.request( Route('GET', '/store/skus/{sku_id}/purchase', sku_id=sku_id), params=params, - context_properties=ContextProperties._empty(), + context_properties=ContextProperties.empty(), ) def purchase_sku( @@ -3496,7 +3486,7 @@ class HTTPClient: return self.request( Route('POST', '/store/skus/{sku_id}/purchase', sku_id=sku_id), json=payload, - context_properties=ContextProperties._empty(), + context_properties=ContextProperties.empty(), ) def create_sku_discount(self, sku_id: Snowflake, user_id: Snowflake, percent_off: int, ttl: int = 600) -> Response[None]: diff --git a/discord/tracking.py b/discord/tracking.py index 1229e0da9..b25fa5d56 100644 --- a/discord/tracking.py +++ b/discord/tracking.py @@ -28,7 +28,7 @@ from base64 import b64encode import json from random import choice -from typing import Dict, Optional, TYPE_CHECKING +from typing import Any, Callable, Dict, Optional, Tuple, TYPE_CHECKING from .utils import MISSING @@ -45,178 +45,111 @@ __all__ = ( # fmt: on -class ContextProperties: # Thank you Discord-S.C.U.M - """Represents the Discord X-Context-Properties header. +class ContextPropertiesMeta(type): + if TYPE_CHECKING: - This header is essential for certain actions (e.g. joining guilds, friend requesting). + def __getattribute__(self, name: str) -> Callable[[], Self]: + ... - .. versionadded:: 1.9 + def __new__(cls, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]): + cls = super().__new__(cls, name, bases, attrs) + locations = attrs.get('LOCATIONS', {}) + sources = attrs.get('SOURCES', {}) - .. container:: operations + def build_location(location: str) -> classmethod: + def f(cls) -> Self: + data = {'location': location} + return cls(data) - .. describe:: x == y + return classmethod(f) - Checks if two context properties are equal. + def build_source(source: str) -> classmethod: + def f(cls) -> Self: + data = {'source': source} + return cls(data) - .. describe:: x != y + return classmethod(f) - Checks if two context properties are not equal. + for location in locations: + if location: + setattr(cls, f'from_{location.lower().replace(" ", "_").replace("/", "")}', build_location(location)) - .. describe:: hash(x) + for source in sources: + if source: + setattr(cls, f'from_{source.lower().replace(" ", "_")}', build_source(source)) - Return the context property's hash. + return cls - .. describe:: str(x) - Returns the context property's name. +class ContextProperties(metaclass=ContextPropertiesMeta): + """Represents the Discord X-Context-Properties header. - Attributes - ---------- - value: :class:`str` - The encoded header value. + This header is essential for certain actions (e.g. joining guilds, friend requesting). """ - __slots__ = ('_data', 'value') - - def __init__(self, data) -> None: + __slots__ = ('_data',) + + LOCATIONS = { + None: 'e30=', + 'Friends': 'eyJsb2NhdGlvbiI6IkZyaWVuZHMifQ==', + 'ContextMenu': 'eyJsb2NhdGlvbiI6IkNvbnRleHRNZW51In0=', + 'Context Menu': 'eyJsb2NhdGlvbiI6IkNvbnRleHQgTWVudSJ9', + 'User Profile': 'eyJsb2NhdGlvbiI6IlVzZXIgUHJvZmlsZSJ9', + 'Add Friend': 'eyJsb2NhdGlvbiI6IkFkZCBGcmllbmQifQ==', + 'Guild Header': 'eyJsb2NhdGlvbiI6Ikd1aWxkIEhlYWRlciJ9', + 'Group DM': 'eyJsb2NhdGlvbiI6Ikdyb3VwIERNIn0=', + 'DM Channel': 'eyJsb2NhdGlvbiI6IkRNIENoYW5uZWwifQ==', + '/app': 'eyJsb2NhdGlvbiI6ICIvYXBwIn0=', + 'Login': 'eyJsb2NhdGlvbiI6IkxvZ2luIn0=', + 'Register': 'eyJsb2NhdGlvbiI6IlJlZ2lzdGVyIn0=', + 'Verify Email': 'eyJsb2NhdGlvbiI6IlZlcmlmeSBFbWFpbCJ9', + 'New Group DM': 'eyJsb2NhdGlvbiI6Ik5ldyBHcm91cCBETSJ9', + 'Add Friends to DM': 'eyJsb2NhdGlvbiI6IkFkZCBGcmllbmRzIHRvIERNIn0=', + 'Group DM Invite Create': 'eyJsb2NhdGlvbiI6Ikdyb3VwIERNIEludml0ZSBDcmVhdGUifQ==', + 'Stage Channel': 'eyJsb2NhdGlvbiI6IlN0YWdlIENoYW5uZWwifQ==', + } + + SOURCES = { + None: 'e30=', + 'Chat Input Blocker - Lurker Mode': 'eyJzb3VyY2UiOiJDaGF0IElucHV0IEJsb2NrZXIgLSBMdXJrZXIgTW9kZSJ9', + 'Notice - Lurker Mode': 'eyJzb3VyY2UiOiJOb3RpY2UgLSBMdXJrZXIgTW9kZSJ9', + } + + def __init__(self, data: dict) -> None: self._data: Dict[str, Snowflake] = data - self.value: str = self._encode_data(data) - - def _encode_data(self, data) -> str: - library = { - None: 'e30=', - # Locations - 'Friends': 'eyJsb2NhdGlvbiI6IkZyaWVuZHMifQ==', - 'ContextMenu': 'eyJsb2NhdGlvbiI6IkNvbnRleHRNZW51In0=', - 'Context Menu': 'eyJsb2NhdGlvbiI6IkNvbnRleHQgTWVudSJ9', - 'User Profile': 'eyJsb2NhdGlvbiI6IlVzZXIgUHJvZmlsZSJ9', - 'Add Friend': 'eyJsb2NhdGlvbiI6IkFkZCBGcmllbmQifQ==', - 'Guild Header': 'eyJsb2NhdGlvbiI6Ikd1aWxkIEhlYWRlciJ9', - 'Group DM': 'eyJsb2NhdGlvbiI6Ikdyb3VwIERNIn0=', - 'DM Channel': 'eyJsb2NhdGlvbiI6IkRNIENoYW5uZWwifQ==', - '/app': 'eyJsb2NhdGlvbiI6ICIvYXBwIn0=', - 'Login': 'eyJsb2NhdGlvbiI6IkxvZ2luIn0=', - 'Register': 'eyJsb2NhdGlvbiI6IlJlZ2lzdGVyIn0=', - 'Verify Email': 'eyJsb2NhdGlvbiI6IlZlcmlmeSBFbWFpbCJ9', - 'New Group DM': 'eyJsb2NhdGlvbiI6Ik5ldyBHcm91cCBETSJ9', - 'Add Friends to DM': 'eyJsb2NhdGlvbiI6IkFkZCBGcmllbmRzIHRvIERNIn0=', - 'Group DM Invite Create': 'eyJsb2NhdGlvbiI6Ikdyb3VwIERNIEludml0ZSBDcmVhdGUifQ==', - 'Stage Channel': 'eyJsb2NhdGlvbiI6IlN0YWdlIENoYW5uZWwifQ==', - # Sources - 'Chat Input Blocker - Lurker Mode': 'eyJzb3VyY2UiOiJDaGF0IElucHV0IEJsb2NrZXIgLSBMdXJrZXIgTW9kZSJ9', - 'Notice - Lurker Mode': 'eyJzb3VyY2UiOiJOb3RpY2UgLSBMdXJrZXIgTW9kZSJ9', - } + def _encode_data(self) -> str: try: - return library[self.target] + target = self.target + return self.LOCATIONS.get(target, self.SOURCES[target]) except KeyError: - return b64encode(json.dumps(data, separators=(',', ':')).encode()).decode('utf-8') + return b64encode(json.dumps(self._data, separators=(',', ':')).encode()).decode('utf-8') @classmethod - def _empty(cls) -> Self: + def empty(cls) -> Self: return cls({}) @classmethod - def _from_friends_page(cls) -> Self: - data = {'location': 'Friends'} - return cls(data) - - @classmethod - def _from_contextmenu(cls) -> Self: - data = {'location': 'ContextMenu'} - return cls(data) - - @classmethod - def _from_context_menu(cls) -> Self: - data = {'location': 'Context Menu'} - return cls(data) - - @classmethod - def _from_user_profile(cls) -> Self: - data = {'location': 'User Profile'} - return cls(data) - - @classmethod - def _from_add_friend_page(cls) -> Self: - data = {'location': 'Add Friend'} - return cls(data) - - @classmethod - def _from_guild_header(cls) -> Self: - data = {'location': 'Guild Header'} - return cls(data) - - @classmethod - def _from_group_dm(cls) -> Self: - data = {'location': 'Group DM'} - return cls(data) - - @classmethod - def _from_new_group_dm(cls) -> Self: - data = {'location': 'New Group DM'} - return cls(data) - - @classmethod - def _from_dm_channel(cls) -> Self: - data = {'location': 'DM Channel'} - return cls(data) - - @classmethod - def _from_add_to_dm(cls) -> Self: - data = {'location': 'Add Friends to DM'} - return cls(data) - - @classmethod - def _from_group_dm_invite(cls) -> Self: - data = {'location': 'Group DM Invite Create'} - return cls(data) - - @classmethod - def _from_app(cls) -> Self: - data = {'location': '/app'} - return cls(data) - - @classmethod - def _from_login(cls) -> Self: - data = {'location': 'Login'} - return cls(data) - - @classmethod - def _from_register(cls) -> Self: - data = {'location': 'Register'} - return cls(data) - - @classmethod - def _from_verification(cls) -> Self: - data = {'location': 'Verify Email'} - return cls(data) - - @classmethod - def _from_stage_channel(cls) -> Self: - data = {'location': 'Stage Channel'} - return cls(data) - - @classmethod - def _from_accept_invite_page( + def from_accept_invite_page( cls, *, - guild_id: Snowflake = MISSING, - channel_id: Snowflake = MISSING, - channel_type: ChannelType = MISSING, + guild_id: Optional[Snowflake] = None, + channel_id: Optional[Snowflake] = None, + channel_type: Optional[ChannelType] = None, ) -> Self: data: Dict[str, Snowflake] = { 'location': 'Accept Invite Page', } - if guild_id is not MISSING: + if guild_id: data['location_guild_id'] = str(guild_id) - if channel_id is not MISSING: + if channel_id: data['location_channel_id'] = str(channel_id) - if channel_type is not MISSING: + if channel_type: data['location_channel_type'] = int(channel_type) return cls(data) @classmethod - def _from_join_guild_popup( + def from_join_guild( cls, *, guild_id: Snowflake = MISSING, @@ -235,7 +168,7 @@ class ContextProperties: # Thank you Discord-S.C.U.M return cls(data) @classmethod - def _from_invite_embed( + def from_invite_button_embed( cls, *, guild_id: Optional[Snowflake], @@ -253,7 +186,7 @@ class ContextProperties: # Thank you Discord-S.C.U.M return cls(data) @classmethod - def _from_lurking(cls, source: str = MISSING) -> Self: + def from_lurking(cls, source: str = MISSING) -> Self: data = {'source': source or choice(('Chat Input Blocker - Lurker Mode', 'Notice - Lurker Mode'))} return cls(data) @@ -262,26 +195,8 @@ class ContextProperties: # Thank you Discord-S.C.U.M return self._data.get('location', self._data.get('source')) # type: ignore @property - def guild_id(self) -> Optional[int]: - data = self._data.get('location_guild_id') - if data is not None: - return int(data) - - @property - def channel_id(self) -> Optional[int]: - data = self._data.get('location_channel_id') - if data is not None: - return int(data) - - @property - def channel_type(self) -> Optional[int]: - return self._data.get('location_channel_type') # type: ignore - - @property - def message_id(self) -> Optional[int]: - data = self._data.get('location_message_id') - if data is not None: - return int(data) + def value(self) -> str: + return self._encode_data() def __str__(self) -> str: return self.target or 'None' @@ -296,6 +211,3 @@ class ContextProperties: # Thank you Discord-S.C.U.M if isinstance(other, ContextProperties): return self.value != other.value return True - - def __hash__(self) -> int: - return hash(self.value) diff --git a/discord/utils.py b/discord/utils.py index 6ed38c8d1..26c994219 100644 --- a/discord/utils.py +++ b/discord/utils.py @@ -1431,19 +1431,38 @@ class ExpiringString(collections.UserString): self._timer.cancel() -async def _get_info(session: ClientSession) -> Tuple[str, str, int]: +async def _get_info(session: ClientSession) -> Tuple[Dict[str, Any], str]: for _ in range(3): try: - async with session.get('https://cordapi.dolfi.es/api/v1/properties/web', timeout=5) as resp: + async with session.post('https://cordapi.dolfi.es/api/v2/properties/web', timeout=5) as resp: json = await resp.json() - return json['chrome_user_agent'], json['chrome_version'], json['client_build_number'] + return json['properties'], json['encoded'] except Exception: continue + _log.warning('Info API down. Falling back to manual fetching...') ua = await _get_user_agent(session) bn = await _get_build_number(session) bv = _get_browser_version(ua) - return ua, bv, bn + + properties = { + 'os': 'Windows', + 'browser': 'Chrome', + 'device': '', + 'browser_user_agent': ua, + 'browser_version': bv, + 'os_version': '10', + 'referrer': '', + 'referring_domain': '', + 'referrer_current': '', + 'referring_domain_current': '', + 'release_channel': 'stable', + 'system_locale': 'en-US', + 'client_build_number': bn, + 'client_event_source': None, + 'design_id': 0, + } + return properties, b64encode(_to_json(properties).encode()).decode('utf-8') async def _get_build_number(session: ClientSession) -> int: # Thank you Discord-S.C.U.M